"""Unit tests for the Windows Discovery Plugin.

The WinRM session class is replaced with a mock in every test that would
otherwise open a connection, so no real Windows host is required.
"""

import json
from unittest.mock import MagicMock, patch

import pytest

from iac_reverse.models import CpuArchitecture, PlatformCategory, ProviderType
from iac_reverse.scanner.scanner import AuthenticationError
from iac_reverse.scanner.windows_plugin import (
    InsufficientPrivilegesError,
    WindowsDiscoveryPlugin,
    WinRMNotEnabledError,
    WMIQueryError,
    WINDOWS_RESOURCE_TYPES,
)

# Dotted path handed to ``patch`` to swap out the WinRM session class.
_SESSION_TARGET = "iac_reverse.scanner.windows_plugin.winrm.Session"

# Host used by the ``credentials`` fixture and by every endpoint argument.
_HOST = "192.168.1.100"


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def plugin():
    """Provide a brand-new, unauthenticated WindowsDiscoveryPlugin."""
    return WindowsDiscoveryPlugin()


@pytest.fixture
def credentials():
    """Provide the default credential mapping (NTLM over HTTPS, port 5986)."""
    return {
        "host": _HOST,
        "username": "admin",
        "password": "secret",
        "transport": "ntlm",
        "port": "5986",
        "use_ssl": "true",
    }


@pytest.fixture
def mock_session():
    """Provide a bare MagicMock standing in for a WinRM session."""
    return MagicMock()


def make_ps_result(stdout: str = "", stderr: str = "", status_code: int = 0):
    """Build a mock PowerShell result.

    The returned mock exposes ``std_out`` and ``std_err`` as UTF-8 encoded
    bytes and ``status_code`` as given.
    """
    return MagicMock(
        std_out=stdout.encode("utf-8"),
        std_err=stderr.encode("utf-8"),
        status_code=status_code,
    )


def _session_returning(session_cls, ps_result):
    """Make the patched session class yield a session with a fixed reply.

    Every ``run_ps`` call on the produced session returns ``ps_result``.
    """
    session = MagicMock()
    session.run_ps.return_value = ps_result
    session_cls.return_value = session
    return session


def _authenticate_replaying(session_cls, plugin, credentials, *ps_results):
    """Wire a session that replays ``ps_results`` in order, then authenticate.

    Successive ``run_ps`` calls consume the results one by one, starting
    with whatever ``authenticate`` itself issues.
    """
    session = MagicMock()
    session.run_ps.side_effect = list(ps_results)
    session_cls.return_value = session
    plugin.authenticate(credentials)
    return session


def _discovery_preamble(hyperv_installed: str = "false"):
    """Return the three replies queued ahead of each resource query.

    In order: the authentication probe, the architecture query (code 9)
    and the Hyper-V role check.
    """
    return [
        make_ps_result("SERVER"),
        make_ps_result("9"),
        make_ps_result(hyperv_installed),
    ]


def _discover(plugin, resource_type):
    """Run discovery for a single resource type on the test host.

    Returns the discovery result together with the progress-callback mock
    that was passed in.
    """
    progress = MagicMock()
    outcome = plugin.discover_resources(
        endpoints=[_HOST],
        resource_types=[resource_type],
        progress_callback=progress,
    )
    return outcome, progress


# ---------------------------------------------------------------------------
# Authentication Tests
# ---------------------------------------------------------------------------


class TestAuthenticate:
    """Tests for WindowsDiscoveryPlugin.authenticate()."""

    @patch(_SESSION_TARGET)
    def test_authenticate_success(self, mock_session_cls, plugin, credentials):
        """A successful login builds an HTTPS session and records the host."""
        _session_returning(mock_session_cls, make_ps_result("WIN-SERVER01"))

        plugin.authenticate(credentials)

        mock_session_cls.assert_called_once_with(
            "https://192.168.1.100:5986/wsman",
            auth=("admin", "secret"),
            transport="ntlm",
            server_cert_validation="ignore",
        )
        assert plugin._host == "192.168.1.100"

    @patch(_SESSION_TARGET)
    def test_authenticate_http_no_ssl(self, mock_session_cls, plugin):
        """With use_ssl=false the session URL uses plain HTTP."""
        creds = {
            "host": "myhost",
            "username": "user",
            "password": "pass",
            "transport": "ntlm",
            "port": "5985",
            "use_ssl": "false",
        }
        _session_returning(mock_session_cls, make_ps_result("MYHOST"))

        plugin.authenticate(creds)

        mock_session_cls.assert_called_once_with(
            "http://myhost:5985/wsman",
            auth=("user", "pass"),
            transport="ntlm",
            server_cert_validation="validate",
        )

    def test_authenticate_missing_host(self, plugin):
        """Credentials without a host are rejected."""
        with pytest.raises(AuthenticationError, match="host is required"):
            plugin.authenticate({"username": "user", "password": "pass"})

    def test_authenticate_missing_username(self, plugin):
        """Credentials without a username are rejected."""
        with pytest.raises(AuthenticationError, match="username is required"):
            plugin.authenticate({"host": "myhost", "password": "pass"})

    def test_authenticate_missing_password(self, plugin):
        """Credentials without a password are rejected."""
        with pytest.raises(AuthenticationError, match="password is required"):
            plugin.authenticate({"host": "myhost", "username": "user"})

    @patch(_SESSION_TARGET)
    def test_authenticate_connection_refused(
        self, mock_session_cls, plugin, credentials
    ):
        """A refused connection surfaces as WinRMNotEnabledError."""
        mock_session_cls.side_effect = Exception("connection refused")

        with pytest.raises(WinRMNotEnabledError):
            plugin.authenticate(credentials)

    @patch(_SESSION_TARGET)
    def test_authenticate_access_denied(
        self, mock_session_cls, plugin, credentials
    ):
        """An access-denied reply to the auth probe raises AuthenticationError."""
        denied = make_ps_result(stderr="Access is denied", status_code=1)
        _session_returning(mock_session_cls, denied)

        with pytest.raises(AuthenticationError):
            plugin.authenticate(credentials)


# ---------------------------------------------------------------------------
# Platform Category and Resource Types Tests
# ---------------------------------------------------------------------------


class TestPlatformInfo:
    """Tests for platform category and resource type listing."""

    def test_get_platform_category(self, plugin):
        """The plugin reports PlatformCategory.WINDOWS."""
        assert plugin.get_platform_category() == PlatformCategory.WINDOWS

    def test_list_supported_resource_types(self, plugin):
        """All 13 Windows resource types are listed."""
        expected = (
            "windows_service",
            "windows_scheduled_task",
            "windows_iis_site",
            "windows_iis_app_pool",
            "windows_network_adapter",
            "windows_firewall_rule",
            "windows_installed_software",
            "windows_feature",
            "windows_hyperv_vm",
            "windows_hyperv_switch",
            "windows_dns_record",
            "windows_local_user",
            "windows_local_group",
        )

        types = plugin.list_supported_resource_types()

        assert len(types) == 13
        for resource_type in expected:
            assert resource_type in types

    def test_list_endpoints_before_auth(self, plugin):
        """No endpoints are reported until authentication has happened."""
        assert plugin.list_endpoints() == []

    @patch(_SESSION_TARGET)
    def test_list_endpoints_after_auth(
        self, mock_session_cls, plugin, credentials
    ):
        """The authenticated host is reported as the only endpoint."""
        _session_returning(mock_session_cls, make_ps_result("SERVER"))

        plugin.authenticate(credentials)

        assert plugin.list_endpoints() == ["192.168.1.100"]


# ---------------------------------------------------------------------------
# Architecture Detection Tests
# ---------------------------------------------------------------------------


class TestDetectArchitecture:
    """Tests for CPU architecture detection via WMI."""

    @patch(_SESSION_TARGET)
    def test_detect_amd64(self, mock_session_cls, plugin, credentials):
        """Architecture code 9 maps to AMD64."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            make_ps_result("SERVER"),  # auth test
            make_ps_result("9"),  # architecture query
        )

        assert plugin.detect_architecture(_HOST) == CpuArchitecture.AMD64

    @patch(_SESSION_TARGET)
    def test_detect_arm(self, mock_session_cls, plugin, credentials):
        """Architecture code 5 maps to ARM."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            make_ps_result("SERVER"),
            make_ps_result("5"),
        )

        assert plugin.detect_architecture(_HOST) == CpuArchitecture.ARM

    @patch(_SESSION_TARGET)
    def test_detect_aarch64(self, mock_session_cls, plugin, credentials):
        """Architecture code 12 maps to AARCH64."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            make_ps_result("SERVER"),
            make_ps_result("12"),
        )

        assert plugin.detect_architecture(_HOST) == CpuArchitecture.AARCH64

    @patch(_SESSION_TARGET)
    def test_detect_architecture_wmi_failure(
        self, mock_session_cls, plugin, credentials
    ):
        """A failing architecture query raises WMIQueryError."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            make_ps_result("SERVER"),
            make_ps_result(stderr="WMI error", status_code=1),
        )

        with pytest.raises(WMIQueryError):
            plugin.detect_architecture(_HOST)


# ---------------------------------------------------------------------------
# Resource Discovery Tests
# ---------------------------------------------------------------------------


class TestDiscoverResources:
    """Tests for resource discovery via WinRM."""

    @patch(_SESSION_TARGET)
    def test_discover_services(self, mock_session_cls, plugin, credentials):
        """Two services in the JSON payload become two resources."""
        services_json = json.dumps([
            {
                "Name": "wuauserv",
                "DisplayName": "Windows Update",
                "Status": 4,
                "StartType": 3,
            },
            {
                "Name": "Spooler",
                "DisplayName": "Print Spooler",
                "Status": 4,
                "StartType": 2,
            },
        ])
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble(),
            make_ps_result(services_json),  # services
        )

        result, callback = _discover(plugin, "windows_service")

        assert len(result.resources) == 2
        first = result.resources[0]
        assert first.resource_type == "windows_service"
        assert first.name == "wuauserv"
        assert first.provider == ProviderType.WINDOWS
        assert first.platform_category == PlatformCategory.WINDOWS
        assert first.architecture == CpuArchitecture.AMD64
        callback.assert_called()

    @patch(_SESSION_TARGET)
    def test_discover_scheduled_tasks(
        self, mock_session_cls, plugin, credentials
    ):
        """A scheduled task is discovered with its task path attribute."""
        tasks_json = json.dumps([
            {"TaskName": "Backup", "TaskPath": "\\Custom\\", "State": 3},
        ])
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble(),
            make_ps_result(tasks_json),
        )

        result, _ = _discover(plugin, "windows_scheduled_task")

        assert len(result.resources) == 1
        task = result.resources[0]
        assert task.resource_type == "windows_scheduled_task"
        assert task.name == "Backup"
        assert task.attributes["task_path"] == "\\Custom\\"

    @patch(_SESSION_TARGET)
    def test_discover_hyperv_skipped_when_not_installed(
        self, mock_session_cls, plugin, credentials
    ):
        """Hyper-V discovery yields a warning when the role is absent."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble("false"),  # hyperv NOT installed
        )

        result, _ = _discover(plugin, "windows_hyperv_vm")

        assert len(result.resources) == 0
        assert any("Hyper-V role not installed" in w for w in result.warnings)

    @patch(_SESSION_TARGET)
    def test_discover_hyperv_vms_when_installed(
        self, mock_session_cls, plugin, credentials
    ):
        """Hyper-V VMs are discovered when the role check answers true."""
        vms_json = json.dumps([
            {
                "Name": "TestVM",
                "VMId": "abc-123",
                "State": 2,
                "MemoryAssigned": 4294967296,
                "ProcessorCount": 4,
                "Generation": 2,
            }
        ])
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble("true"),  # hyperv IS installed
            make_ps_result(vms_json),
        )

        result, _ = _discover(plugin, "windows_hyperv_vm")

        assert len(result.resources) == 1
        vm = result.resources[0]
        assert vm.resource_type == "windows_hyperv_vm"
        assert vm.name == "TestVM"
        assert vm.attributes["vm_id"] == "abc-123"

    @patch(_SESSION_TARGET)
    def test_discover_local_users(self, mock_session_cls, plugin, credentials):
        """A local user account in the payload becomes one resource."""
        users_json = json.dumps([
            {
                "Name": "Administrator",
                "Enabled": True,
                "Description": "Built-in admin",
                "LastLogon": "2024-01-01",
            },
        ])
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble(),
            make_ps_result(users_json),
        )

        result, _ = _discover(plugin, "windows_local_user")

        assert len(result.resources) == 1
        user = result.resources[0]
        assert user.resource_type == "windows_local_user"
        assert user.name == "Administrator"

    @patch(_SESSION_TARGET)
    def test_discover_insufficient_privileges(
        self, mock_session_cls, plugin, credentials
    ):
        """An access-denied resource query is recorded as an error."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble(),
            make_ps_result(stderr="Access is denied", status_code=1),
        )

        result, _ = _discover(plugin, "windows_service")

        assert len(result.resources) == 0
        assert len(result.errors) == 1
        assert "Insufficient privileges" in result.errors[0]

    @patch(_SESSION_TARGET)
    def test_discover_wmi_query_failure(
        self, mock_session_cls, plugin, credentials
    ):
        """A failing resource query is recorded as a WMI error."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble(),
            make_ps_result(stderr="Invalid class", status_code=1),
        )

        result, _ = _discover(plugin, "windows_feature")

        assert len(result.resources) == 0
        assert len(result.errors) == 1
        assert "WMI query failed" in result.errors[0]

    @patch(_SESSION_TARGET)
    def test_discover_empty_result(self, mock_session_cls, plugin, credentials):
        """Empty PowerShell output gives no resources and no errors."""
        _authenticate_replaying(
            mock_session_cls,
            plugin,
            credentials,
            *_discovery_preamble(),
            make_ps_result(""),  # empty output
        )

        result, _ = _discover(plugin, "windows_local_group")

        assert len(result.resources) == 0
        assert len(result.errors) == 0


# ---------------------------------------------------------------------------
# Error Handling Tests
# ---------------------------------------------------------------------------


class TestErrorHandling:
    """Tests for WinRM-specific error handling."""

    def test_winrm_not_enabled_error(self):
        """WinRMNotEnabledError carries the host and the reason."""
        err = WinRMNotEnabledError("myhost", "connection refused")
        message = str(err)

        assert "myhost" in message
        assert "connection refused" in message
        assert err.host == "myhost"

    def test_wmi_query_error(self):
        """WMIQueryError carries the query and the reason."""
        err = WMIQueryError("Win32_Processor", "invalid class")
        message = str(err)

        assert "Win32_Processor" in message
        assert "invalid class" in message
        assert err.query == "Win32_Processor"

    def test_insufficient_privileges_error(self):
        """InsufficientPrivilegesError carries the operation and the reason."""
        err = InsufficientPrivilegesError("Get-Service", "access denied")
        message = str(err)

        assert "Get-Service" in message
        assert "access denied" in message
        assert err.operation == "Get-Service"

    @patch(_SESSION_TARGET)
    def test_no_session_raises_winrm_not_enabled(
        self, mock_session_cls, plugin
    ):
        """Running PowerShell with no session raises WinRMNotEnabledError."""
        # Only the host is set; authenticate() is deliberately never called.
        plugin._host = "testhost"

        with pytest.raises(WinRMNotEnabledError):
            plugin._run_powershell("Get-Service")