482 lines
18 KiB
Python
482 lines
18 KiB
Python
"""Unit tests for the DependencyResolver."""
|
|
|
|
import pytest
|
|
|
|
from iac_reverse.models import (
|
|
CpuArchitecture,
|
|
DependencyGraph,
|
|
DiscoveredResource,
|
|
PlatformCategory,
|
|
ProviderType,
|
|
ScanResult,
|
|
)
|
|
from iac_reverse.resolver import DependencyResolver
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers / Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def make_resource(
    resource_type: str = "kubernetes_deployment",
    unique_id: str = "default/deployments/nginx",
    name: str = "nginx",
    raw_references: list[str] | None = None,
    attributes: dict | None = None,
    provider: ProviderType = ProviderType.KUBERNETES,
    platform_category: PlatformCategory = PlatformCategory.CONTAINER_ORCHESTRATION,
) -> DiscoveredResource:
    """Build a DiscoveredResource fixture for the resolver tests.

    All arguments are optional and are passed through to the model unchanged,
    except that a falsy ``attributes`` becomes a new empty dict and a falsy
    ``raw_references`` becomes a new empty list. The architecture and endpoint
    are fixed to the same values for every fixture.
    """
    fields = {
        "resource_type": resource_type,
        "unique_id": unique_id,
        "name": name,
        "provider": provider,
        "platform_category": platform_category,
        "architecture": CpuArchitecture.AARCH64,
        "endpoint": "https://k8s-api.local:6443",
        "attributes": attributes if attributes else {},
        "raw_references": raw_references if raw_references else [],
    }
    return DiscoveredResource(**fields)
|
|
|
|
|
|
def make_scan_result(resources: list[DiscoveredResource]) -> ScanResult:
    """Wrap ``resources`` in a ScanResult with no warnings or errors.

    The scan timestamp and profile hash are fixed placeholder values.
    """
    fixed_metadata = {
        "scan_timestamp": "2024-01-15T10:30:00Z",
        "profile_hash": "test-hash",
    }
    return ScanResult(resources=resources, warnings=[], errors=[], **fixed_metadata)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Simple linear dependency chain
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestLinearDependencyChain:
    """Resolver behaviour on a straight-line chain of references."""

    def test_linear_chain_produces_correct_topological_order(self):
        """Every resource in the chain is ordered after the one it references."""
        # The ingress references the service; the service references the namespace.
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/default",
            name="default",
        )
        svc = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/nginx-svc",
            name="nginx-svc",
            raw_references=["ns/default"],
            attributes={"namespace": "default"},
        )
        ingress = make_resource(
            resource_type="kubernetes_ingress",
            unique_id="default/ingresses/nginx-ingress",
            name="nginx-ingress",
            raw_references=["default/services/nginx-svc"],
            attributes={"service": "nginx-svc"},
        )

        graph = DependencyResolver(
            make_scan_result([namespace, svc, ingress])
        ).resolve()

        # Namespace first, then the service, then the ingress.
        ordering = graph.topological_order
        namespace_pos = ordering.index("ns/default")
        service_pos = ordering.index("default/services/nginx-svc")
        ingress_pos = ordering.index("default/ingresses/nginx-ingress")
        assert namespace_pos < service_pos
        assert service_pos < ingress_pos

    def test_linear_chain_produces_correct_relationships(self):
        """A single resolved reference yields one source -> target relationship."""
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/default",
            name="default",
        )
        svc = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/nginx-svc",
            name="nginx-svc",
            raw_references=["ns/default"],
        )

        graph = DependencyResolver(make_scan_result([namespace, svc])).resolve()

        assert len(graph.relationships) == 1
        link = graph.relationships[0]
        # The referencing resource is the source; the referenced one is the target.
        assert link.source_id == "default/services/nginx-svc"
        assert link.target_id == "ns/default"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Multiple resources with shared dependencies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSharedDependencies:
    """Resolver behaviour when several resources reference the same target."""

    def test_shared_dependency_appears_before_all_dependents(self):
        """The shared namespace is ordered ahead of both resources that use it."""
        shared_ns = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/production",
            name="production",
        )
        app_deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="production/deployments/app",
            name="app",
            raw_references=["ns/production"],
        )
        app_service = make_resource(
            resource_type="kubernetes_service",
            unique_id="production/services/app-svc",
            name="app-svc",
            raw_references=["ns/production"],
        )

        graph = DependencyResolver(
            make_scan_result([shared_ns, app_deployment, app_service])
        ).resolve()

        ordering = graph.topological_order
        namespace_pos = ordering.index("ns/production")
        deployment_pos = ordering.index("production/deployments/app")
        service_pos = ordering.index("production/services/app-svc")

        assert namespace_pos < deployment_pos
        assert namespace_pos < service_pos

    def test_shared_dependency_produces_multiple_relationships(self):
        """Each dependent contributes its own relationship to the shared target."""
        shared_ns = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/production",
            name="production",
        )
        app_deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="production/deployments/app",
            name="app",
            raw_references=["ns/production"],
        )
        app_service = make_resource(
            resource_type="kubernetes_service",
            unique_id="production/services/app-svc",
            name="app-svc",
            raw_references=["ns/production"],
        )

        graph = DependencyResolver(
            make_scan_result([shared_ns, app_deployment, app_service])
        ).resolve()

        assert len(graph.relationships) == 2
        # Both relationships must point at the shared namespace.
        pointing_at_namespace = sum(
            1 for link in graph.relationships if link.target_id == "ns/production"
        )
        assert pointing_at_namespace == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Resources with no references (standalone)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStandaloneResources:
    """Resolver behaviour for resources that reference nothing."""

    def test_standalone_resources_appear_in_topological_order(self):
        """Unreferenced, non-referencing resources are still listed in the order."""
        first = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/standalone-a",
            name="standalone-a",
        )
        second = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/standalone-b",
            name="standalone-b",
        )

        graph = DependencyResolver(make_scan_result([first, second])).resolve()

        ordering = graph.topological_order
        assert len(ordering) == 2
        assert "default/deployments/standalone-a" in ordering
        assert "default/services/standalone-b" in ordering

    def test_standalone_resources_produce_no_relationships(self):
        """A lone resource without references yields an empty relationship list."""
        lone = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/standalone",
            name="standalone",
        )

        graph = DependencyResolver(make_scan_result([lone])).resolve()

        assert len(graph.relationships) == 0

    def test_empty_scan_result_produces_empty_graph(self):
        """Resolving a scan with no resources leaves every graph field empty."""
        graph = DependencyResolver(make_scan_result([])).resolve()

        # Checked in the same order as the fields are listed here.
        for field_name in (
            "resources",
            "relationships",
            "topological_order",
            "cycles",
            "unresolved_references",
        ):
            assert getattr(graph, field_name) == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Parent-child relationship detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestParentChildRelationships:
    """How the resolver labels the relationship type of a resolved reference."""

    def test_namespace_reference_classified_as_parent_child(self):
        """Referencing a kubernetes_namespace yields a 'parent-child' relationship."""
        parent_ns = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/default",
            name="default",
        )
        child = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/app",
            name="app",
            raw_references=["ns/default"],
            attributes={"namespace": "default"},
        )

        graph = DependencyResolver(make_scan_result([parent_ns, child])).resolve()

        assert len(graph.relationships) == 1
        only_link = graph.relationships[0]
        assert only_link.relationship_type == "parent-child"

    def test_docker_network_reference_classified_as_parent_child(self):
        """Referencing a docker_network yields a 'parent-child' relationship."""
        # Both fixtures share the same Docker Swarm provider settings.
        swarm = {
            "provider": ProviderType.DOCKER_SWARM,
            "platform_category": PlatformCategory.CONTAINER_ORCHESTRATION,
        }
        overlay = make_resource(
            resource_type="docker_network",
            unique_id="networks/overlay-net",
            name="overlay-net",
            **swarm,
        )
        web = make_resource(
            resource_type="docker_service",
            unique_id="services/web",
            name="web",
            raw_references=["networks/overlay-net"],
            **swarm,
        )

        graph = DependencyResolver(make_scan_result([overlay, web])).resolve()

        assert len(graph.relationships) == 1
        only_link = graph.relationships[0]
        assert only_link.relationship_type == "parent-child"

    def test_dependency_relationship_for_iis_site_to_app_pool(self):
        """An IIS site pointing at an app pool yields a 'dependency' relationship."""
        # Both fixtures share the same Windows provider settings.
        windows = {
            "provider": ProviderType.WINDOWS,
            "platform_category": PlatformCategory.WINDOWS,
        }
        pool = make_resource(
            resource_type="windows_iis_app_pool",
            unique_id="win-server/iis/app_pools/DefaultAppPool",
            name="DefaultAppPool",
            **windows,
        )
        site = make_resource(
            resource_type="windows_iis_site",
            unique_id="win-server/iis/sites/Default Web Site",
            name="Default Web Site",
            raw_references=["win-server/iis/app_pools/DefaultAppPool"],
            attributes={"app_pool": "DefaultAppPool"},
            **windows,
        )

        graph = DependencyResolver(make_scan_result([pool, site])).resolve()

        assert len(graph.relationships) == 1
        only_link = graph.relationships[0]
        assert only_link.relationship_type == "dependency"

    def test_generic_reference_classified_as_reference(self):
        """A deployment pointing at a service yields a plain 'reference'."""
        referenced = make_resource(
            resource_type="kubernetes_service",
            unique_id="default/services/nginx-svc",
            name="nginx-svc",
        )
        referrer = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/nginx",
            name="nginx",
            raw_references=["default/services/nginx-svc"],
        )

        graph = DependencyResolver(make_scan_result([referenced, referrer])).resolve()

        assert len(graph.relationships) == 1
        only_link = graph.relationships[0]
        assert only_link.relationship_type == "reference"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Topological order validity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTopologicalOrderValidity:
    """Tests that topological order is valid (no resource before its dependencies).

    Also covers the graph's ``resources`` field, ``source_attribute``
    detection, and the tracking of unresolved references.
    """

    def test_no_resource_appears_before_its_dependencies(self):
        """In the topological order, no resource appears before any it depends on."""
        namespace = make_resource(
            resource_type="kubernetes_namespace",
            unique_id="ns/prod",
            name="prod",
        )
        config_map = make_resource(
            resource_type="kubernetes_config_map",
            unique_id="prod/configmaps/app-config",
            name="app-config",
            raw_references=["ns/prod"],
        )
        deployment = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="prod/deployments/app",
            name="app",
            raw_references=["ns/prod", "prod/configmaps/app-config"],
        )
        service = make_resource(
            resource_type="kubernetes_service",
            unique_id="prod/services/app-svc",
            name="app-svc",
            raw_references=["ns/prod"],
        )
        ingress = make_resource(
            resource_type="kubernetes_ingress",
            unique_id="prod/ingresses/app-ingress",
            name="app-ingress",
            raw_references=["prod/services/app-svc"],
        )

        scan_result = make_scan_result(
            [namespace, config_map, deployment, service, ingress]
        )
        resolver = DependencyResolver(scan_result)
        graph = resolver.resolve()

        order = graph.topological_order

        # Guard against a vacuous pass: with no relationships the loop below
        # would never execute and the test would succeed without checking
        # anything.
        assert graph.relationships, "expected the resolver to produce relationships"

        # Verify: for each relationship, target appears before source
        for rel in graph.relationships:
            target_idx = order.index(rel.target_id)
            source_idx = order.index(rel.source_id)
            assert target_idx < source_idx, (
                f"Target {rel.target_id} (idx={target_idx}) should appear before "
                f"source {rel.source_id} (idx={source_idx})"
            )

    def test_topological_order_contains_all_resources(self):
        """The topological order contains every resource exactly once."""
        expected_ids = [f"default/deployments/app-{i}" for i in range(5)]
        resources = [
            make_resource(
                resource_type="kubernetes_deployment",
                unique_id=f"default/deployments/app-{i}",
                name=f"app-{i}",
            )
            for i in range(5)
        ]

        scan_result = make_scan_result(resources)
        resolver = DependencyResolver(scan_result)
        graph = resolver.resolve()

        assert len(graph.topological_order) == 5
        assert len(set(graph.topological_order)) == 5  # All unique
        # Count and uniqueness alone would accept five arbitrary ids; also
        # require that they are exactly the ids that were fed in.
        assert set(graph.topological_order) == set(expected_ids)

    def test_graph_returns_all_resources_in_resources_field(self):
        """The DependencyGraph.resources field contains all input resources."""
        resources = [
            make_resource(
                unique_id=f"resource-{i}",
                name=f"res-{i}",
            )
            for i in range(3)
        ]

        scan_result = make_scan_result(resources)
        resolver = DependencyResolver(scan_result)
        graph = resolver.resolve()

        assert graph.resources == resources

    def test_source_attribute_identified_from_attributes(self):
        """The source_attribute is identified from the resource's attributes dict."""
        app_pool = make_resource(
            resource_type="windows_iis_app_pool",
            unique_id="win/iis/app_pools/MyPool",
            name="MyPool",
            provider=ProviderType.WINDOWS,
            platform_category=PlatformCategory.WINDOWS,
        )
        site = make_resource(
            resource_type="windows_iis_site",
            unique_id="win/iis/sites/MySite",
            name="MySite",
            raw_references=["win/iis/app_pools/MyPool"],
            attributes={"app_pool": "MyPool", "state": "Started"},
            provider=ProviderType.WINDOWS,
            platform_category=PlatformCategory.WINDOWS,
        )

        scan_result = make_scan_result([app_pool, site])
        resolver = DependencyResolver(scan_result)
        graph = resolver.resolve()

        # Check the length first so a missing relationship is reported as an
        # assertion failure rather than an IndexError on the next line.
        assert len(graph.relationships) == 1
        assert graph.relationships[0].source_attribute == "app_pool"

    def test_unresolved_references_are_skipped(self):
        """References to IDs not in the inventory are skipped (no relationship created)."""
        resource = make_resource(
            resource_type="kubernetes_deployment",
            unique_id="default/deployments/app",
            name="app",
            raw_references=["nonexistent/resource/id"],
        )

        scan_result = make_scan_result([resource])
        resolver = DependencyResolver(scan_result)
        graph = resolver.resolve()

        assert len(graph.relationships) == 0
        # Unresolved references are now tracked (task 4.3)
        assert len(graph.unresolved_references) == 1
        assert graph.unresolved_references[0].referenced_id == "nonexistent/resource/id"
|