"""Unit tests for the DockerSwarmPlugin."""

from unittest.mock import MagicMock, patch

import pytest

from iac_reverse.models import (
    CpuArchitecture,
    DiscoveredResource,
    PlatformCategory,
    ProviderType,
    ScanProgress,
)
from iac_reverse.scanner import AuthenticationError
from iac_reverse.scanner.docker_swarm_plugin import DockerSwarmPlugin

# Dotted paths handed to ``patch``. They are looked up inside the plugin
# module so the tests replace the names the plugin itself resolves.
_DOCKER_CLIENT_TARGET = "iac_reverse.scanner.docker_swarm_plugin.docker.DockerClient"
_TLS_CONFIG_TARGET = "iac_reverse.scanner.docker_swarm_plugin.TLSConfig"


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def plugin():
    """Create a fresh DockerSwarmPlugin instance."""
    return DockerSwarmPlugin()


@pytest.fixture
def mock_docker_client():
    """Create a mock Docker client with common attributes.

    The mock answers ``ping`` with ``True``, reports an ``x86_64``
    architecture, and returns an empty list from every ``*.list()`` call.
    Individual tests override these defaults as needed.
    """
    client = MagicMock()
    client.ping.return_value = True
    client.info.return_value = {"Architecture": "x86_64"}
    client.services.list.return_value = []
    client.networks.list.return_value = []
    client.volumes.list.return_value = []
    client.configs.list.return_value = []
    client.secrets.list.return_value = []
    return client


@pytest.fixture
def authenticated_plugin(plugin, mock_docker_client):
    """Return a plugin that has been authenticated with a mock client."""
    with patch(_DOCKER_CLIENT_TARGET) as mock_cls:
        mock_cls.return_value = mock_docker_client
        plugin.authenticate({"host": "tcp://localhost:2376"})
    return plugin


# ---------------------------------------------------------------------------
# Authentication tests
# ---------------------------------------------------------------------------


class TestAuthenticate:
    def test_authenticate_success(self, plugin):
        """Successful authentication connects to Docker daemon."""
        with patch(_DOCKER_CLIENT_TARGET) as mock_cls:
            mock_client = MagicMock()
            mock_client.ping.return_value = True
            mock_cls.return_value = mock_client

            plugin.authenticate({"host": "tcp://192.168.1.10:2376"})

            mock_cls.assert_called_once_with(
                base_url="tcp://192.168.1.10:2376",
                tls=False,
            )
            mock_client.ping.assert_called_once()

    def test_authenticate_missing_host(self, plugin):
        """Authentication fails when host is not provided."""
        with pytest.raises(AuthenticationError) as exc_info:
            plugin.authenticate({})
        # Checked after the ``raises`` block: statements placed after the
        # raising call inside that block would never execute.
        assert "host" in str(exc_info.value).lower()

    def test_authenticate_empty_host(self, plugin):
        """Authentication fails when host is empty string."""
        with pytest.raises(AuthenticationError):
            plugin.authenticate({"host": ""})

    def test_authenticate_connection_failure(self, plugin):
        """Authentication fails when Docker daemon is unreachable."""
        with patch(_DOCKER_CLIENT_TARGET) as mock_cls:
            mock_client = MagicMock()
            mock_client.ping.side_effect = ConnectionError("Connection refused")
            mock_cls.return_value = mock_client

            with pytest.raises(AuthenticationError) as exc_info:
                plugin.authenticate({"host": "tcp://unreachable:2376"})

        assert "Connection refused" in str(exc_info.value)

    def test_authenticate_with_tls(self, plugin):
        """Authentication configures TLS when tls_verify and cert_path are set."""
        with patch(_DOCKER_CLIENT_TARGET) as mock_cls, patch(_TLS_CONFIG_TARGET) as mock_tls:
            mock_client = MagicMock()
            mock_client.ping.return_value = True
            mock_cls.return_value = mock_client
            mock_tls_instance = MagicMock()
            mock_tls.return_value = mock_tls_instance

            plugin.authenticate({
                "host": "tcp://secure-host:2376",
                "tls_verify": "true",
                "cert_path": "/certs",
            })

            mock_tls.assert_called_once_with(
                verify=True,
                client_cert=("/certs/cert.pem", "/certs/key.pem"),
                ca_cert="/certs/ca.pem",
            )
            # The client must receive the TLSConfig object that was built.
            mock_cls.assert_called_once_with(
                base_url="tcp://secure-host:2376",
                tls=mock_tls_instance,
            )


# ---------------------------------------------------------------------------
# Platform category and resource types
# ---------------------------------------------------------------------------


class TestPlatformInfo:
    def test_get_platform_category(self, plugin):
        """Returns CONTAINER_ORCHESTRATION category."""
        assert plugin.get_platform_category() == PlatformCategory.CONTAINER_ORCHESTRATION

    def test_list_supported_resource_types(self, plugin):
        """Returns all five Docker Swarm resource types."""
        types = plugin.list_supported_resource_types()
        assert types == [
            "docker_service",
            "docker_network",
            "docker_volume",
            "docker_config",
            "docker_secret",
        ]

    def test_list_endpoints_before_auth(self, plugin):
        """Returns empty list before authentication."""
        assert plugin.list_endpoints() == []

    def test_list_endpoints_after_auth(self, authenticated_plugin):
        """Returns the host URL after authentication."""
        assert authenticated_plugin.list_endpoints() == ["tcp://localhost:2376"]


# ---------------------------------------------------------------------------
# Architecture detection
# ---------------------------------------------------------------------------


class TestDetectArchitecture:
    def test_detect_amd64(self, authenticated_plugin, mock_docker_client):
        """Detects AMD64 architecture from x86_64 info."""
        mock_docker_client.info.return_value = {"Architecture": "x86_64"}
        arch = authenticated_plugin.detect_architecture("tcp://localhost:2376")
        assert arch == CpuArchitecture.AMD64

    def test_detect_aarch64(self, authenticated_plugin, mock_docker_client):
        """Detects AARCH64 architecture from aarch64 info."""
        mock_docker_client.info.return_value = {"Architecture": "aarch64"}
        arch = authenticated_plugin.detect_architecture("tcp://localhost:2376")
        assert arch == CpuArchitecture.AARCH64

    def test_detect_arm(self, authenticated_plugin, mock_docker_client):
        """Detects ARM architecture from armv7l info."""
        mock_docker_client.info.return_value = {"Architecture": "armv7l"}
        arch = authenticated_plugin.detect_architecture("tcp://localhost:2376")
        assert arch == CpuArchitecture.ARM

    def test_detect_unknown_defaults_to_amd64(self, authenticated_plugin, mock_docker_client):
        """Unknown architecture string defaults to AMD64."""
        mock_docker_client.info.return_value = {"Architecture": "sparc"}
        arch = authenticated_plugin.detect_architecture("tcp://localhost:2376")
        assert arch == CpuArchitecture.AMD64

    def test_detect_without_client(self, plugin):
        """Returns AMD64 when no client is connected."""
        arch = plugin.detect_architecture("tcp://localhost:2376")
        assert arch == CpuArchitecture.AMD64


# ---------------------------------------------------------------------------
# Resource discovery
# ---------------------------------------------------------------------------


class TestDiscoverResources:
    def _noop_callback(self, progress: ScanProgress) -> None:
        """Progress callback that ignores every update."""
        pass

    def test_discover_services(self, authenticated_plugin, mock_docker_client):
        """Discovers Docker services with correct attributes."""
        mock_service = MagicMock()
        mock_service.attrs = {
            "ID": "svc123",
            "Spec": {
                "Name": "web-app",
                "Mode": {"Replicated": {"Replicas": 3}},
                "Labels": {"env": "prod"},
                "TaskTemplate": {
                    "ContainerSpec": {
                        "Image": "nginx:latest",
                    },
                    "Networks": [{"Target": "net456"}],
                },
            },
        }
        mock_docker_client.services.list.return_value = [mock_service]

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_service"],
            progress_callback=self._noop_callback,
        )

        assert len(result.resources) == 1
        svc = result.resources[0]
        assert svc.resource_type == "docker_service"
        assert svc.unique_id == "svc123"
        assert svc.name == "web-app"
        assert svc.provider == ProviderType.DOCKER_SWARM
        assert svc.platform_category == PlatformCategory.CONTAINER_ORCHESTRATION
        assert svc.attributes["image"] == "nginx:latest"
        assert svc.attributes["replicas"] == 3
        assert "network:net456" in svc.raw_references

    def test_discover_networks(self, authenticated_plugin, mock_docker_client):
        """Discovers Docker networks with correct attributes."""
        mock_network = MagicMock()
        mock_network.attrs = {
            "Id": "net789",
            "Name": "overlay-net",
            "Driver": "overlay",
            "Scope": "swarm",
            "Attachable": True,
            "Ingress": False,
            "Labels": {},
        }
        mock_docker_client.networks.list.return_value = [mock_network]

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_network"],
            progress_callback=self._noop_callback,
        )

        assert len(result.resources) == 1
        net = result.resources[0]
        assert net.resource_type == "docker_network"
        assert net.unique_id == "net789"
        assert net.name == "overlay-net"
        assert net.attributes["driver"] == "overlay"
        assert net.attributes["scope"] == "swarm"
        assert net.attributes["attachable"] is True

    def test_discover_volumes(self, authenticated_plugin, mock_docker_client):
        """Discovers Docker volumes with correct attributes."""
        mock_volume = MagicMock()
        mock_volume.attrs = {
            "Name": "data-vol",
            "Driver": "local",
            "Mountpoint": "/var/lib/docker/volumes/data-vol/_data",
            "Labels": {"backup": "daily"},
        }
        mock_docker_client.volumes.list.return_value = [mock_volume]

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_volume"],
            progress_callback=self._noop_callback,
        )

        assert len(result.resources) == 1
        vol = result.resources[0]
        assert vol.resource_type == "docker_volume"
        # The volume attrs carry no separate ID field, so the expected
        # unique_id equals the volume name.
        assert vol.unique_id == "data-vol"
        assert vol.name == "data-vol"
        assert vol.attributes["driver"] == "local"

    def test_discover_configs(self, authenticated_plugin, mock_docker_client):
        """Discovers Docker configs (metadata only)."""
        mock_config = MagicMock()
        mock_config.attrs = {
            "ID": "cfg001",
            "Spec": {
                "Name": "app-config",
                "Labels": {"version": "2"},
            },
            "CreatedAt": "2024-01-01T00:00:00Z",
            "UpdatedAt": "2024-01-02T00:00:00Z",
        }
        mock_docker_client.configs.list.return_value = [mock_config]

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_config"],
            progress_callback=self._noop_callback,
        )

        assert len(result.resources) == 1
        cfg = result.resources[0]
        assert cfg.resource_type == "docker_config"
        assert cfg.unique_id == "cfg001"
        assert cfg.name == "app-config"
        assert cfg.attributes["created_at"] == "2024-01-01T00:00:00Z"

    def test_discover_secrets(self, authenticated_plugin, mock_docker_client):
        """Discovers Docker secrets (metadata only, no secret data)."""
        mock_secret = MagicMock()
        mock_secret.attrs = {
            "ID": "sec001",
            "Spec": {
                "Name": "db-password",
                "Labels": {},
            },
            "CreatedAt": "2024-01-01T00:00:00Z",
            "UpdatedAt": "2024-01-01T00:00:00Z",
        }
        mock_docker_client.secrets.list.return_value = [mock_secret]

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_secret"],
            progress_callback=self._noop_callback,
        )

        assert len(result.resources) == 1
        sec = result.resources[0]
        assert sec.resource_type == "docker_secret"
        assert sec.unique_id == "sec001"
        assert sec.name == "db-password"

    def test_discover_all_types(self, authenticated_plugin, mock_docker_client):
        """Discovers all resource types in a single call."""
        # Restated explicitly even though the mock_docker_client fixture
        # already defaults every listing to an empty list.
        mock_docker_client.services.list.return_value = []
        mock_docker_client.networks.list.return_value = []
        mock_docker_client.volumes.list.return_value = []
        mock_docker_client.configs.list.return_value = []
        mock_docker_client.secrets.list.return_value = []

        all_types = [
            "docker_service",
            "docker_network",
            "docker_volume",
            "docker_config",
            "docker_secret",
        ]
        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=all_types,
            progress_callback=self._noop_callback,
        )

        assert result.errors == []
        assert result.warnings == []

    def test_discover_reports_progress(self, authenticated_plugin, mock_docker_client):
        """Progress callback is invoked for each resource type."""
        progress_updates: list[ScanProgress] = []

        # The bound ``append`` records every update in call order.
        authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_service", "docker_network"],
            progress_callback=progress_updates.append,
        )

        assert len(progress_updates) == 2
        assert progress_updates[0].current_resource_type == "docker_service"
        assert progress_updates[0].resource_types_completed == 1
        assert progress_updates[0].total_resource_types == 2
        assert progress_updates[1].current_resource_type == "docker_network"
        assert progress_updates[1].resource_types_completed == 2

    def test_discover_handles_api_error(self, authenticated_plugin, mock_docker_client):
        """API errors are captured in the result errors list."""
        mock_docker_client.services.list.side_effect = Exception("API timeout")

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_service"],
            progress_callback=self._noop_callback,
        )

        assert len(result.errors) == 1
        assert "API timeout" in result.errors[0]

    def test_discover_without_authentication(self, plugin):
        """Returns error when called without authentication."""
        result = plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_service"],
            progress_callback=self._noop_callback,
        )

        assert len(result.errors) == 1
        assert "Not authenticated" in result.errors[0]

    def test_service_references_include_volumes_configs_secrets(
        self, authenticated_plugin, mock_docker_client
    ):
        """Service references include network, volume, config, and secret refs."""
        mock_service = MagicMock()
        mock_service.attrs = {
            "ID": "svc-ref",
            "Spec": {
                "Name": "full-service",
                "Mode": {"Replicated": {"Replicas": 1}},
                "Labels": {},
                "TaskTemplate": {
                    "ContainerSpec": {
                        "Image": "app:v1",
                        "Mounts": [{"Source": "data-vol"}],
                        "Configs": [{"ConfigID": "cfg-abc"}],
                        "Secrets": [{"SecretID": "sec-xyz"}],
                    },
                    "Networks": [{"Target": "net-123"}],
                },
            },
        }
        mock_docker_client.services.list.return_value = [mock_service]

        result = authenticated_plugin.discover_resources(
            endpoints=["tcp://localhost:2376"],
            resource_types=["docker_service"],
            progress_callback=self._noop_callback,
        )

        refs = result.resources[0].raw_references
        assert "network:net-123" in refs
        assert "volume:data-vol" in refs
        assert "config:cfg-abc" in refs
        assert "secret:sec-xyz" in refs