530 lines
19 KiB
Python
530 lines
19 KiB
Python
"""Unit tests for the Windows Discovery Plugin.
|
|
|
|
Tests use mocks for the winrm session to avoid requiring actual
|
|
Windows hosts for testing.
|
|
"""
|
|
|
|
import json
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from iac_reverse.models import CpuArchitecture, PlatformCategory, ProviderType
|
|
from iac_reverse.scanner.scanner import AuthenticationError
|
|
from iac_reverse.scanner.windows_plugin import (
|
|
InsufficientPrivilegesError,
|
|
WindowsDiscoveryPlugin,
|
|
WinRMNotEnabledError,
|
|
WMIQueryError,
|
|
WINDOWS_RESOURCE_TYPES,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
|
|
def plugin():
|
|
"""Create a fresh WindowsDiscoveryPlugin instance."""
|
|
return WindowsDiscoveryPlugin()
|
|
|
|
|
|
@pytest.fixture
|
|
def credentials():
|
|
"""Standard test credentials."""
|
|
return {
|
|
"host": "192.168.1.100",
|
|
"username": "admin",
|
|
"password": "secret",
|
|
"transport": "ntlm",
|
|
"port": "5986",
|
|
"use_ssl": "true",
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_session():
|
|
"""Create a mock WinRM session."""
|
|
session = MagicMock()
|
|
return session
|
|
|
|
|
|
def make_ps_result(stdout: str = "", stderr: str = "", status_code: int = 0):
|
|
"""Helper to create a mock PowerShell result."""
|
|
result = MagicMock()
|
|
result.std_out = stdout.encode("utf-8")
|
|
result.std_err = stderr.encode("utf-8")
|
|
result.status_code = status_code
|
|
return result
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Authentication Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAuthenticate:
|
|
"""Tests for WindowsDiscoveryPlugin.authenticate()."""
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_authenticate_success(self, mock_session_cls, plugin, credentials):
|
|
"""Successful authentication creates a session."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.return_value = make_ps_result("WIN-SERVER01")
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
|
|
mock_session_cls.assert_called_once_with(
|
|
"https://192.168.1.100:5986/wsman",
|
|
auth=("admin", "secret"),
|
|
transport="ntlm",
|
|
server_cert_validation="ignore",
|
|
)
|
|
assert plugin._host == "192.168.1.100"
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_authenticate_http_no_ssl(self, mock_session_cls, plugin):
|
|
"""Authentication with use_ssl=false uses HTTP."""
|
|
creds = {
|
|
"host": "myhost",
|
|
"username": "user",
|
|
"password": "pass",
|
|
"transport": "ntlm",
|
|
"port": "5985",
|
|
"use_ssl": "false",
|
|
}
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.return_value = make_ps_result("MYHOST")
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(creds)
|
|
|
|
mock_session_cls.assert_called_once_with(
|
|
"http://myhost:5985/wsman",
|
|
auth=("user", "pass"),
|
|
transport="ntlm",
|
|
server_cert_validation="validate",
|
|
)
|
|
|
|
def test_authenticate_missing_host(self, plugin):
|
|
"""Missing host raises AuthenticationError."""
|
|
creds = {"username": "user", "password": "pass"}
|
|
with pytest.raises(AuthenticationError, match="host is required"):
|
|
plugin.authenticate(creds)
|
|
|
|
def test_authenticate_missing_username(self, plugin):
|
|
"""Missing username raises AuthenticationError."""
|
|
creds = {"host": "myhost", "password": "pass"}
|
|
with pytest.raises(AuthenticationError, match="username is required"):
|
|
plugin.authenticate(creds)
|
|
|
|
def test_authenticate_missing_password(self, plugin):
|
|
"""Missing password raises AuthenticationError."""
|
|
creds = {"host": "myhost", "username": "user"}
|
|
with pytest.raises(AuthenticationError, match="password is required"):
|
|
plugin.authenticate(creds)
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_authenticate_connection_refused(self, mock_session_cls, plugin, credentials):
|
|
"""Connection refused raises WinRMNotEnabledError."""
|
|
mock_session_cls.side_effect = Exception("connection refused")
|
|
|
|
with pytest.raises(WinRMNotEnabledError):
|
|
plugin.authenticate(credentials)
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_authenticate_access_denied(self, mock_session_cls, plugin, credentials):
|
|
"""Access denied during auth test raises AuthenticationError."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.return_value = make_ps_result(
|
|
stderr="Access is denied", status_code=1
|
|
)
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
with pytest.raises(AuthenticationError):
|
|
plugin.authenticate(credentials)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Platform Category and Resource Types Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPlatformInfo:
|
|
"""Tests for platform category and resource type listing."""
|
|
|
|
def test_get_platform_category(self, plugin):
|
|
"""Returns PlatformCategory.WINDOWS."""
|
|
assert plugin.get_platform_category() == PlatformCategory.WINDOWS
|
|
|
|
def test_list_supported_resource_types(self, plugin):
|
|
"""Returns all 13 Windows resource types."""
|
|
types = plugin.list_supported_resource_types()
|
|
assert len(types) == 13
|
|
assert "windows_service" in types
|
|
assert "windows_scheduled_task" in types
|
|
assert "windows_iis_site" in types
|
|
assert "windows_iis_app_pool" in types
|
|
assert "windows_network_adapter" in types
|
|
assert "windows_firewall_rule" in types
|
|
assert "windows_installed_software" in types
|
|
assert "windows_feature" in types
|
|
assert "windows_hyperv_vm" in types
|
|
assert "windows_hyperv_switch" in types
|
|
assert "windows_dns_record" in types
|
|
assert "windows_local_user" in types
|
|
assert "windows_local_group" in types
|
|
|
|
def test_list_endpoints_before_auth(self, plugin):
|
|
"""Returns empty list before authentication."""
|
|
assert plugin.list_endpoints() == []
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_list_endpoints_after_auth(self, mock_session_cls, plugin, credentials):
|
|
"""Returns host after authentication."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.return_value = make_ps_result("SERVER")
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
assert plugin.list_endpoints() == ["192.168.1.100"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Architecture Detection Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDetectArchitecture:
|
|
"""Tests for CPU architecture detection via WMI."""
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_detect_amd64(self, mock_session_cls, plugin, credentials):
|
|
"""Architecture code 9 maps to AMD64."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"), # auth test
|
|
make_ps_result("9"), # architecture query
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
arch = plugin.detect_architecture("192.168.1.100")
|
|
assert arch == CpuArchitecture.AMD64
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_detect_arm(self, mock_session_cls, plugin, credentials):
|
|
"""Architecture code 5 maps to ARM."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("5"),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
arch = plugin.detect_architecture("192.168.1.100")
|
|
assert arch == CpuArchitecture.ARM
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_detect_aarch64(self, mock_session_cls, plugin, credentials):
|
|
"""Architecture code 12 maps to AARCH64."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("12"),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
arch = plugin.detect_architecture("192.168.1.100")
|
|
assert arch == CpuArchitecture.AARCH64
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_detect_architecture_wmi_failure(self, mock_session_cls, plugin, credentials):
|
|
"""WMI query failure raises WMIQueryError."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result(stderr="WMI error", status_code=1),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
with pytest.raises(WMIQueryError):
|
|
plugin.detect_architecture("192.168.1.100")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resource Discovery Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDiscoverResources:
|
|
"""Tests for resource discovery via WinRM."""
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_services(self, mock_session_cls, plugin, credentials):
|
|
"""Discovers Windows services."""
|
|
services_json = json.dumps([
|
|
{"Name": "wuauserv", "DisplayName": "Windows Update", "Status": 4, "StartType": 3},
|
|
{"Name": "Spooler", "DisplayName": "Print Spooler", "Status": 4, "StartType": 2},
|
|
])
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"), # auth
|
|
make_ps_result("9"), # architecture
|
|
make_ps_result("false"), # hyperv check
|
|
make_ps_result(services_json), # services
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_service"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 2
|
|
assert result.resources[0].resource_type == "windows_service"
|
|
assert result.resources[0].name == "wuauserv"
|
|
assert result.resources[0].provider == ProviderType.WINDOWS
|
|
assert result.resources[0].platform_category == PlatformCategory.WINDOWS
|
|
assert result.resources[0].architecture == CpuArchitecture.AMD64
|
|
callback.assert_called()
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_scheduled_tasks(self, mock_session_cls, plugin, credentials):
|
|
"""Discovers scheduled tasks."""
|
|
tasks_json = json.dumps([
|
|
{"TaskName": "Backup", "TaskPath": "\\Custom\\", "State": 3},
|
|
])
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("false"),
|
|
make_ps_result(tasks_json),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_scheduled_task"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 1
|
|
assert result.resources[0].resource_type == "windows_scheduled_task"
|
|
assert result.resources[0].name == "Backup"
|
|
assert result.resources[0].attributes["task_path"] == "\\Custom\\"
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_hyperv_skipped_when_not_installed(
|
|
self, mock_session_cls, plugin, credentials
|
|
):
|
|
"""Hyper-V resources are skipped when role is not installed."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("false"), # hyperv NOT installed
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_hyperv_vm"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 0
|
|
assert any("Hyper-V role not installed" in w for w in result.warnings)
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_hyperv_vms_when_installed(
|
|
self, mock_session_cls, plugin, credentials
|
|
):
|
|
"""Hyper-V VMs are discovered when role is installed."""
|
|
vms_json = json.dumps([
|
|
{
|
|
"Name": "TestVM",
|
|
"VMId": "abc-123",
|
|
"State": 2,
|
|
"MemoryAssigned": 4294967296,
|
|
"ProcessorCount": 4,
|
|
"Generation": 2,
|
|
}
|
|
])
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("true"), # hyperv IS installed
|
|
make_ps_result(vms_json),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_hyperv_vm"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 1
|
|
assert result.resources[0].resource_type == "windows_hyperv_vm"
|
|
assert result.resources[0].name == "TestVM"
|
|
assert result.resources[0].attributes["vm_id"] == "abc-123"
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_local_users(self, mock_session_cls, plugin, credentials):
|
|
"""Discovers local user accounts."""
|
|
users_json = json.dumps([
|
|
{"Name": "Administrator", "Enabled": True, "Description": "Built-in admin", "LastLogon": "2024-01-01"},
|
|
])
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("false"),
|
|
make_ps_result(users_json),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_local_user"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 1
|
|
assert result.resources[0].resource_type == "windows_local_user"
|
|
assert result.resources[0].name == "Administrator"
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_insufficient_privileges(
|
|
self, mock_session_cls, plugin, credentials
|
|
):
|
|
"""Insufficient privileges are captured as errors."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("false"),
|
|
make_ps_result(stderr="Access is denied", status_code=1),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_service"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 0
|
|
assert len(result.errors) == 1
|
|
assert "Insufficient privileges" in result.errors[0]
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_wmi_query_failure(
|
|
self, mock_session_cls, plugin, credentials
|
|
):
|
|
"""WMI query failures are captured as errors."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("false"),
|
|
make_ps_result(stderr="Invalid class", status_code=1),
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_feature"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 0
|
|
assert len(result.errors) == 1
|
|
assert "WMI query failed" in result.errors[0]
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_discover_empty_result(self, mock_session_cls, plugin, credentials):
|
|
"""Empty PowerShell output returns no resources."""
|
|
mock_session = MagicMock()
|
|
mock_session.run_ps.side_effect = [
|
|
make_ps_result("SERVER"),
|
|
make_ps_result("9"),
|
|
make_ps_result("false"),
|
|
make_ps_result(""), # empty output
|
|
]
|
|
mock_session_cls.return_value = mock_session
|
|
|
|
plugin.authenticate(credentials)
|
|
callback = MagicMock()
|
|
result = plugin.discover_resources(
|
|
endpoints=["192.168.1.100"],
|
|
resource_types=["windows_local_group"],
|
|
progress_callback=callback,
|
|
)
|
|
|
|
assert len(result.resources) == 0
|
|
assert len(result.errors) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Error Handling Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Tests for WinRM-specific error handling."""
|
|
|
|
def test_winrm_not_enabled_error(self):
|
|
"""WinRMNotEnabledError contains host info."""
|
|
err = WinRMNotEnabledError("myhost", "connection refused")
|
|
assert "myhost" in str(err)
|
|
assert "connection refused" in str(err)
|
|
assert err.host == "myhost"
|
|
|
|
def test_wmi_query_error(self):
|
|
"""WMIQueryError contains query info."""
|
|
err = WMIQueryError("Win32_Processor", "invalid class")
|
|
assert "Win32_Processor" in str(err)
|
|
assert "invalid class" in str(err)
|
|
assert err.query == "Win32_Processor"
|
|
|
|
def test_insufficient_privileges_error(self):
|
|
"""InsufficientPrivilegesError contains operation info."""
|
|
err = InsufficientPrivilegesError("Get-Service", "access denied")
|
|
assert "Get-Service" in str(err)
|
|
assert "access denied" in str(err)
|
|
assert err.operation == "Get-Service"
|
|
|
|
@patch("iac_reverse.scanner.windows_plugin.winrm.Session")
|
|
def test_no_session_raises_winrm_not_enabled(
|
|
self, mock_session_cls, plugin
|
|
):
|
|
"""Running PowerShell without session raises WinRMNotEnabledError."""
|
|
plugin._host = "testhost"
|
|
with pytest.raises(WinRMNotEnabledError):
|
|
plugin._run_powershell("Get-Service")
|