"""Unit tests for the DependencyResolver."""

import pytest

from iac_reverse.models import (
    CpuArchitecture,
    DependencyGraph,
    DiscoveredResource,
    PlatformCategory,
    ProviderType,
    ScanResult,
)
from iac_reverse.resolver import DependencyResolver


# ---------------------------------------------------------------------------
# Helpers / Fixtures
# ---------------------------------------------------------------------------


def make_resource(
    resource_type: str = "kubernetes_deployment",
    unique_id: str = "default/deployments/nginx",
    name: str = "nginx",
    raw_references: list[str] | None = None,
    attributes: dict | None = None,
    provider: ProviderType = ProviderType.KUBERNETES,
    platform_category: PlatformCategory = PlatformCategory.CONTAINER_ORCHESTRATION,
) -> DiscoveredResource:
    """Create a sample DiscoveredResource for testing.

    Every argument has a default, so a test only spells out the fields it
    cares about. ``raw_references`` and ``attributes`` fall back to a fresh
    empty list/dict when omitted (or falsy), so no container is shared
    between resources. Architecture and endpoint are fixed sample values.
    """
    return DiscoveredResource(
        resource_type=resource_type,
        unique_id=unique_id,
        name=name,
        provider=provider,
        platform_category=platform_category,
        architecture=CpuArchitecture.AARCH64,
        endpoint="https://k8s-api.local:6443",
        attributes=attributes or {},
        raw_references=raw_references or [],
    )


def make_scan_result(resources: list[DiscoveredResource]) -> ScanResult:
    """Create a ScanResult from a list of resources.

    Warnings and errors are empty; the timestamp and profile hash are fixed
    placeholder values.
    """
    return ScanResult(
        resources=resources,
        warnings=[],
        errors=[],
        scan_timestamp="2024-01-15T10:30:00Z",
        profile_hash="test-hash",
    )


def resolve_graph(resources: list[DiscoveredResource]) -> DependencyGraph:
    """Wrap *resources* in a ScanResult, run the resolver, return its graph.

    Collects the scan-result / resolver / resolve() sequence that every test
    in this module needs, so each test body shows only its inputs and its
    assertions.
    """
    return DependencyResolver(make_scan_result(resources)).resolve()


# ---------------------------------------------------------------------------
# Tests: Simple linear dependency chain
# ---------------------------------------------------------------------------


class TestLinearDependencyChain:
    """Tests for a simple A -> B -> C dependency chain."""

    def test_linear_chain_produces_correct_topological_order(self):
        """Resources in a linear chain appear in dependency order."""
        # C depends on B, B depends on A
        resource_a = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/default",
            name="default",
        )
        resource_b = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/nginx-svc",
            name="nginx-svc",
            raw_references=["ns/default"],
            attributes={"namespace": "default"},
        )
        resource_c = make_resource(
            resource_type="kubernetes_ingress",
            unique_id="default/ingresses/nginx-ingress",
            name="nginx-ingress",
            raw_references=["default/services/nginx-svc"],
            attributes={"service": "nginx-svc"},
        )

        graph = resolve_graph([resource_a, resource_b, resource_c])

        # A must appear before B, B must appear before C
        order = graph.topological_order
        assert order.index("ns/default") < order.index("default/services/nginx-svc")
        assert order.index("default/services/nginx-svc") < order.index(
            "default/ingresses/nginx-ingress"
        )

    def test_linear_chain_produces_correct_relationships(self):
        """Each link in the chain produces a relationship."""
        resource_a = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/default",
            name="default",
        )
        resource_b = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/nginx-svc",
            name="nginx-svc",
            raw_references=["ns/default"],
        )

        graph = resolve_graph([resource_a, resource_b])

        assert len(graph.relationships) == 1
        rel = graph.relationships[0]
        # The referencing resource is the source; the referenced one the target.
        assert rel.source_id == "default/services/nginx-svc"
        assert rel.target_id == "ns/default"


# ---------------------------------------------------------------------------
# Tests: Multiple resources with shared dependencies
# ---------------------------------------------------------------------------


class TestSharedDependencies:
    """Tests for multiple resources depending on the same resource."""

    def test_shared_dependency_appears_before_all_dependents(self):
        """A shared dependency appears before all resources that depend on it."""
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/production",
            name="production",
        )
        deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="production/deployments/app",
            name="app",
            raw_references=["ns/production"],
        )
        service = make_resource(
            resource_type="kubernetes_service",
            unique_id="production/services/app-svc",
            name="app-svc",
            raw_references=["ns/production"],
        )

        graph = resolve_graph([namespace, deployment, service])

        order = graph.topological_order
        ns_idx = order.index("ns/production")
        deploy_idx = order.index("production/deployments/app")
        svc_idx = order.index("production/services/app-svc")
        assert ns_idx < deploy_idx
        assert ns_idx < svc_idx

    def test_shared_dependency_produces_multiple_relationships(self):
        """A shared dependency creates one relationship per dependent."""
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/production",
            name="production",
        )
        deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="production/deployments/app",
            name="app",
            raw_references=["ns/production"],
        )
        service = make_resource(
            resource_type="kubernetes_service",
            unique_id="production/services/app-svc",
            name="app-svc",
            raw_references=["ns/production"],
        )

        graph = resolve_graph([namespace, deployment, service])

        assert len(graph.relationships) == 2
        target_ids = [r.target_id for r in graph.relationships]
        assert target_ids.count("ns/production") == 2


# ---------------------------------------------------------------------------
# Tests: Resources with no references (standalone)
# ---------------------------------------------------------------------------


class TestStandaloneResources:
    """Tests for resources that have no references to other resources."""

    def test_standalone_resources_appear_in_topological_order(self):
        """Resources with no references still appear in the topological order."""
        resource_a = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/standalone-a",
            name="standalone-a",
        )
        resource_b = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/standalone-b",
            name="standalone-b",
        )

        graph = resolve_graph([resource_a, resource_b])

        assert len(graph.topological_order) == 2
        assert "default/deployments/standalone-a" in graph.topological_order
        assert "default/services/standalone-b" in graph.topological_order

    def test_standalone_resources_produce_no_relationships(self):
        """Resources with no references produce no relationships."""
        resource_a = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/standalone",
            name="standalone",
        )

        graph = resolve_graph([resource_a])

        assert len(graph.relationships) == 0

    def test_empty_scan_result_produces_empty_graph(self):
        """An empty scan result produces an empty dependency graph."""
        graph = resolve_graph([])

        assert graph.resources == []
        assert graph.relationships == []
        assert graph.topological_order == []
        assert graph.cycles == []
        assert graph.unresolved_references == []


# ---------------------------------------------------------------------------
# Tests: Parent-child relationship detection
# ---------------------------------------------------------------------------


class TestParentChildRelationships:
    """Tests for parent-child relationship classification."""

    def test_namespace_reference_classified_as_parent_child(self):
        """A reference to a kubernetes_namespace is classified as parent-child."""
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/default",
            name="default",
        )
        deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/app",
            name="app",
            raw_references=["ns/default"],
            attributes={"namespace": "default"},
        )

        graph = resolve_graph([namespace, deployment])

        assert len(graph.relationships) == 1
        assert graph.relationships[0].relationship_type == "parent-child"

    def test_docker_network_reference_classified_as_parent_child(self):
        """A reference to a docker_network is classified as parent-child."""
        network = make_resource(
            resource_type="docker_network",
            unique_id="networks/overlay-net",
            name="overlay-net",
            provider=ProviderType.DOCKER_SWARM,
            platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
        )
        service = make_resource(
            resource_type="docker_service",
            unique_id="services/web",
            name="web",
            raw_references=["networks/overlay-net"],
            provider=ProviderType.DOCKER_SWARM,
            platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
        )

        graph = resolve_graph([network, service])

        assert len(graph.relationships) == 1
        assert graph.relationships[0].relationship_type == "parent-child"

    def test_dependency_relationship_for_iis_site_to_app_pool(self):
        """An IIS site referencing an app pool is classified as dependency."""
        app_pool = make_resource(
            resource_type="windows_iis_app_pool",
            unique_id="win-server/iis/app_pools/DefaultAppPool",
            name="DefaultAppPool",
            provider=ProviderType.WINDOWS,
            platform_category=PlatformCategory.WINDOWS,
        )
        iis_site = make_resource(
            resource_type="windows_iis_site",
            unique_id="win-server/iis/sites/Default Web Site",
            name="Default Web Site",
            raw_references=["win-server/iis/app_pools/DefaultAppPool"],
            attributes={"app_pool": "DefaultAppPool"},
            provider=ProviderType.WINDOWS,
            platform_category=PlatformCategory.WINDOWS,
        )

        graph = resolve_graph([app_pool, iis_site])

        assert len(graph.relationships) == 1
        assert graph.relationships[0].relationship_type == "dependency"

    def test_generic_reference_classified_as_reference(self):
        """A reference to a non-namespace, non-dependency resource is 'reference'."""
        service = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/nginx-svc",
            name="nginx-svc",
        )
        deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/nginx",
            name="nginx",
            raw_references=["default/services/nginx-svc"],
        )

        graph = resolve_graph([service, deployment])

        assert len(graph.relationships) == 1
        assert graph.relationships[0].relationship_type == "reference"


# ---------------------------------------------------------------------------
# Tests: Topological order validity
# ---------------------------------------------------------------------------


class TestTopologicalOrderValidity:
    """Tests that topological order is valid (no resource before its dependencies)."""

    def test_no_resource_appears_before_its_dependencies(self):
        """In the topological order, no resource appears before any it depends on."""
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/prod",
            name="prod",
        )
        config_map = make_resource(
            resource_type="kubernetes_config_map",
            unique_id="prod/configmaps/app-config",
            name="app-config",
            raw_references=["ns/prod"],
        )
        deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="prod/deployments/app",
            name="app",
            raw_references=["ns/prod", "prod/configmaps/app-config"],
        )
        service = make_resource(
            resource_type="kubernetes_service",
            unique_id="prod/services/app-svc",
            name="app-svc",
            raw_references=["ns/prod"],
        )
        ingress = make_resource(
            resource_type="kubernetes_ingress",
            unique_id="prod/ingresses/app-ingress",
            name="app-ingress",
            raw_references=["prod/services/app-svc"],
        )

        graph = resolve_graph(
            [namespace, config_map, deployment, service, ingress]
        )
        order = graph.topological_order

        # Verify: for each relationship, target appears before source
        for rel in graph.relationships:
            target_idx = order.index(rel.target_id)
            source_idx = order.index(rel.source_id)
            assert target_idx < source_idx, (
                f"Target {rel.target_id} (idx={target_idx}) should appear before "
                f"source {rel.source_id} (idx={source_idx})"
            )

    def test_topological_order_contains_all_resources(self):
        """The topological order contains every resource exactly once."""
        expected_ids = [f"default/deployments/app-{i}" for i in range(5)]
        resources = [
            make_resource(
                resource_type="kubernetes_deployment",
                unique_id=unique_id,
                name=unique_id.rsplit("/", 1)[-1],
            )
            for unique_id in expected_ids
        ]

        graph = resolve_graph(resources)
        order = graph.topological_order

        assert len(order) == 5
        assert len(set(order)) == 5  # All unique
        # Counting alone would accept five unique but wrong IDs; compare the
        # actual contents (order-insensitively) against the input IDs.
        assert sorted(order) == sorted(expected_ids)

    # NOTE(review): the remaining tests check graph contents rather than
    # ordering; they stay in this class so existing pytest node IDs are stable.

    def test_graph_returns_all_resources_in_resources_field(self):
        """The DependencyGraph.resources field contains all input resources."""
        resources = [
            make_resource(
                unique_id=f"resource-{i}",
                name=f"res-{i}",
            )
            for i in range(3)
        ]

        graph = resolve_graph(resources)

        assert graph.resources == resources

    def test_source_attribute_identified_from_attributes(self):
        """The source_attribute is identified from the resource's attributes dict."""
        app_pool = make_resource(
            resource_type="windows_iis_app_pool",
            unique_id="win/iis/app_pools/MyPool",
            name="MyPool",
            provider=ProviderType.WINDOWS,
            platform_category=PlatformCategory.WINDOWS,
        )
        site = make_resource(
            resource_type="windows_iis_site",
            unique_id="win/iis/sites/MySite",
            name="MySite",
            raw_references=["win/iis/app_pools/MyPool"],
            attributes={"app_pool": "MyPool", "state": "Started"},
            provider=ProviderType.WINDOWS,
            platform_category=PlatformCategory.WINDOWS,
        )

        graph = resolve_graph([app_pool, site])

        # Guard the index below so a missing relationship fails as an
        # assertion instead of an IndexError.
        assert len(graph.relationships) == 1
        assert graph.relationships[0].source_attribute == "app_pool"

    def test_unresolved_references_are_skipped(self):
        """References to IDs not in the inventory create no relationship.

        They are recorded in ``unresolved_references`` instead.
        """
        resource = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/app",
            name="app",
            raw_references=["nonexistent/resource/id"],
        )

        graph = resolve_graph([resource])

        assert len(graph.relationships) == 0
        # Unresolved references are now tracked (task 4.3)
        assert len(graph.unresolved_references) == 1
        assert graph.unresolved_references[0].referenced_id == "nonexistent/resource/id"