# Source: SnarfCode/tests/unit/test_windows_plugin.py
# Last modified: 2026-05-22 00:19:30 -04:00
# Original size: 530 lines, 19 KiB (Python)

"""Unit tests for the Windows Discovery Plugin.
Tests use mocks for the winrm session to avoid requiring actual
Windows hosts for testing.
"""
import json
from unittest.mock import MagicMock, patch
import pytest
from iac_reverse.models import CpuArchitecture, PlatformCategory, ProviderType
from iac_reverse.scanner.scanner import AuthenticationError
from iac_reverse.scanner.windows_plugin import (
InsufficientPrivilegesError,
WindowsDiscoveryPlugin,
WinRMNotEnabledError,
WMIQueryError,
WINDOWS_RESOURCE_TYPES,
)
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def plugin() -> WindowsDiscoveryPlugin:
    """Return a newly constructed WindowsDiscoveryPlugin.

    The instance is created with no arguments and ``authenticate()`` is not
    called here; tests that need a session authenticate it themselves.
    """
    return WindowsDiscoveryPlugin()
@pytest.fixture
def credentials() -> dict:
    """Return the standard credential mapping used by most tests.

    Every value is a string, including ``port`` and ``use_ssl``. With these
    values the success test expects the endpoint URL
    ``https://192.168.1.100:5986/wsman``.
    """
    return {
        "host": "192.168.1.100",
        "username": "admin",
        "password": "secret",
        "transport": "ntlm",
        "port": "5986",
        "use_ssl": "true",
    }
@pytest.fixture
def mock_session() -> MagicMock:
    """Return a bare ``MagicMock`` standing in for a WinRM session."""
    return MagicMock()
def make_ps_result(
    stdout: str = "", stderr: str = "", status_code: int = 0
) -> MagicMock:
    """Build a mock PowerShell result object.

    Args:
        stdout: Text exposed, UTF-8 encoded, as ``std_out``.
        stderr: Text exposed, UTF-8 encoded, as ``std_err``.
        status_code: Value exposed as ``status_code``.

    Returns:
        A ``MagicMock`` whose ``std_out`` and ``std_err`` are ``bytes`` and
        whose ``status_code`` is the given integer.
    """
    # Keyword arguments to MagicMock are set as attributes on the new mock.
    return MagicMock(
        std_out=stdout.encode("utf-8"),
        std_err=stderr.encode("utf-8"),
        status_code=status_code,
    )
# ---------------------------------------------------------------------------
# Authentication Tests
# ---------------------------------------------------------------------------
class TestAuthenticate:
    """Tests for WindowsDiscoveryPlugin.authenticate().

    Tests that reach session creation patch ``winrm.Session`` as it is
    referenced from ``iac_reverse.scanner.windows_plugin``.
    """

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_authenticate_success(self, mock_session_cls, plugin, credentials):
        """Successful authentication creates a session."""
        mock_session = MagicMock()
        # Result handed back for the PowerShell call made during authentication.
        mock_session.run_ps.return_value = make_ps_result("WIN-SERVER01")
        mock_session_cls.return_value = mock_session

        plugin.authenticate(credentials)

        # use_ssl="true" -> https URL and certificate validation ignored.
        mock_session_cls.assert_called_once_with(
            "https://192.168.1.100:5986/wsman",
            auth=("admin", "secret"),
            transport="ntlm",
            server_cert_validation="ignore",
        )
        assert plugin._host == "192.168.1.100"

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_authenticate_http_no_ssl(self, mock_session_cls, plugin):
        """Authentication with use_ssl=false uses HTTP."""
        creds = {
            "host": "myhost",
            "username": "user",
            "password": "pass",
            "transport": "ntlm",
            "port": "5985",
            "use_ssl": "false",
        }
        mock_session = MagicMock()
        mock_session.run_ps.return_value = make_ps_result("MYHOST")
        mock_session_cls.return_value = mock_session

        plugin.authenticate(creds)

        mock_session_cls.assert_called_once_with(
            "http://myhost:5985/wsman",
            auth=("user", "pass"),
            transport="ntlm",
            server_cert_validation="validate",
        )

    def test_authenticate_missing_host(self, plugin):
        """Missing host raises AuthenticationError."""
        creds = {"username": "user", "password": "pass"}
        with pytest.raises(AuthenticationError, match="host is required"):
            plugin.authenticate(creds)

    def test_authenticate_missing_username(self, plugin):
        """Missing username raises AuthenticationError."""
        creds = {"host": "myhost", "password": "pass"}
        with pytest.raises(AuthenticationError, match="username is required"):
            plugin.authenticate(creds)

    def test_authenticate_missing_password(self, plugin):
        """Missing password raises AuthenticationError."""
        creds = {"host": "myhost", "username": "user"}
        with pytest.raises(AuthenticationError, match="password is required"):
            plugin.authenticate(creds)

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_authenticate_connection_refused(
        self, mock_session_cls, plugin, credentials
    ):
        """Connection refused raises WinRMNotEnabledError."""
        # Constructing the session itself fails.
        mock_session_cls.side_effect = Exception("connection refused")

        with pytest.raises(WinRMNotEnabledError):
            plugin.authenticate(credentials)

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_authenticate_access_denied(
        self, mock_session_cls, plugin, credentials
    ):
        """Access denied during auth test raises AuthenticationError."""
        mock_session = MagicMock()
        # The session is created, but the PowerShell call reports a failure.
        mock_session.run_ps.return_value = make_ps_result(
            stderr="Access is denied", status_code=1
        )
        mock_session_cls.return_value = mock_session

        with pytest.raises(AuthenticationError):
            plugin.authenticate(credentials)
# ---------------------------------------------------------------------------
# Platform Category and Resource Types Tests
# ---------------------------------------------------------------------------
class TestPlatformInfo:
    """Tests for platform category and resource type listing."""

    def test_get_platform_category(self, plugin):
        """Returns PlatformCategory.WINDOWS."""
        assert plugin.get_platform_category() == PlatformCategory.WINDOWS

    def test_list_supported_resource_types(self, plugin):
        """Returns all 13 Windows resource types."""
        expected = (
            "windows_service",
            "windows_scheduled_task",
            "windows_iis_site",
            "windows_iis_app_pool",
            "windows_network_adapter",
            "windows_firewall_rule",
            "windows_installed_software",
            "windows_feature",
            "windows_hyperv_vm",
            "windows_hyperv_switch",
            "windows_dns_record",
            "windows_local_user",
            "windows_local_group",
        )

        types = plugin.list_supported_resource_types()

        assert len(types) == len(expected)
        # Collect every absent name so one failure reports all of them.
        missing = [name for name in expected if name not in types]
        assert not missing, missing

    def test_list_endpoints_before_auth(self, plugin):
        """Returns empty list before authentication."""
        assert plugin.list_endpoints() == []

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_list_endpoints_after_auth(
        self, mock_session_cls, plugin, credentials
    ):
        """Returns host after authentication."""
        mock_session = MagicMock()
        mock_session.run_ps.return_value = make_ps_result("SERVER")
        mock_session_cls.return_value = mock_session

        plugin.authenticate(credentials)

        assert plugin.list_endpoints() == ["192.168.1.100"]
# ---------------------------------------------------------------------------
# Architecture Detection Tests
# ---------------------------------------------------------------------------
class TestDetectArchitecture:
    """Tests for CPU architecture detection via WMI."""

    @staticmethod
    def _authenticate_with(plugin, credentials, mock_session_cls, *ps_results):
        """Authenticate ``plugin`` against a mock session with queued results.

        ``ps_results`` are returned, in order, by successive ``run_ps`` calls
        on the mock session; the first one is consumed by the call made during
        ``authenticate()``.
        """
        session = MagicMock()
        session.run_ps.side_effect = list(ps_results)
        mock_session_cls.return_value = session
        plugin.authenticate(credentials)
        return session

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_detect_amd64(self, mock_session_cls, plugin, credentials):
        """Architecture code 9 maps to AMD64."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth test
            make_ps_result("9"),  # architecture query
        )

        arch = plugin.detect_architecture("192.168.1.100")

        assert arch == CpuArchitecture.AMD64

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_detect_arm(self, mock_session_cls, plugin, credentials):
        """Architecture code 5 maps to ARM."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth test
            make_ps_result("5"),  # architecture query
        )

        arch = plugin.detect_architecture("192.168.1.100")

        assert arch == CpuArchitecture.ARM

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_detect_aarch64(self, mock_session_cls, plugin, credentials):
        """Architecture code 12 maps to AARCH64."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth test
            make_ps_result("12"),  # architecture query
        )

        arch = plugin.detect_architecture("192.168.1.100")

        assert arch == CpuArchitecture.AARCH64

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_detect_architecture_wmi_failure(
        self, mock_session_cls, plugin, credentials
    ):
        """WMI query failure raises WMIQueryError."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth test
            make_ps_result(stderr="WMI error", status_code=1),  # failing query
        )

        # Only the detection call is inside the raises block, so an error
        # raised during authentication would still fail the test.
        with pytest.raises(WMIQueryError):
            plugin.detect_architecture("192.168.1.100")
# ---------------------------------------------------------------------------
# Resource Discovery Tests
# ---------------------------------------------------------------------------
class TestDiscoverResources:
    """Tests for resource discovery via WinRM.

    Each test queues the PowerShell results in the order the tests expect
    them to be consumed: authentication, architecture query, Hyper-V role
    check, then (where applicable) the resource query itself.
    """

    @staticmethod
    def _authenticate_with(plugin, credentials, mock_session_cls, *ps_results):
        """Authenticate ``plugin`` against a mock session with queued results.

        ``ps_results`` are returned, in order, by successive ``run_ps`` calls
        on the mock session; the first one is consumed by the call made during
        ``authenticate()``.
        """
        session = MagicMock()
        session.run_ps.side_effect = list(ps_results)
        mock_session_cls.return_value = session
        plugin.authenticate(credentials)
        return session

    @staticmethod
    def _discover(plugin, resource_type, callback=None):
        """Run discovery of one resource type against the test host.

        A fresh ``MagicMock`` is used as the progress callback unless the
        caller supplies one to inspect afterwards.
        """
        if callback is None:
            callback = MagicMock()
        return plugin.discover_resources(
            endpoints=["192.168.1.100"],
            resource_types=[resource_type],
            progress_callback=callback,
        )

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_services(self, mock_session_cls, plugin, credentials):
        """Discovers Windows services."""
        services_json = json.dumps([
            {
                "Name": "wuauserv",
                "DisplayName": "Windows Update",
                "Status": 4,
                "StartType": 3,
            },
            {
                "Name": "Spooler",
                "DisplayName": "Print Spooler",
                "Status": 4,
                "StartType": 2,
            },
        ])
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv check
            make_ps_result(services_json),  # services
        )
        callback = MagicMock()

        result = self._discover(plugin, "windows_service", callback)

        assert len(result.resources) == 2
        first = result.resources[0]
        assert first.resource_type == "windows_service"
        assert first.name == "wuauserv"
        assert first.provider == ProviderType.WINDOWS
        assert first.platform_category == PlatformCategory.WINDOWS
        assert first.architecture == CpuArchitecture.AMD64
        callback.assert_called()

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_scheduled_tasks(
        self, mock_session_cls, plugin, credentials
    ):
        """Discovers scheduled tasks."""
        tasks_json = json.dumps([
            {"TaskName": "Backup", "TaskPath": "\\Custom\\", "State": 3},
        ])
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv check
            make_ps_result(tasks_json),  # scheduled tasks
        )

        result = self._discover(plugin, "windows_scheduled_task")

        assert len(result.resources) == 1
        task = result.resources[0]
        assert task.resource_type == "windows_scheduled_task"
        assert task.name == "Backup"
        assert task.attributes["task_path"] == "\\Custom\\"

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_hyperv_skipped_when_not_installed(
        self, mock_session_cls, plugin, credentials
    ):
        """Hyper-V resources are skipped when role is not installed."""
        # No fourth result is queued: a VM query after the negative role
        # check would exhaust the side_effect list and fail the test.
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv NOT installed
        )

        result = self._discover(plugin, "windows_hyperv_vm")

        assert len(result.resources) == 0
        assert any("Hyper-V role not installed" in w for w in result.warnings)

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_hyperv_vms_when_installed(
        self, mock_session_cls, plugin, credentials
    ):
        """Hyper-V VMs are discovered when role is installed."""
        vms_json = json.dumps([
            {
                "Name": "TestVM",
                "VMId": "abc-123",
                "State": 2,
                "MemoryAssigned": 4294967296,
                "ProcessorCount": 4,
                "Generation": 2,
            }
        ])
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("true"),  # hyperv IS installed
            make_ps_result(vms_json),  # VMs
        )

        result = self._discover(plugin, "windows_hyperv_vm")

        assert len(result.resources) == 1
        vm = result.resources[0]
        assert vm.resource_type == "windows_hyperv_vm"
        assert vm.name == "TestVM"
        assert vm.attributes["vm_id"] == "abc-123"

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_local_users(self, mock_session_cls, plugin, credentials):
        """Discovers local user accounts."""
        users_json = json.dumps([
            {
                "Name": "Administrator",
                "Enabled": True,
                "Description": "Built-in admin",
                "LastLogon": "2024-01-01",
            },
        ])
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv check
            make_ps_result(users_json),  # local users
        )

        result = self._discover(plugin, "windows_local_user")

        assert len(result.resources) == 1
        user = result.resources[0]
        assert user.resource_type == "windows_local_user"
        assert user.name == "Administrator"

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_insufficient_privileges(
        self, mock_session_cls, plugin, credentials
    ):
        """Insufficient privileges are captured as errors."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv check
            make_ps_result(stderr="Access is denied", status_code=1),
        )

        result = self._discover(plugin, "windows_service")

        # The failure is reported on the result rather than raised.
        assert len(result.resources) == 0
        assert len(result.errors) == 1
        assert "Insufficient privileges" in result.errors[0]

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_wmi_query_failure(
        self, mock_session_cls, plugin, credentials
    ):
        """WMI query failures are captured as errors."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv check
            make_ps_result(stderr="Invalid class", status_code=1),
        )

        result = self._discover(plugin, "windows_feature")

        # The failure is reported on the result rather than raised.
        assert len(result.resources) == 0
        assert len(result.errors) == 1
        assert "WMI query failed" in result.errors[0]

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_discover_empty_result(self, mock_session_cls, plugin, credentials):
        """Empty PowerShell output returns no resources."""
        self._authenticate_with(
            plugin,
            credentials,
            mock_session_cls,
            make_ps_result("SERVER"),  # auth
            make_ps_result("9"),  # architecture
            make_ps_result("false"),  # hyperv check
            make_ps_result(""),  # empty output
        )

        result = self._discover(plugin, "windows_local_group")

        assert len(result.resources) == 0
        assert len(result.errors) == 0
# ---------------------------------------------------------------------------
# Error Handling Tests
# ---------------------------------------------------------------------------
class TestErrorHandling:
    """Tests for WinRM-specific error handling."""

    def test_winrm_not_enabled_error(self):
        """WinRMNotEnabledError contains host info."""
        err = WinRMNotEnabledError("myhost", "connection refused")
        message = str(err)

        assert "myhost" in message
        assert "connection refused" in message
        assert err.host == "myhost"

    def test_wmi_query_error(self):
        """WMIQueryError contains query info."""
        err = WMIQueryError("Win32_Processor", "invalid class")
        message = str(err)

        assert "Win32_Processor" in message
        assert "invalid class" in message
        assert err.query == "Win32_Processor"

    def test_insufficient_privileges_error(self):
        """InsufficientPrivilegesError contains operation info."""
        err = InsufficientPrivilegesError("Get-Service", "access denied")
        message = str(err)

        assert "Get-Service" in message
        assert "access denied" in message
        assert err.operation == "Get-Service"

    @patch("iac_reverse.scanner.windows_plugin.winrm.Session")
    def test_no_session_raises_winrm_not_enabled(
        self, mock_session_cls, plugin
    ):
        """Running PowerShell without session raises WinRMNotEnabledError."""
        # NOTE(review): mock_session_cls is never configured or asserted on;
        # the patch presumably only prevents a real session from being
        # created. Confirm before removing it.
        # Only the host is set; authenticate() is deliberately not called.
        plugin._host = "testhost"

        with pytest.raises(WinRMNotEnabledError):
            plugin._run_powershell("Get-Service")