Files
SnarfCode/tests/unit/test_resolver.py
2026-05-22 00:19:30 -04:00

482 lines
18 KiB
Python

"""Unit tests for the DependencyResolver."""
import pytest
from iac_reverse.models import (
CpuArchitecture,
DependencyGraph,
DiscoveredResource,
PlatformCategory,
ProviderType,
ScanResult,
)
from iac_reverse.resolver import DependencyResolver
# ---------------------------------------------------------------------------
# Helpers / Fixtures
# ---------------------------------------------------------------------------
def make_resource(
resource_type: str = "kubernetes_deployment",
unique_id: str = "default/deployments/nginx",
name: str = "nginx",
raw_references: list[str] | None = None,
attributes: dict | None = None,
provider: ProviderType = ProviderType.KUBERNETES,
platform_category: PlatformCategory = PlatformCategory.CONTAINER_ORCHESTRATION,
) -> DiscoveredResource:
"""Create a sample DiscoveredResource for testing."""
return DiscoveredResource(
resource_type=resource_type,
unique_id=unique_id,
name=name,
provider=provider,
platform_category=platform_category,
architecture=CpuArchitecture.AARCH64,
endpoint="https://k8s-api.local:6443",
attributes=attributes or {},
raw_references=raw_references or [],
)
def make_scan_result(resources: list[DiscoveredResource]) -> ScanResult:
"""Create a ScanResult from a list of resources."""
return ScanResult(
resources=resources,
warnings=[],
errors=[],
scan_timestamp="2024-01-15T10:30:00Z",
profile_hash="test-hash",
)
# ---------------------------------------------------------------------------
# Tests: Simple linear dependency chain
# ---------------------------------------------------------------------------
class TestLinearDependencyChain:
"""Tests for a simple A -> B -> C dependency chain."""
def test_linear_chain_produces_correct_topological_order(self):
"""Resources in a linear chain appear in dependency order."""
# C depends on B, B depends on A
resource_a = make_resource(
resource_type="kubernetes_namespace",
unique_id="ns/default",
name="default",
)
resource_b = make_resource(
resource_type="kubernetes_service",
unique_id="default/services/nginx-svc",
name="nginx-svc",
raw_references=["ns/default"],
attributes={"namespace": "default"},
)
resource_c = make_resource(
resource_type="kubernetes_ingress",
unique_id="default/ingresses/nginx-ingress",
name="nginx-ingress",
raw_references=["default/services/nginx-svc"],
attributes={"service": "nginx-svc"},
)
scan_result = make_scan_result([resource_a, resource_b, resource_c])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
# A must appear before B, B must appear before C
order = graph.topological_order
assert order.index("ns/default") < order.index("default/services/nginx-svc")
assert order.index("default/services/nginx-svc") < order.index(
"default/ingresses/nginx-ingress"
)
def test_linear_chain_produces_correct_relationships(self):
"""Each link in the chain produces a relationship."""
resource_a = make_resource(
resource_type="kubernetes_namespace",
unique_id="ns/default",
name="default",
)
resource_b = make_resource(
resource_type="kubernetes_service",
unique_id="default/services/nginx-svc",
name="nginx-svc",
raw_references=["ns/default"],
)
scan_result = make_scan_result([resource_a, resource_b])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 1
rel = graph.relationships[0]
assert rel.source_id == "default/services/nginx-svc"
assert rel.target_id == "ns/default"
# ---------------------------------------------------------------------------
# Tests: Multiple resources with shared dependencies
# ---------------------------------------------------------------------------
class TestSharedDependencies:
"""Tests for multiple resources depending on the same resource."""
def test_shared_dependency_appears_before_all_dependents(self):
"""A shared dependency appears before all resources that depend on it."""
namespace = make_resource(
resource_type="kubernetes_namespace",
unique_id="ns/production",
name="production",
)
deployment = make_resource(
resource_type="kubernetes_deployment",
unique_id="production/deployments/app",
name="app",
raw_references=["ns/production"],
)
service = make_resource(
resource_type="kubernetes_service",
unique_id="production/services/app-svc",
name="app-svc",
raw_references=["ns/production"],
)
scan_result = make_scan_result([namespace, deployment, service])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
order = graph.topological_order
ns_idx = order.index("ns/production")
deploy_idx = order.index("production/deployments/app")
svc_idx = order.index("production/services/app-svc")
assert ns_idx < deploy_idx
assert ns_idx < svc_idx
def test_shared_dependency_produces_multiple_relationships(self):
"""A shared dependency creates one relationship per dependent."""
namespace = make_resource(
resource_type="kubernetes_namespace",
unique_id="ns/production",
name="production",
)
deployment = make_resource(
resource_type="kubernetes_deployment",
unique_id="production/deployments/app",
name="app",
raw_references=["ns/production"],
)
service = make_resource(
resource_type="kubernetes_service",
unique_id="production/services/app-svc",
name="app-svc",
raw_references=["ns/production"],
)
scan_result = make_scan_result([namespace, deployment, service])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 2
target_ids = [r.target_id for r in graph.relationships]
assert target_ids.count("ns/production") == 2
# ---------------------------------------------------------------------------
# Tests: Resources with no references (standalone)
# ---------------------------------------------------------------------------
class TestStandaloneResources:
"""Tests for resources that have no references to other resources."""
def test_standalone_resources_appear_in_topological_order(self):
"""Resources with no references still appear in the topological order."""
resource_a = make_resource(
resource_type="kubernetes_deployment",
unique_id="default/deployments/standalone-a",
name="standalone-a",
)
resource_b = make_resource(
resource_type="kubernetes_service",
unique_id="default/services/standalone-b",
name="standalone-b",
)
scan_result = make_scan_result([resource_a, resource_b])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.topological_order) == 2
assert "default/deployments/standalone-a" in graph.topological_order
assert "default/services/standalone-b" in graph.topological_order
def test_standalone_resources_produce_no_relationships(self):
"""Resources with no references produce no relationships."""
resource_a = make_resource(
resource_type="kubernetes_deployment",
unique_id="default/deployments/standalone",
name="standalone",
)
scan_result = make_scan_result([resource_a])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 0
def test_empty_scan_result_produces_empty_graph(self):
"""An empty scan result produces an empty dependency graph."""
scan_result = make_scan_result([])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert graph.resources == []
assert graph.relationships == []
assert graph.topological_order == []
assert graph.cycles == []
assert graph.unresolved_references == []
# ---------------------------------------------------------------------------
# Tests: Parent-child relationship detection
# ---------------------------------------------------------------------------
class TestParentChildRelationships:
"""Tests for parent-child relationship classification."""
def test_namespace_reference_classified_as_parent_child(self):
"""A reference to a kubernetes_namespace is classified as parent-child."""
namespace = make_resource(
resource_type="kubernetes_namespace",
unique_id="ns/default",
name="default",
)
deployment = make_resource(
resource_type="kubernetes_deployment",
unique_id="default/deployments/app",
name="app",
raw_references=["ns/default"],
attributes={"namespace": "default"},
)
scan_result = make_scan_result([namespace, deployment])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 1
assert graph.relationships[0].relationship_type == "parent-child"
def test_docker_network_reference_classified_as_parent_child(self):
"""A reference to a docker_network is classified as parent-child."""
network = make_resource(
resource_type="docker_network",
unique_id="networks/overlay-net",
name="overlay-net",
provider=ProviderType.DOCKER_SWARM,
platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
)
service = make_resource(
resource_type="docker_service",
unique_id="services/web",
name="web",
raw_references=["networks/overlay-net"],
provider=ProviderType.DOCKER_SWARM,
platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
)
scan_result = make_scan_result([network, service])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 1
assert graph.relationships[0].relationship_type == "parent-child"
def test_dependency_relationship_for_iis_site_to_app_pool(self):
"""An IIS site referencing an app pool is classified as dependency."""
app_pool = make_resource(
resource_type="windows_iis_app_pool",
unique_id="win-server/iis/app_pools/DefaultAppPool",
name="DefaultAppPool",
provider=ProviderType.WINDOWS,
platform_category=PlatformCategory.WINDOWS,
)
iis_site = make_resource(
resource_type="windows_iis_site",
unique_id="win-server/iis/sites/Default Web Site",
name="Default Web Site",
raw_references=["win-server/iis/app_pools/DefaultAppPool"],
attributes={"app_pool": "DefaultAppPool"},
provider=ProviderType.WINDOWS,
platform_category=PlatformCategory.WINDOWS,
)
scan_result = make_scan_result([app_pool, iis_site])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 1
assert graph.relationships[0].relationship_type == "dependency"
def test_generic_reference_classified_as_reference(self):
"""A reference to a non-namespace, non-dependency resource is 'reference'."""
service = make_resource(
resource_type="kubernetes_service",
unique_id="default/services/nginx-svc",
name="nginx-svc",
)
deployment = make_resource(
resource_type="kubernetes_deployment",
unique_id="default/deployments/nginx",
name="nginx",
raw_references=["default/services/nginx-svc"],
)
scan_result = make_scan_result([service, deployment])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 1
assert graph.relationships[0].relationship_type == "reference"
# ---------------------------------------------------------------------------
# Tests: Topological order validity
# ---------------------------------------------------------------------------
class TestTopologicalOrderValidity:
"""Tests that topological order is valid (no resource before its dependencies)."""
def test_no_resource_appears_before_its_dependencies(self):
"""In the topological order, no resource appears before any it depends on."""
namespace = make_resource(
resource_type="kubernetes_namespace",
unique_id="ns/prod",
name="prod",
)
config_map = make_resource(
resource_type="kubernetes_config_map",
unique_id="prod/configmaps/app-config",
name="app-config",
raw_references=["ns/prod"],
)
deployment = make_resource(
resource_type="kubernetes_deployment",
unique_id="prod/deployments/app",
name="app",
raw_references=["ns/prod", "prod/configmaps/app-config"],
)
service = make_resource(
resource_type="kubernetes_service",
unique_id="prod/services/app-svc",
name="app-svc",
raw_references=["ns/prod"],
)
ingress = make_resource(
resource_type="kubernetes_ingress",
unique_id="prod/ingresses/app-ingress",
name="app-ingress",
raw_references=["prod/services/app-svc"],
)
scan_result = make_scan_result(
[namespace, config_map, deployment, service, ingress]
)
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
order = graph.topological_order
# Verify: for each relationship, target appears before source
for rel in graph.relationships:
target_idx = order.index(rel.target_id)
source_idx = order.index(rel.source_id)
assert target_idx < source_idx, (
f"Target {rel.target_id} (idx={target_idx}) should appear before "
f"source {rel.source_id} (idx={source_idx})"
)
def test_topological_order_contains_all_resources(self):
"""The topological order contains every resource exactly once."""
resources = [
make_resource(
resource_type="kubernetes_deployment",
unique_id=f"default/deployments/app-{i}",
name=f"app-{i}",
)
for i in range(5)
]
scan_result = make_scan_result(resources)
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.topological_order) == 5
assert len(set(graph.topological_order)) == 5 # All unique
def test_graph_returns_all_resources_in_resources_field(self):
"""The DependencyGraph.resources field contains all input resources."""
resources = [
make_resource(
unique_id=f"resource-{i}",
name=f"res-{i}",
)
for i in range(3)
]
scan_result = make_scan_result(resources)
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert graph.resources == resources
def test_source_attribute_identified_from_attributes(self):
"""The source_attribute is identified from the resource's attributes dict."""
app_pool = make_resource(
resource_type="windows_iis_app_pool",
unique_id="win/iis/app_pools/MyPool",
name="MyPool",
provider=ProviderType.WINDOWS,
platform_category=PlatformCategory.WINDOWS,
)
site = make_resource(
resource_type="windows_iis_site",
unique_id="win/iis/sites/MySite",
name="MySite",
raw_references=["win/iis/app_pools/MyPool"],
attributes={"app_pool": "MyPool", "state": "Started"},
provider=ProviderType.WINDOWS,
platform_category=PlatformCategory.WINDOWS,
)
scan_result = make_scan_result([app_pool, site])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert graph.relationships[0].source_attribute == "app_pool"
def test_unresolved_references_are_skipped(self):
"""References to IDs not in the inventory are skipped (no relationship created)."""
resource = make_resource(
resource_type="kubernetes_deployment",
unique_id="default/deployments/app",
name="app",
raw_references=["nonexistent/resource/id"],
)
scan_result = make_scan_result([resource])
resolver = DependencyResolver(scan_result)
graph = resolver.resolve()
assert len(graph.relationships) == 0
# Unresolved references are now tracked (task 4.3)
assert len(graph.unresolved_references) == 1
assert graph.unresolved_references[0].referenced_id == "nonexistent/resource/id"