Files
SnarfCode/tests/property/test_multi_provider_prop.py
2026-05-22 00:19:30 -04:00

804 lines
28 KiB
Python

"""Property-based tests for multi-provider merging and filtering.

**Validates: Requirements 5.3, 5.4, 6.1, 6.2, 6.4, 6.6, 6.7**

Property 18: Multi-provider merge with naming conflict resolution
    For any two or more resource inventories from different on-premises
    providers where resource names collide, the merged inventory SHALL contain
    all resources from all providers, with conflicting names prefixed by the
    provider identifier, and no resources lost.

Property 19: Provider block generation
    For any resource set spanning N distinct on-premises providers, the
    generated provider configuration SHALL contain exactly N provider blocks,
    one per distinct provider.

Property 20: Scan profile validation completeness (additional multi-provider scenarios)
    The core property is already covered in
    test_scan_profile_validation_prop.py; this module adds multi-provider
    scenarios.

Property 21: Filtering correctness
    For any scan profile with resource type filters, the discovered resources
    SHALL be a subset where every resource's type is in the filter list. No
    resource outside the filter criteria shall appear.
"""
from typing import Callable
from hypothesis import given, assume, settings
from hypothesis import strategies as st
from iac_reverse.models import (
CpuArchitecture,
DiscoveredResource,
GeneratedFile,
PlatformCategory,
PROVIDER_PLATFORM_MAP,
PROVIDER_SUPPORTED_RESOURCE_TYPES,
ProviderType,
ScanProfile,
ScanProgress,
ScanResult,
)
from iac_reverse.generator import ProviderBlockGenerator, ResourceMerger
from iac_reverse.plugin_base import ProviderPlugin
from iac_reverse.scanner.scanner import Scanner
# ---------------------------------------------------------------------------
# Hypothesis Strategies
# ---------------------------------------------------------------------------
# Draw any member of the provider / CPU-architecture enums.
provider_type_strategy = st.sampled_from(list(ProviderType))
architecture_strategy = st.sampled_from(list(CpuArchitecture))

# Credential mappings holding 1-5 entries: keys are 1-20 characters from the
# Unicode letter ("L") and number ("N") categories, values are any non-empty
# text of at most 50 characters.
_credential_key_strategy = st.text(
    alphabet=st.characters(whitelist_categories=("L", "N")),
    min_size=1,
    max_size=20,
)
_credential_value_strategy = st.text(min_size=1, max_size=50)
non_empty_credentials_strategy = st.dictionaries(
    keys=_credential_key_strategy,
    values=_credential_value_strategy,
    min_size=1,
    max_size=5,
)

# Strategy for resource names that could collide across providers:
# 1-30 letters, numbers or dash punctuation ("Pd"), rejecting any candidate
# that is empty once surrounding whitespace is stripped.
_resource_name_alphabet = st.characters(whitelist_categories=("L", "N", "Pd"))
resource_name_strategy = st.text(
    alphabet=_resource_name_alphabet,
    min_size=1,
    max_size=30,
).filter(lambda candidate: candidate.strip())
def discovered_resource_strategy(
    provider: ProviderType,
    name: str | None = None,
) -> st.SearchStrategy[DiscoveredResource]:
    """Build a strategy that generates ``DiscoveredResource`` objects for *provider*.

    Args:
        provider: Provider whose entries in ``PROVIDER_SUPPORTED_RESOURCE_TYPES``
            and ``PROVIDER_PLATFORM_MAP`` supply the resource type pool and the
            platform category of every generated resource.
        name: Fixed name for every generated resource. When ``None`` the name
            is drawn from ``resource_name_strategy`` instead.

    Returns:
        A strategy yielding resources with a sampled supported resource type,
        a UUID string as ``unique_id``, a sampled CPU architecture, the
        endpoint ``"http://localhost:8080"``, attributes ``{"key": "value"}``
        and an empty ``raw_references`` list.

    Raises:
        KeyError: If *provider* is missing from either lookup table.
    """
    supported_types = PROVIDER_SUPPORTED_RESOURCE_TYPES[provider]
    platform_category = PROVIDER_PLATFORM_MAP[provider]

    # Compare against None explicitly: a truthiness check would silently
    # replace a caller-supplied empty string with a random name.
    name_strategy = resource_name_strategy if name is None else st.just(name)

    return st.builds(
        DiscoveredResource,
        resource_type=st.sampled_from(supported_types),
        unique_id=st.uuids().map(str),
        name=name_strategy,
        provider=st.just(provider),
        platform_category=st.just(platform_category),
        architecture=architecture_strategy,
        endpoint=st.just("http://localhost:8080"),
        # builds() creates a new dict/list for each example; just() would hand
        # the very same mutable object to every generated resource.
        attributes=st.builds(dict, key=st.just("value")),
        raw_references=st.builds(list),
    )
def scan_result_strategy(
    provider: ProviderType,
    resources: list[DiscoveredResource] | None = None,
) -> st.SearchStrategy[ScanResult]:
    """Build a strategy that generates ``ScanResult`` objects for *provider*.

    Args:
        provider: Provider used to generate resources when *resources* is
            ``None``.
        resources: Fixed resource list to place in every generated result.
            When ``None``, 1-5 resources are drawn from
            ``discovered_resource_strategy(provider)``.

    Returns:
        A strategy yielding scan results with empty ``warnings``/``errors``,
        timestamp ``"2024-01-01T00:00:00Z"`` and profile hash ``"abc123"``.
        Each example is a newly built ``ScanResult``; a fixed *resources* list
        is shallow-copied per example (the resource objects are shared).
    """
    if resources is None:
        resources_strategy = st.lists(
            discovered_resource_strategy(provider),
            min_size=1,
            max_size=5,
        )
    else:
        # Copy the caller's list for each example so a test that mutates one
        # result cannot leak into later examples or into the caller's list.
        resources_strategy = st.builds(list, st.just(resources))

    return st.builds(
        ScanResult,
        resources=resources_strategy,
        # builds(list) yields a new empty list every time, unlike just([]).
        warnings=st.builds(list),
        errors=st.builds(list),
        scan_timestamp=st.just("2024-01-01T00:00:00Z"),
        profile_hash=st.just("abc123"),
    )
# ---------------------------------------------------------------------------
# Mock Plugin for Filtering Tests
# ---------------------------------------------------------------------------
class FilteringPlugin(ProviderPlugin):
    """Test double that fabricates resources only for the requested types."""

    def __init__(self, provider: ProviderType):
        # Remember the provider and the resource types it supports.
        self._provider = provider
        self._supported = PROVIDER_SUPPORTED_RESOURCE_TYPES[provider]

    def authenticate(self, credentials: dict[str, str]) -> None:
        """Accept any credentials without checking them."""
        return None

    def get_platform_category(self) -> PlatformCategory:
        """Return the platform category mapped to this plugin's provider."""
        return PROVIDER_PLATFORM_MAP[self._provider]

    def list_endpoints(self) -> list[str]:
        """Return the single fixed endpoint used by this fake."""
        return ["http://localhost:8080"]

    def list_supported_resource_types(self) -> list[str]:
        """Return the provider's supported resource types."""
        return self._supported

    def detect_architecture(self, endpoint: str) -> CpuArchitecture:
        """Report AMD64 for every endpoint."""
        return CpuArchitecture.AMD64

    def discover_resources(
        self,
        endpoints: list[str],
        resource_types: list[str],
        progress_callback: Callable[[ScanProgress], None],
    ) -> ScanResult:
        """Create exactly one resource per requested type, reporting progress.

        ``endpoints`` is accepted but not consulted. ``progress_callback`` is
        invoked once after each resource is created.
        """
        found = []
        for position, type_name in enumerate(resource_types):
            fabricated = DiscoveredResource(
                resource_type=type_name,
                unique_id=f"id-{type_name}-{position}",
                name=f"resource-{type_name}-{position}",
                provider=self._provider,
                platform_category=PROVIDER_PLATFORM_MAP[self._provider],
                architecture=CpuArchitecture.AMD64,
                endpoint="http://localhost:8080",
                attributes={"key": "value"},
            )
            found.append(fabricated)

            # One resource per type, so both counters equal the 1-based index.
            done_so_far = position + 1
            update = ScanProgress(
                current_resource_type=type_name,
                resources_discovered=done_so_far,
                resource_types_completed=done_so_far,
                total_resource_types=len(resource_types),
            )
            progress_callback(update)

        return ScanResult(
            resources=found,
            warnings=[],
            errors=[],
            scan_timestamp="2024-01-01T00:00:00Z",
            profile_hash="abc123",
        )
# ---------------------------------------------------------------------------
# Property 18: Multi-provider merge with naming conflict resolution
# ---------------------------------------------------------------------------
class TestMultiProviderMergeConflictResolution:
    """Property 18: Multi-provider merge with naming conflict resolution.

    When resources from different providers share the same name, the merger
    is expected to rename each one to ``<provider.value>_<name>`` and to keep
    every resource.

    **Validates: Requirements 5.3**
    """

    @staticmethod
    def _resource(
        provider: ProviderType,
        name: str,
        unique_id: str,
        endpoint: str,
        attributes: dict,
    ) -> DiscoveredResource:
        """Build an AMD64 resource using the provider's first supported type."""
        return DiscoveredResource(
            resource_type=PROVIDER_SUPPORTED_RESOURCE_TYPES[provider][0],
            unique_id=unique_id,
            name=name,
            provider=provider,
            platform_category=PROVIDER_PLATFORM_MAP[provider],
            architecture=CpuArchitecture.AMD64,
            endpoint=endpoint,
            attributes=attributes,
        )

    @staticmethod
    def _scan_result(*resources: DiscoveredResource) -> ScanResult:
        """Wrap *resources* in a ScanResult with empty warnings/errors/metadata."""
        return ScanResult(
            resources=list(resources),
            warnings=[],
            errors=[],
            scan_timestamp="",
            profile_hash="",
        )

    # Distinct providers are drawn as a unique list rather than being filtered
    # with assume(), so no generated example is thrown away.
    @given(
        providers=st.lists(
            provider_type_strategy, min_size=2, max_size=2, unique=True
        ),
        shared_name=resource_name_strategy,
    )
    @settings(max_examples=100)
    def test_conflicting_names_are_prefixed(self, providers, shared_name):
        """Resources with the same name from different providers get prefixed."""
        provider_a, provider_b = providers
        resource_a = self._resource(
            provider_a, shared_name, "id-a", "http://host-a:8080", {"source": "a"}
        )
        resource_b = self._resource(
            provider_b, shared_name, "id-b", "http://host-b:8080", {"source": "b"}
        )

        merged = ResourceMerger().merge(
            [self._scan_result(resource_a), self._scan_result(resource_b)]
        )

        # Both resources must be present (no loss)
        assert len(merged) == 2

        # Conflicting names must be prefixed with provider identifier
        merged_names = {r.name for r in merged}
        for provider in (provider_a, provider_b):
            expected_name = f"{provider.value}_{shared_name}"
            assert expected_name in merged_names, (
                f"Expected '{expected_name}' in merged names, got: {merged_names}"
            )

    @given(
        providers=st.lists(
            provider_type_strategy, min_size=2, max_size=2, unique=True
        ),
        name_a=resource_name_strategy,
        name_b=resource_name_strategy,
    )
    @settings(max_examples=100)
    def test_non_conflicting_names_unchanged(self, providers, name_a, name_b):
        """Resources with unique names across providers are not prefixed."""
        # Identical random names are rare, so rejecting them is cheap.
        assume(name_a != name_b)
        provider_a, provider_b = providers
        resource_a = self._resource(
            provider_a, name_a, "id-a", "http://host-a:8080", {}
        )
        resource_b = self._resource(
            provider_b, name_b, "id-b", "http://host-b:8080", {}
        )

        merged = ResourceMerger().merge(
            [self._scan_result(resource_a), self._scan_result(resource_b)]
        )

        # No resources lost
        assert len(merged) == 2

        # Names should remain unchanged (no prefix)
        merged_names = {r.name for r in merged}
        for original_name in (name_a, name_b):
            assert original_name in merged_names, (
                f"Expected original name '{original_name}' preserved, "
                f"got: {merged_names}"
            )

    @given(
        providers=st.lists(
            provider_type_strategy, min_size=3, max_size=3, unique=True
        ),
        shared_name=resource_name_strategy,
    )
    @settings(max_examples=100)
    def test_three_provider_conflict_all_prefixed(self, providers, shared_name):
        """When 3 providers share a name, all get prefixed."""
        scan_results = [
            self._scan_result(
                self._resource(
                    provider,
                    shared_name,
                    f"id-{provider.value}",
                    f"http://{provider.value}:8080",
                    {},
                )
            )
            for provider in providers
        ]

        merged = ResourceMerger().merge(scan_results)

        # All 3 resources preserved
        assert len(merged) == 3

        # All must be prefixed
        for provider in providers:
            expected = f"{provider.value}_{shared_name}"
            assert any(r.name == expected for r in merged), (
                f"Expected prefixed name '{expected}' in merged results"
            )

    @given(
        providers=st.lists(
            provider_type_strategy, min_size=2, max_size=2, unique=True
        ),
        shared_name=resource_name_strategy,
    )
    @settings(max_examples=100)
    def test_merge_preserves_all_resources_no_loss(self, providers, shared_name):
        """Merging never loses resources regardless of naming conflicts."""
        provider_a, provider_b = providers
        resource_a = self._resource(
            provider_a, shared_name, "id-a", "http://host-a:8080", {"source": "a"}
        )
        resource_b = self._resource(
            provider_b, shared_name, "id-b", "http://host-b:8080", {"source": "b"}
        )
        # Also add a non-conflicting resource. Its name contains "_", which
        # resource_name_strategy's alphabet (letters, numbers, dashes) cannot
        # produce, so it never equals shared_name.
        resource_c = self._resource(
            provider_a,
            "unique_resource_name",
            "id-c",
            "http://host-a:8080",
            {"source": "c"},
        )

        merged = ResourceMerger().merge(
            [
                self._scan_result(resource_a, resource_c),
                self._scan_result(resource_b),
            ]
        )

        # Total resources = 3 (no loss)
        assert len(merged) == 3

        # Every input resource is still present, identified by its unique_id
        unique_ids = {r.unique_id for r in merged}
        assert "id-a" in unique_ids
        assert "id-b" in unique_ids
        assert "id-c" in unique_ids
# ---------------------------------------------------------------------------
# Property 19: Provider block generation
# ---------------------------------------------------------------------------
class TestProviderBlockGeneration:
    """Property 19: Provider block generation.

    For any set of providers used, a provider block is generated for each.

    **Validates: Requirements 5.4**
    """

    @staticmethod
    def _profiles_for(providers) -> list[ScanProfile]:
        """Return one ScanProfile with a dummy token credential per provider."""
        return [
            ScanProfile(provider=p, credentials={"token": "test-token"})
            for p in providers
        ]

    # unique=True guarantees distinct providers AND the requested min_size,
    # which deduplicating through set() after generation could not.
    @given(
        providers=st.lists(
            provider_type_strategy,
            min_size=1,
            max_size=6,
            unique=True,
        ),
    )
    @settings(max_examples=100)
    def test_one_provider_block_per_distinct_provider(self, providers):
        """Generated output contains exactly one provider block per distinct provider."""
        # Imported once per call rather than on every loop iteration.
        from iac_reverse.generator.provider_block import _PROVIDER_METADATA

        provider_types = set(providers)
        generator = ProviderBlockGenerator()
        result = generator.generate(
            profiles=self._profiles_for(providers),
            provider_types=provider_types,
        )

        # Result should be a GeneratedFile
        assert isinstance(result, GeneratedFile)
        assert result.filename == "providers.tf"
        content = result.content

        # Count provider blocks: each provider type should have exactly one
        # provider "name" { block
        for provider_type in provider_types:
            # First metadata field is the terraform provider name
            tf_name = _PROVIDER_METADATA[provider_type][0]
            provider_block_marker = f'provider "{tf_name}"'
            count = content.count(provider_block_marker)
            assert count == 1, (
                f"Expected exactly 1 provider block for '{tf_name}', "
                f"found {count} in:\n{content}"
            )

    @given(
        providers=st.lists(
            provider_type_strategy,
            min_size=2,
            max_size=6,
            unique=True,
        ),
    )
    @settings(max_examples=100)
    def test_required_providers_block_lists_all(self, providers):
        """The terraform required_providers block lists all providers used."""
        from iac_reverse.generator.provider_block import _PROVIDER_METADATA

        provider_types = set(providers)
        generator = ProviderBlockGenerator()
        result = generator.generate(
            profiles=self._profiles_for(providers),
            provider_types=provider_types,
        )
        content = result.content

        # The required_providers block must exist
        assert "required_providers" in content

        # Each provider's name and source must appear somewhere in the output
        # (the check is a substring search over the whole file content).
        for provider_type in provider_types:
            tf_name, source, _ = _PROVIDER_METADATA[provider_type]
            assert tf_name in content, (
                f"Expected provider name '{tf_name}' in required_providers block"
            )
            assert source in content, (
                f"Expected source '{source}' in required_providers block"
            )

    @given(provider=provider_type_strategy)
    @settings(max_examples=100)
    def test_single_provider_generates_one_block(self, provider):
        """A single provider generates exactly one provider block."""
        from iac_reverse.generator.provider_block import _PROVIDER_METADATA

        generator = ProviderBlockGenerator()
        result = generator.generate(
            profiles=self._profiles_for([provider]),
            provider_types={provider},
        )
        content = result.content
        tf_name = _PROVIDER_METADATA[provider][0]

        # Exactly one provider block
        provider_block_marker = f'provider "{tf_name}"'
        assert content.count(provider_block_marker) == 1

        # terraform block with required_providers
        assert "terraform {" in content
        assert "required_providers" in content
# ---------------------------------------------------------------------------
# Property 20: Scan profile validation completeness (multi-provider scenarios)
# ---------------------------------------------------------------------------
class TestScanProfileValidationMultiProvider:
    """Property 20: Scan profile validation completeness (multi-provider scenarios).

    Additional multi-provider scenarios beyond what's in
    test_scan_profile_validation_prop.py.

    **Validates: Requirements 6.1, 6.6, 6.7**
    """

    @given(
        provider=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_valid_multi_provider_profiles_all_pass_validation(
        self, provider, credentials
    ):
        """Each valid profile in a multi-provider set passes validation independently."""
        profile = ScanProfile(
            provider=provider,
            credentials=credentials,
            resource_type_filters=None,
        )
        errors = profile.validate()
        assert errors == [], f"Expected no errors for valid profile, got: {errors}"

    @given(
        provider_a=provider_type_strategy,
        provider_b=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_mixed_valid_invalid_profiles_detected_independently(
        self, provider_a, provider_b, credentials
    ):
        """Invalid profiles are detected independently in a multi-provider set."""
        # Valid profile
        valid_profile = ScanProfile(
            provider=provider_a,
            credentials=credentials,
            resource_type_filters=None,
        )
        # Invalid profile (empty credentials)
        invalid_profile = ScanProfile(
            provider=provider_b,
            credentials={},
            resource_type_filters=None,
        )

        valid_errors = valid_profile.validate()
        invalid_errors = invalid_profile.validate()

        assert valid_errors == []
        assert len(invalid_errors) >= 1
        assert any("credentials" in e for e in invalid_errors)

    # Both providers are drawn by Hypothesis so that every ordered pair of
    # distinct providers can be exercised, instead of always borrowing types
    # from the first other enum member.
    @given(
        providers=st.lists(
            provider_type_strategy, min_size=2, max_size=2, unique=True
        ),
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_cross_provider_resource_types_detected_as_unsupported(
        self, providers, credentials
    ):
        """Resource types from a different provider are flagged as unsupported."""
        provider, other_provider = providers

        # Use (up to) the first two resource types of the other provider
        foreign_types = PROVIDER_SUPPORTED_RESOURCE_TYPES[other_provider][:2]
        profile = ScanProfile(
            provider=provider,
            credentials=credentials,
            resource_type_filters=foreign_types,
        )
        errors = profile.validate()

        # Should detect unsupported types (unless they happen to overlap)
        supported = set(PROVIDER_SUPPORTED_RESOURCE_TYPES[provider])
        unsupported = [t for t in foreign_types if t not in supported]
        if unsupported:
            assert any("unsupported" in e.lower() for e in errors), (
                f"Expected unsupported error for cross-provider types, got: {errors}"
            )
# ---------------------------------------------------------------------------
# Property 21: Filtering correctness
# ---------------------------------------------------------------------------
class TestFilteringCorrectness:
    """Property 21: Filtering correctness.

    A scan configured with resource type filters must yield resources of
    those types only.

    **Validates: Requirements 6.2, 6.4**
    """

    @staticmethod
    def _scan_with_filters(provider, credentials, filters):
        """Run a Scanner backed by FilteringPlugin and return its result."""
        plugin = FilteringPlugin(provider=provider)
        profile = ScanProfile(
            provider=provider,
            credentials=credentials,
            resource_type_filters=filters,
        )
        return Scanner(profile=profile, plugin=plugin).scan()

    @given(
        provider=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_filtered_scan_returns_only_filtered_types(self, provider, credentials):
        """Every resource found by a filtered scan has a type from the filter."""
        supported = PROVIDER_SUPPORTED_RESOURCE_TYPES[provider]
        assume(len(supported) >= 2)

        # The filter is the first two supported types
        filter_types = supported[:2]
        result = self._scan_with_filters(provider, credentials, filter_types)

        # Check each discovered resource against the filter list
        for found in result.resources:
            assert found.resource_type in filter_types, (
                f"Resource type '{found.resource_type}' not in filter list "
                f"{filter_types}"
            )

    @given(
        provider=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_no_filter_returns_all_supported_types(self, provider, credentials):
        """Without a filter, each supported type shows up in the results."""
        supported = PROVIDER_SUPPORTED_RESOURCE_TYPES[provider]
        result = self._scan_with_filters(provider, credentials, None)

        # Each supported type must be among the discovered ones
        discovered_types = {found.resource_type for found in result.resources}
        for expected_type in supported:
            assert expected_type in discovered_types, (
                f"Expected type '{expected_type}' to be discovered when no filter, "
                f"got: {discovered_types}"
            )

    @given(
        provider=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_single_type_filter_returns_only_that_type(self, provider, credentials):
        """Filtering on one type yields resources of that type alone."""
        supported = PROVIDER_SUPPORTED_RESOURCE_TYPES[provider]
        assume(len(supported) >= 1)

        single_filter = [supported[0]]
        result = self._scan_with_filters(provider, credentials, single_filter)

        # No other type may be present
        for found in result.resources:
            assert found.resource_type == single_filter[0], (
                f"Expected only '{single_filter[0]}', got '{found.resource_type}'"
            )

    @given(
        provider=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_empty_filter_returns_no_resources(self, provider, credentials):
        """Scanning with an empty filter list discovers nothing."""
        result = self._scan_with_filters(provider, credentials, [])

        # Nothing was requested, so nothing should come back
        assert len(result.resources) == 0, (
            f"Expected 0 resources with empty filter, got {len(result.resources)}"
        )

    @given(
        provider=provider_type_strategy,
        credentials=non_empty_credentials_strategy,
    )
    @settings(max_examples=100)
    def test_filter_subset_excludes_non_filtered_types(self, provider, credentials):
        """Supported types left out of the filter are absent from the results."""
        supported = PROVIDER_SUPPORTED_RESOURCE_TYPES[provider]
        assume(len(supported) >= 3)

        # Keep the first two types; everything after them is excluded
        filter_types, excluded_types = supported[:2], supported[2:]
        result = self._scan_with_filters(provider, credentials, filter_types)

        # Excluded types must not be discovered
        discovered_types = {found.resource_type for found in result.resources}
        for excluded in excluded_types:
            assert excluded not in discovered_types, (
                f"Excluded type '{excluded}' should not appear in filtered results"
            )