"""Unit tests for the KubernetesPlugin provider plugin.""" from unittest.mock import MagicMock, patch import pytest from iac_reverse.models import ( CpuArchitecture, DiscoveredResource, PlatformCategory, ProviderType, ScanProgress, ScanResult, ) from iac_reverse.scanner.kubernetes_plugin import KubernetesPlugin from iac_reverse.scanner.scanner import AuthenticationError class TestKubernetesPluginAuthenticate: """Tests for KubernetesPlugin.authenticate().""" @patch("iac_reverse.scanner.kubernetes_plugin.config") @patch("iac_reverse.scanner.kubernetes_plugin.client") def test_authenticate_with_kubeconfig_path(self, mock_client, mock_config): """Successfully authenticates with a kubeconfig path.""" plugin = KubernetesPlugin() credentials = {"kubeconfig_path": "/home/user/.kube/config"} plugin.authenticate(credentials) mock_config.load_kube_config.assert_called_once_with( config_file="/home/user/.kube/config", context=None, ) mock_client.ApiClient.assert_called_once() assert plugin._core_v1 is not None assert plugin._apps_v1 is not None assert plugin._networking_v1 is not None @patch("iac_reverse.scanner.kubernetes_plugin.config") @patch("iac_reverse.scanner.kubernetes_plugin.client") def test_authenticate_with_context(self, mock_client, mock_config): """Authenticates with a specific context.""" plugin = KubernetesPlugin() credentials = { "kubeconfig_path": "/home/user/.kube/config", "context": "production", } plugin.authenticate(credentials) mock_config.load_kube_config.assert_called_once_with( config_file="/home/user/.kube/config", context="production", ) def test_authenticate_missing_kubeconfig_path(self): """Raises AuthenticationError when kubeconfig_path is missing.""" plugin = KubernetesPlugin() with pytest.raises(AuthenticationError) as exc_info: plugin.authenticate({}) assert "kubeconfig_path is required" in str(exc_info.value) @patch("iac_reverse.scanner.kubernetes_plugin.config") def test_authenticate_invalid_kubeconfig(self, mock_config): """Raises AuthenticationError when kubeconfig is invalid.""" mock_config.load_kube_config.side_effect = Exception("file not found") plugin = KubernetesPlugin() with pytest.raises(AuthenticationError) as exc_info: plugin.authenticate({"kubeconfig_path": "/invalid/path"}) assert "Failed to load kubeconfig" in str(exc_info.value) class TestKubernetesPluginPlatformCategory: """Tests for KubernetesPlugin.get_platform_category().""" def test_returns_container_orchestration(self): """Returns CONTAINER_ORCHESTRATION category.""" plugin = KubernetesPlugin() assert plugin.get_platform_category() == PlatformCategory.CONTAINER_ORCHESTRATION class TestKubernetesPluginSupportedResourceTypes: """Tests for KubernetesPlugin.list_supported_resource_types().""" def test_returns_all_kubernetes_resource_types(self): """Returns all six supported Kubernetes resource types.""" plugin = KubernetesPlugin() types = plugin.list_supported_resource_types() expected = [ "kubernetes_deployment", "kubernetes_service", "kubernetes_ingress", "kubernetes_config_map", "kubernetes_persistent_volume", "kubernetes_namespace", ] assert types == expected def test_returns_new_list_each_call(self): """Returns a new list instance each call (not mutable reference).""" plugin = KubernetesPlugin() types1 = plugin.list_supported_resource_types() types2 = plugin.list_supported_resource_types() assert types1 == types2 assert types1 is not types2 class TestKubernetesPluginDetectArchitecture: """Tests for KubernetesPlugin.detect_architecture().""" def test_detects_amd64_from_node_label(self): """Detects AMD64 architecture from kubernetes.io/arch label.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() node = _make_node( addresses=[("InternalIP", "192.168.1.10")], labels={"kubernetes.io/arch": "amd64"}, ) plugin._core_v1.list_node.return_value = MagicMock(items=[node]) result = plugin.detect_architecture("192.168.1.10") assert result == CpuArchitecture.AMD64 def test_detects_arm64_from_node_label(self): """Detects AARCH64 architecture from arm64 label.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() node = _make_node( addresses=[("InternalIP", "192.168.1.20")], labels={"kubernetes.io/arch": "arm64"}, ) plugin._core_v1.list_node.return_value = MagicMock(items=[node]) result = plugin.detect_architecture("192.168.1.20") assert result == CpuArchitecture.AARCH64 def test_detects_arm_from_node_label(self): """Detects ARM architecture from arm label.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() node = _make_node( addresses=[("InternalIP", "192.168.1.30")], labels={"kubernetes.io/arch": "arm"}, ) plugin._core_v1.list_node.return_value = MagicMock(items=[node]) result = plugin.detect_architecture("192.168.1.30") assert result == CpuArchitecture.ARM def test_falls_back_to_beta_label(self): """Falls back to beta.kubernetes.io/arch label.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() node = _make_node( addresses=[("InternalIP", "192.168.1.40")], labels={"beta.kubernetes.io/arch": "arm64"}, ) plugin._core_v1.list_node.return_value = MagicMock(items=[node]) result = plugin.detect_architecture("192.168.1.40") assert result == CpuArchitecture.AARCH64 def test_defaults_to_amd64_when_no_label(self): """Defaults to AMD64 when no arch label is present.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() node = _make_node( addresses=[("InternalIP", "192.168.1.50")], labels={}, ) plugin._core_v1.list_node.return_value = MagicMock(items=[node]) result = plugin.detect_architecture("192.168.1.50") assert result == CpuArchitecture.AMD64 def test_defaults_to_amd64_when_not_authenticated(self): """Returns AMD64 when plugin is not authenticated.""" plugin = KubernetesPlugin() result = plugin.detect_architecture("192.168.1.1") assert result == CpuArchitecture.AMD64 def test_defaults_to_amd64_on_api_error(self): """Returns AMD64 when API call fails.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() plugin._core_v1.list_node.side_effect = Exception("API error") result = plugin.detect_architecture("192.168.1.1") assert result == CpuArchitecture.AMD64 class TestKubernetesPluginListEndpoints: """Tests for KubernetesPlugin.list_endpoints().""" def test_returns_node_internal_ips(self): """Returns InternalIP addresses from nodes.""" plugin = KubernetesPlugin() plugin._core_v1 = MagicMock() nodes = [ _make_node(addresses=[("InternalIP", "10.0.0.1")]), _make_node(addresses=[("InternalIP", "10.0.0.2")]), ] plugin._core_v1.list_node.return_value = MagicMock(items=nodes) result = plugin.list_endpoints() assert result == ["10.0.0.1", "10.0.0.2"] def test_returns_empty_when_not_authenticated(self): """Returns empty list when not authenticated.""" plugin = KubernetesPlugin() assert plugin.list_endpoints() == [] class TestKubernetesPluginDiscoverResources: """Tests for KubernetesPlugin.discover_resources().""" def test_discovers_deployments(self): """Discovers deployments and returns DiscoveredResource objects.""" plugin = _make_authenticated_plugin() dep = MagicMock() dep.metadata.name = "nginx" dep.metadata.namespace = "default" dep.metadata.labels = {"app": "nginx"} dep.spec.replicas = 3 plugin._apps_v1.list_deployment_for_all_namespaces.return_value = MagicMock( items=[dep] ) _stub_empty_apis(plugin, exclude="deployments") progress_updates = [] result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_deployment"], progress_callback=progress_updates.append, ) assert len(result.resources) == 1 resource = result.resources[0] assert resource.resource_type == "kubernetes_deployment" assert resource.unique_id == "default/nginx" assert resource.name == "nginx" assert resource.provider == ProviderType.KUBERNETES assert resource.platform_category == PlatformCategory.CONTAINER_ORCHESTRATION assert resource.attributes["namespace"] == "default" assert resource.attributes["replicas"] == 3 def test_discovers_services(self): """Discovers services and returns DiscoveredResource objects.""" plugin = _make_authenticated_plugin() svc = MagicMock() svc.metadata.name = "my-service" svc.metadata.namespace = "production" svc.metadata.labels = {"app": "web"} svc.spec.type = "ClusterIP" svc.spec.cluster_ip = "10.96.0.1" plugin._core_v1.list_service_for_all_namespaces.return_value = MagicMock( items=[svc] ) _stub_empty_apis(plugin, exclude="services") result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_service"], progress_callback=lambda p: None, ) assert len(result.resources) == 1 resource = result.resources[0] assert resource.resource_type == "kubernetes_service" assert resource.unique_id == "production/my-service" assert resource.attributes["type"] == "ClusterIP" def test_discovers_ingresses(self): """Discovers ingresses and returns DiscoveredResource objects.""" plugin = _make_authenticated_plugin() ing = MagicMock() ing.metadata.name = "web-ingress" ing.metadata.namespace = "default" ing.metadata.labels = {"app": "web"} plugin._networking_v1.list_ingress_for_all_namespaces.return_value = MagicMock( items=[ing] ) _stub_empty_apis(plugin, exclude="ingresses") result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_ingress"], progress_callback=lambda p: None, ) assert len(result.resources) == 1 assert result.resources[0].resource_type == "kubernetes_ingress" assert result.resources[0].unique_id == "default/web-ingress" def test_discovers_config_maps(self): """Discovers config maps and returns DiscoveredResource objects.""" plugin = _make_authenticated_plugin() cm = MagicMock() cm.metadata.name = "app-config" cm.metadata.namespace = "default" cm.metadata.labels = {} cm.data = {"key1": "value1", "key2": "value2"} plugin._core_v1.list_config_map_for_all_namespaces.return_value = MagicMock( items=[cm] ) _stub_empty_apis(plugin, exclude="config_maps") result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_config_map"], progress_callback=lambda p: None, ) assert len(result.resources) == 1 resource = result.resources[0] assert resource.resource_type == "kubernetes_config_map" assert resource.attributes["data_keys"] == ["key1", "key2"] def test_discovers_persistent_volumes(self): """Discovers persistent volumes and returns DiscoveredResource objects.""" plugin = _make_authenticated_plugin() pv = MagicMock() pv.metadata.name = "pv-data" pv.metadata.labels = {} pv.spec.capacity = {"storage": "10Gi"} pv.spec.access_modes = ["ReadWriteOnce"] pv.spec.storage_class_name = "standard" plugin._core_v1.list_persistent_volume.return_value = MagicMock( items=[pv] ) _stub_empty_apis(plugin, exclude="persistent_volumes") result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_persistent_volume"], progress_callback=lambda p: None, ) assert len(result.resources) == 1 resource = result.resources[0] assert resource.resource_type == "kubernetes_persistent_volume" assert resource.unique_id == "pv-data" assert resource.attributes["capacity"] == {"storage": "10Gi"} assert resource.attributes["access_modes"] == ["ReadWriteOnce"] def test_discovers_namespaces(self): """Discovers namespaces and returns DiscoveredResource objects.""" plugin = _make_authenticated_plugin() ns = MagicMock() ns.metadata.name = "production" ns.metadata.labels = {"env": "prod"} ns.status.phase = "Active" plugin._core_v1.list_namespace.return_value = MagicMock(items=[ns]) _stub_empty_apis(plugin, exclude="namespaces") result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_namespace"], progress_callback=lambda p: None, ) assert len(result.resources) == 1 resource = result.resources[0] assert resource.resource_type == "kubernetes_namespace" assert resource.unique_id == "production" assert resource.attributes["status"] == "Active" def test_reports_progress_for_each_resource_type(self): """Reports progress callback for each resource type scanned.""" plugin = _make_authenticated_plugin() _stub_empty_apis(plugin) progress_updates: list[ScanProgress] = [] plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_deployment", "kubernetes_service"], progress_callback=progress_updates.append, ) assert len(progress_updates) == 2 assert progress_updates[0].current_resource_type == "kubernetes_deployment" assert progress_updates[0].resource_types_completed == 1 assert progress_updates[0].total_resource_types == 2 assert progress_updates[1].current_resource_type == "kubernetes_service" assert progress_updates[1].resource_types_completed == 2 def test_handles_api_errors_gracefully(self): """Records errors when API calls fail without crashing.""" plugin = _make_authenticated_plugin() plugin._apps_v1.list_deployment_for_all_namespaces.side_effect = Exception( "API unavailable" ) _stub_empty_apis(plugin, exclude="deployments") result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_deployment"], progress_callback=lambda p: None, ) assert len(result.errors) == 1 assert "API unavailable" in result.errors[0] assert len(result.resources) == 0 def test_returns_scan_result_type(self): """Returns a ScanResult instance.""" plugin = _make_authenticated_plugin() _stub_empty_apis(plugin) result = plugin.discover_resources( endpoints=["10.0.0.1"], resource_types=["kubernetes_namespace"], progress_callback=lambda p: None, ) assert isinstance(result, ScanResult) # --------------------------------------------------------------------------- # Test Helpers # --------------------------------------------------------------------------- def _make_node( addresses: list[tuple[str, str]] | None = None, labels: dict[str, str] | None = None, ) -> MagicMock: """Create a mock Kubernetes node object.""" node = MagicMock() node.metadata.labels = labels or {} if addresses: addr_objects = [] for addr_type, addr_value in addresses: addr = MagicMock() addr.type = addr_type addr.address = addr_value addr_objects.append(addr) node.status.addresses = addr_objects else: node.status.addresses = [] return node def _make_authenticated_plugin() -> KubernetesPlugin: """Create a KubernetesPlugin with mocked API clients.""" plugin = KubernetesPlugin() plugin._api_client = MagicMock() plugin._core_v1 = MagicMock() plugin._apps_v1 = MagicMock() plugin._networking_v1 = MagicMock() # Default: detect_architecture returns AMD64 node = _make_node( addresses=[("InternalIP", "10.0.0.1")], labels={"kubernetes.io/arch": "amd64"}, ) plugin._core_v1.list_node.return_value = MagicMock(items=[node]) return plugin def _stub_empty_apis(plugin: KubernetesPlugin, exclude: str = "") -> None: """Stub all discovery API calls to return empty lists. Args: plugin: The plugin instance with mocked clients. exclude: Resource type to exclude from stubbing (leave for test to set up). """ if exclude != "deployments": plugin._apps_v1.list_deployment_for_all_namespaces.return_value = MagicMock( items=[] ) if exclude != "services": plugin._core_v1.list_service_for_all_namespaces.return_value = MagicMock( items=[] ) if exclude != "ingresses": plugin._networking_v1.list_ingress_for_all_namespaces.return_value = MagicMock( items=[] ) if exclude != "config_maps": plugin._core_v1.list_config_map_for_all_namespaces.return_value = MagicMock( items=[] ) if exclude != "persistent_volumes": plugin._core_v1.list_persistent_volume.return_value = MagicMock(items=[]) if exclude != "namespaces": plugin._core_v1.list_namespace.return_value = MagicMock(items=[])