446 lines
17 KiB
Python
446 lines
17 KiB
Python
"""Unit tests for unresolved reference handling in the DependencyResolver."""
|
|
|
|
import logging
|
|
|
|
import pytest
|
|
|
|
from iac_reverse.models import (
|
|
CpuArchitecture,
|
|
DiscoveredResource,
|
|
PlatformCategory,
|
|
ProviderType,
|
|
ScanResult,
|
|
)
|
|
from iac_reverse.resolver import DependencyResolver
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def make_resource(
|
|
resource_type: str = "kubernetes_deployment",
|
|
unique_id: str = "default/deployments/nginx",
|
|
name: str = "nginx",
|
|
raw_references: list[str] | None = None,
|
|
attributes: dict | None = None,
|
|
provider: ProviderType = ProviderType.KUBERNETES,
|
|
platform_category: PlatformCategory = PlatformCategory.CONTAINER_ORCHESTRATION,
|
|
) -> DiscoveredResource:
|
|
"""Create a sample DiscoveredResource for testing."""
|
|
return DiscoveredResource(
|
|
resource_type=resource_type,
|
|
unique_id=unique_id,
|
|
name=name,
|
|
provider=provider,
|
|
platform_category=platform_category,
|
|
architecture=CpuArchitecture.AARCH64,
|
|
endpoint="https://k8s-api.local:6443",
|
|
attributes=attributes or {},
|
|
raw_references=raw_references or [],
|
|
)
|
|
|
|
|
|
def make_scan_result(resources: list[DiscoveredResource]) -> ScanResult:
|
|
"""Create a ScanResult from a list of resources."""
|
|
return ScanResult(
|
|
resources=resources,
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp="2024-01-15T10:30:00Z",
|
|
profile_hash="test-hash",
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Single unresolved reference
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSingleUnresolvedReference:
|
|
"""Tests for a single unresolved reference creating an UnresolvedReference entry."""
|
|
|
|
def test_single_unresolved_reference_creates_entry(self):
|
|
"""A raw_reference pointing to an ID not in the inventory creates an UnresolvedReference."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["nonexistent/resource/id"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert len(graph.unresolved_references) == 1
|
|
unresolved = graph.unresolved_references[0]
|
|
assert unresolved.source_resource_id == "default/deployments/app"
|
|
assert unresolved.referenced_id == "nonexistent/resource/id"
|
|
|
|
def test_unresolved_reference_source_attribute_from_attributes(self):
|
|
"""The source_attribute is identified from the resource's attributes dict."""
|
|
resource = make_resource(
|
|
resource_type="windows_iis_site",
|
|
unique_id="win/iis/sites/MySite",
|
|
name="MySite",
|
|
raw_references=["missing-pool-id"],
|
|
attributes={"app_pool": "missing-pool-id", "state": "Started"},
|
|
provider=ProviderType.WINDOWS,
|
|
platform_category=PlatformCategory.WINDOWS,
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert len(graph.unresolved_references) == 1
|
|
assert graph.unresolved_references[0].source_attribute == "app_pool"
|
|
|
|
def test_unresolved_reference_fallback_to_raw_references(self):
|
|
"""Falls back to 'raw_references' when the ref ID isn't in attributes."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["some/external/resource"],
|
|
attributes={"replicas": "3"},
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].source_attribute == "raw_references"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Multiple unresolved references from same resource
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMultipleUnresolvedFromSameResource:
|
|
"""Tests for multiple unresolved references from the same resource."""
|
|
|
|
def test_multiple_unresolved_from_same_resource(self):
|
|
"""Multiple unresolved references from one resource create multiple entries."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=[
|
|
"missing/namespace/id",
|
|
"missing/configmap/id",
|
|
"missing/secret/id",
|
|
],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert len(graph.unresolved_references) == 3
|
|
referenced_ids = [u.referenced_id for u in graph.unresolved_references]
|
|
assert "missing/namespace/id" in referenced_ids
|
|
assert "missing/configmap/id" in referenced_ids
|
|
assert "missing/secret/id" in referenced_ids
|
|
|
|
def test_all_entries_share_same_source_resource_id(self):
|
|
"""All unresolved entries from the same resource share the source_resource_id."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["missing/ref-a", "missing/ref-b"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
for unresolved in graph.unresolved_references:
|
|
assert unresolved.source_resource_id == "default/deployments/app"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Mix of resolved and unresolved references
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMixedResolvedAndUnresolved:
|
|
"""Tests for a mix of resolved and unresolved references."""
|
|
|
|
def test_resolved_creates_relationship_unresolved_creates_entry(self):
|
|
"""Resolved references create relationships; unresolved create entries."""
|
|
namespace = make_resource(
|
|
resource_type="kubernetes_namespace",
|
|
unique_id="ns/default",
|
|
name="default",
|
|
)
|
|
deployment = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["ns/default", "missing/configmap/app-config"],
|
|
attributes={"namespace": "default"},
|
|
)
|
|
|
|
scan_result = make_scan_result([namespace, deployment])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
# One resolved relationship
|
|
assert len(graph.relationships) == 1
|
|
assert graph.relationships[0].target_id == "ns/default"
|
|
|
|
# One unresolved reference
|
|
assert len(graph.unresolved_references) == 1
|
|
assert (
|
|
graph.unresolved_references[0].referenced_id
|
|
== "missing/configmap/app-config"
|
|
)
|
|
|
|
def test_mixed_references_multiple_resources(self):
|
|
"""Multiple resources with a mix of resolved and unresolved references."""
|
|
namespace = make_resource(
|
|
resource_type="kubernetes_namespace",
|
|
unique_id="ns/prod",
|
|
name="prod",
|
|
)
|
|
deployment = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="prod/deployments/web",
|
|
name="web",
|
|
raw_references=["ns/prod", "external/database/postgres"],
|
|
)
|
|
service = make_resource(
|
|
resource_type="kubernetes_service",
|
|
unique_id="prod/services/web-svc",
|
|
name="web-svc",
|
|
raw_references=["ns/prod", "missing/endpoint"],
|
|
)
|
|
|
|
scan_result = make_scan_result([namespace, deployment, service])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
# Two resolved relationships (deployment->ns, service->ns)
|
|
assert len(graph.relationships) == 2
|
|
|
|
# Two unresolved references
|
|
assert len(graph.unresolved_references) == 2
|
|
unresolved_ids = [u.referenced_id for u in graph.unresolved_references]
|
|
assert "external/database/postgres" in unresolved_ids
|
|
assert "missing/endpoint" in unresolved_ids
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: suggested_resolution is "data_source" for ID-like references
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSuggestedResolutionDataSource:
|
|
"""Tests that suggested_resolution is 'data_source' for ID-like references."""
|
|
|
|
def test_reference_with_slash_suggests_data_source(self):
|
|
"""A reference containing '/' is suggested as 'data_source'."""
|
|
resource = make_resource(
|
|
raw_references=["external/vpc/vpc-12345"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].suggested_resolution == "data_source"
|
|
|
|
def test_reference_with_colon_suggests_data_source(self):
|
|
"""A reference containing ':' is suggested as 'data_source'."""
|
|
resource = make_resource(
|
|
raw_references=["arn:aws:ec2:us-east-1:123456:instance/i-abc123"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].suggested_resolution == "data_source"
|
|
|
|
def test_reference_with_both_slash_and_colon_suggests_data_source(self):
|
|
"""A reference containing both '/' and ':' is suggested as 'data_source'."""
|
|
resource = make_resource(
|
|
raw_references=["provider:type/resource-name"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].suggested_resolution == "data_source"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: suggested_resolution is "variable" for simple name references
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSuggestedResolutionVariable:
|
|
"""Tests that suggested_resolution is 'variable' for simple name references."""
|
|
|
|
def test_simple_name_suggests_variable(self):
|
|
"""A simple name without '/' or ':' is suggested as 'variable'."""
|
|
resource = make_resource(
|
|
raw_references=["my-environment"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].suggested_resolution == "variable"
|
|
|
|
def test_alphanumeric_name_suggests_variable(self):
|
|
"""An alphanumeric name is suggested as 'variable'."""
|
|
resource = make_resource(
|
|
raw_references=["production123"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].suggested_resolution == "variable"
|
|
|
|
def test_name_with_dashes_and_underscores_suggests_variable(self):
|
|
"""A name with dashes and underscores (but no / or :) is 'variable'."""
|
|
resource = make_resource(
|
|
raw_references=["my_app-pool-name"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert graph.unresolved_references[0].suggested_resolution == "variable"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Unresolved references don't create graph edges or relationships
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUnresolvedDontCreateEdges:
|
|
"""Tests that unresolved references don't create graph edges or relationships."""
|
|
|
|
def test_unresolved_reference_creates_no_relationship(self):
|
|
"""An unresolved reference does not create a ResourceRelationship."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["nonexistent/resource"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
assert len(graph.relationships) == 0
|
|
|
|
def test_unresolved_reference_does_not_affect_topological_order(self):
|
|
"""Unresolved references don't add extra nodes to the topological order."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["nonexistent/resource"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
# Only the actual resource should be in the topological order
|
|
assert graph.topological_order == ["default/deployments/app"]
|
|
|
|
def test_unresolved_does_not_block_resolved_relationships(self):
|
|
"""Unresolved references don't prevent resolved references from working."""
|
|
namespace = make_resource(
|
|
resource_type="kubernetes_namespace",
|
|
unique_id="ns/default",
|
|
name="default",
|
|
)
|
|
deployment = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["ns/default", "nonexistent/configmap"],
|
|
)
|
|
|
|
scan_result = make_scan_result([namespace, deployment])
|
|
resolver = DependencyResolver(scan_result)
|
|
graph = resolver.resolve()
|
|
|
|
# The resolved relationship still works
|
|
assert len(graph.relationships) == 1
|
|
assert graph.relationships[0].source_id == "default/deployments/app"
|
|
assert graph.relationships[0].target_id == "ns/default"
|
|
|
|
# Topological order is correct
|
|
order = graph.topological_order
|
|
assert order.index("ns/default") < order.index("default/deployments/app")
|
|
|
|
# Unresolved reference is tracked
|
|
assert len(graph.unresolved_references) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Warning logging for unresolved references
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestUnresolvedReferenceLogging:
|
|
"""Tests that warnings are logged for unresolved references."""
|
|
|
|
def test_warning_logged_for_unresolved_reference(self, caplog):
|
|
"""A warning is logged for each unresolved reference."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["missing/resource/id"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
|
|
with caplog.at_level(logging.WARNING, logger="iac_reverse.resolver.resolver"):
|
|
graph = resolver.resolve()
|
|
|
|
assert len(caplog.records) == 1
|
|
record = caplog.records[0]
|
|
assert record.levelname == "WARNING"
|
|
assert "missing/resource/id" in record.message
|
|
assert "default/deployments/app" in record.message
|
|
|
|
def test_multiple_warnings_logged_for_multiple_unresolved(self, caplog):
|
|
"""A warning is logged for each unresolved reference."""
|
|
resource = make_resource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="default/deployments/app",
|
|
name="app",
|
|
raw_references=["missing/ref-a", "missing/ref-b"],
|
|
)
|
|
|
|
scan_result = make_scan_result([resource])
|
|
resolver = DependencyResolver(scan_result)
|
|
|
|
with caplog.at_level(logging.WARNING, logger="iac_reverse.resolver.resolver"):
|
|
graph = resolver.resolve()
|
|
|
|
warning_messages = [r.message for r in caplog.records if r.levelname == "WARNING"]
|
|
assert len(warning_messages) == 2
|
|
assert any("missing/ref-a" in msg for msg in warning_messages)
|
|
assert any("missing/ref-b" in msg for msg in warning_messages)
|