568 lines
20 KiB
Python
568 lines
20 KiB
Python
"""Property-based tests for the State Builder.
|
|
|
|
**Validates: Requirements 4.1, 4.2, 4.4, 4.5**
|
|
|
|
Properties tested:
|
|
- Property 16: State file structural validity
|
|
- Property 17: State entry completeness and schema correctness
|
|
"""
|
|
|
|
import json
|
|
import re
|
|
import uuid
|
|
|
|
from hypothesis import given, settings, assume
|
|
from hypothesis import strategies as st
|
|
|
|
from iac_reverse.generator.sanitize import sanitize_identifier
|
|
from iac_reverse.models import (
|
|
CodeGenerationResult,
|
|
CpuArchitecture,
|
|
DependencyGraph,
|
|
DiscoveredResource,
|
|
GeneratedFile,
|
|
PlatformCategory,
|
|
PROVIDER_SUPPORTED_RESOURCE_TYPES,
|
|
ProviderType,
|
|
ResourceRelationship,
|
|
)
|
|
from iac_reverse.state_builder import StateBuilder
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hypothesis Strategies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _not_blank(candidate: str) -> bool:
    """Return True when *candidate* is non-empty after stripping whitespace."""
    return candidate.strip() != ""


def _letters_digits_and(extra_characters: str):
    """Return a character strategy of letters, numbers and *extra_characters*."""
    return st.characters(
        whitelist_categories=("L", "N"),
        whitelist_characters=extra_characters,
    )


# Enum-backed strategies: every member of each enum is a candidate value.
provider_type_strategy = st.sampled_from(list(ProviderType))
platform_category_strategy = st.sampled_from(list(PlatformCategory))
cpu_architecture_strategy = st.sampled_from(list(CpuArchitecture))

# Flat list of the resource types of every provider (duplicates are kept).
ALL_SUPPORTED_RESOURCE_TYPES = [
    supported_type
    for provider_types in PROVIDER_SUPPORTED_RESOURCE_TYPES.values()
    for supported_type in provider_types
]

resource_type_strategy = st.sampled_from(ALL_SUPPORTED_RESOURCE_TYPES)

# Resource names: 1-20 characters drawn from letters, numbers, "_" and "-".
resource_name_strategy = st.text(
    alphabet=_letters_digits_and("_-"),
    min_size=1,
    max_size=20,
).filter(_not_blank)

# Unique IDs: 1-40 characters, additionally allowing "/", ":" and ".".
unique_id_strategy = st.text(
    alphabet=_letters_digits_and("_-/:."),
    min_size=1,
    max_size=40,
).filter(_not_blank)

# Simple attribute values: short non-blank text, small integers, or booleans.
simple_attr_value_strategy = st.one_of(
    st.text(
        alphabet=_letters_digits_and("_-./: "),
        min_size=1,
        max_size=30,
    ).filter(_not_blank),
    st.integers(min_value=0, max_value=10000),
    st.booleans(),
)

# Attribute dictionaries with 1-5 entries; keys must start with a letter.
attributes_strategy = st.dictionaries(
    keys=st.text(
        alphabet=st.characters(
            whitelist_categories=("L",),
            whitelist_characters="_",
        ),
        min_size=1,
        max_size=15,
    ).filter(lambda key: _not_blank(key) and key[0].isalpha()),
    values=simple_attr_value_strategy,
    min_size=1,
    max_size=5,
)

# Provider versions shaped like "major.minor.patch" with a non-zero leading digit.
provider_version_strategy = st.from_regex(
    r"[1-9][0-9]{0,1}\.[0-9]{1,2}\.[0-9]{1,2}",
    fullmatch=True,
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def make_resource(
    unique_id: str,
    resource_type: str = "kubernetes_deployment",
    name: str = "my_resource",
    provider: ProviderType = ProviderType.KUBERNETES,
    platform_category: PlatformCategory = PlatformCategory.CONTAINER_ORCHESTRATION,
    architecture: CpuArchitecture = CpuArchitecture.AMD64,
    attributes: dict | None = None,
    raw_references: list[str] | None = None,
) -> DiscoveredResource:
    """Create a DiscoveredResource with sensible defaults.

    Args:
        unique_id: Identifier stored on the resource.
        resource_type: Resource type string; defaults to a Kubernetes deployment.
        name: Human-readable resource name.
        provider: Provider enum member for the resource.
        platform_category: Platform category enum member.
        architecture: CPU architecture enum member.
        attributes: Attribute mapping. Only ``None`` is replaced by the
            ``{"key": "value"}`` placeholder; an explicitly passed empty
            dict is forwarded unchanged.
        raw_references: Raw reference strings. ``None`` becomes a new
            empty list.

    Returns:
        A DiscoveredResource pointing at the fixed test endpoint.
    """
    # Compare against None rather than relying on truthiness so that a caller
    # who deliberately passes an empty container gets exactly that container.
    if attributes is None:
        attributes = {"key": "value"}
    if raw_references is None:
        raw_references = []

    return DiscoveredResource(
        resource_type=resource_type,
        unique_id=unique_id,
        name=name,
        provider=provider,
        platform_category=platform_category,
        architecture=architecture,
        endpoint="https://api.internal.lab:6443",
        attributes=attributes,
        raw_references=raw_references,
    )
|
|
|
|
|
|
def make_dependency_graph(
    resources: list[DiscoveredResource],
    relationships: list[ResourceRelationship] | None = None,
) -> DependencyGraph:
    """Wrap *resources* in a DependencyGraph with no cycles or unresolved refs.

    The topological order is the order in which the resources were given.
    A missing or empty *relationships* argument yields an empty list.
    """
    ordered_ids = [resource.unique_id for resource in resources]
    edges = relationships if relationships else []
    return DependencyGraph(
        resources=resources,
        relationships=edges,
        topological_order=ordered_ids,
        cycles=[],
        unresolved_references=[],
    )
|
|
|
|
|
|
def make_code_generation_result() -> CodeGenerationResult:
    """Return a minimal CodeGenerationResult made of three empty files."""

    def _empty_file(filename: str) -> GeneratedFile:
        # Every generated file in this fixture has no content and no resources.
        return GeneratedFile(filename=filename, content="", resource_count=0)

    return CodeGenerationResult(
        resource_files=[_empty_file("main.tf")],
        variables_file=_empty_file("variables.tf"),
        provider_file=_empty_file("provider.tf"),
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Composite strategies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@st.composite
def mappable_resource_strategy(draw):
    """Generate one DiscoveredResource that can be mapped to state.

    The resource gets a non-empty unique_id and a resource type sampled from
    the supported resource types.
    """
    # Keyword arguments are evaluated left to right, so the draws happen in
    # this order: type, name, id, provider, category, architecture, attributes.
    return make_resource(
        resource_type=draw(resource_type_strategy),
        name=draw(resource_name_strategy),
        unique_id=draw(unique_id_strategy),
        provider=draw(provider_type_strategy),
        platform_category=draw(platform_category_strategy),
        architecture=draw(cpu_architecture_strategy),
        attributes=draw(attributes_strategy),
    )
|
|
|
|
|
|
@st.composite
def multiple_mappable_resources_strategy(draw):
    """Generate a list of 1-5 mappable resources with distinct unique IDs.

    Returns:
        A non-empty list of DiscoveredResource objects in which no two
        elements share the same ``unique_id``.
    """
    # ``unique_by`` lets Hypothesis enforce ID uniqueness itself, replacing a
    # manual seen-set loop and an ``assume`` that could never fail (the first
    # drawn resource was always kept).
    return draw(
        st.lists(
            mappable_resource_strategy(),
            min_size=1,
            max_size=5,
            unique_by=lambda resource: resource.unique_id,
        )
    )
|
|
|
|
|
|
@st.composite
def resource_with_sensitive_attrs_strategy(draw):
    """Generate a resource whose attributes contain a sensitive-looking key."""
    drawn_type = draw(resource_type_strategy)
    drawn_name = draw(resource_name_strategy)
    drawn_id = draw(unique_id_strategy)

    # Exactly one key from this list is always present in the attributes.
    secret_key = draw(
        st.sampled_from(
            [
                "password",
                "api_secret",
                "auth_token",
                "private_key",
                "tls_certificate",
            ]
        )
    )
    secret_value = draw(
        st.text(min_size=1, max_size=20, alphabet="abcdefghijklmnop")
    )

    # Ordinary attributes come first; the sensitive entry overrides any
    # ordinary attribute that happens to use the same key.
    combined_attrs = {**draw(attributes_strategy), secret_key: secret_value}

    return make_resource(
        unique_id=drawn_id,
        resource_type=drawn_type,
        name=drawn_name,
        attributes=combined_attrs,
    )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 16: State file structural validity
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStateFileStructuralValidity:
    """Property 16: State file structural validity.

    **Validates: Requirements 4.1**

    For any set of resources, the generated state file has version=4,
    valid UUID lineage, serial=1, and valid JSON structure.
    """

    @staticmethod
    def _build_state(resources: list[DiscoveredResource]):
        """Run ``StateBuilder.build`` over *resources* with provider version "1.0.0".

        Shared by every test in this class so each test body contains only
        its own assertions.
        """
        return StateBuilder().build(
            make_code_generation_result(),
            make_dependency_graph(resources),
            "1.0.0",
        )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_file_version_is_4(
        self, resources: list[DiscoveredResource]
    ):
        """The generated state file always has version=4."""
        state_file = self._build_state(resources)

        assert state_file.version == 4, (
            f"Expected version=4, got version={state_file.version}"
        )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_file_has_valid_uuid_lineage(
        self, resources: list[DiscoveredResource]
    ):
        """The generated state file has a valid version-4 UUID lineage."""
        state_file = self._build_state(resources)

        # uuid.UUID raises ValueError for malformed strings and
        # TypeError/AttributeError for non-string input; report all of them
        # as a lineage failure and keep the original error as the cause.
        try:
            parsed_uuid = uuid.UUID(state_file.lineage)
        except (ValueError, TypeError, AttributeError) as exc:
            raise AssertionError(
                f"Lineage '{state_file.lineage}' is not a valid UUID"
            ) from exc

        assert parsed_uuid.version == 4, (
            f"Expected UUID version 4, got version {parsed_uuid.version}"
        )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_file_serial_is_1(
        self, resources: list[DiscoveredResource]
    ):
        """The generated state file always has serial=1."""
        state_file = self._build_state(resources)

        assert state_file.serial == 1, (
            f"Expected serial=1, got serial={state_file.serial}"
        )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_file_produces_valid_json(
        self, resources: list[DiscoveredResource]
    ):
        """The state file serializes to valid JSON via to_json()."""
        json_str = self._build_state(resources).to_json()

        # Must parse as valid JSON; chain the decode error for debugging.
        try:
            parsed = json.loads(json_str)
        except json.JSONDecodeError as exc:
            raise AssertionError(
                f"State file to_json() produced invalid JSON: {exc}"
            ) from exc

        assert isinstance(parsed, dict), "State JSON root must be a dict"

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_json_has_required_top_level_fields(
        self, resources: list[DiscoveredResource]
    ):
        """The serialized state JSON has version, terraform_version, serial, lineage, resources."""
        parsed = json.loads(self._build_state(resources).to_json())

        required_fields = {"version", "terraform_version", "serial", "lineage", "resources"}
        missing = required_fields - set(parsed.keys())
        assert not missing, (
            f"State JSON missing required top-level fields: {missing}"
        )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_json_resource_entries_have_required_fields(
        self, resources: list[DiscoveredResource]
    ):
        """Each resource entry in the JSON has mode, type, name, provider, and instances."""
        parsed = json.loads(self._build_state(resources).to_json())

        required_resource_fields = {"mode", "type", "name", "provider", "instances"}

        for i, entry in enumerate(parsed["resources"]):
            missing = required_resource_fields - set(entry.keys())
            assert not missing, (
                f"Resource entry {i} missing required fields: {missing}. "
                f"Entry keys: {list(entry.keys())}"
            )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_json_instances_have_schema_and_attributes(
        self, resources: list[DiscoveredResource]
    ):
        """Each instance in the state JSON has schema_version, attributes, sensitive_attributes, dependencies."""
        parsed = json.loads(self._build_state(resources).to_json())

        required_instance_fields = {
            "schema_version", "attributes", "sensitive_attributes", "dependencies"
        }

        for i, entry in enumerate(parsed["resources"]):
            for j, instance in enumerate(entry["instances"]):
                missing = required_instance_fields - set(instance.keys())
                assert not missing, (
                    f"Resource {i}, instance {j} missing fields: {missing}. "
                    f"Instance keys: {list(instance.keys())}"
                )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 17: State entry completeness and schema correctness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStateEntryCompletenessAndSchemaCorrectness:
    """Property 17: State entry completeness and schema correctness.

    **Validates: Requirements 4.4, 4.5**

    For any resource, the state entry has non-empty resource_type,
    resource_name, provider_id, and attributes matching the discovery data.
    """

    @staticmethod
    def _build_single_entry(
        resource: DiscoveredResource, provider_version: str = "1.0.0"
    ):
        """Build state for one resource and return its only state entry.

        Asserts that the built state file contains exactly one resource
        entry before returning it.
        """
        state_file = StateBuilder().build(
            make_code_generation_result(),
            make_dependency_graph([resource]),
            provider_version,
        )

        assert len(state_file.resources) == 1
        return state_file.resources[0]

    @given(resource=mappable_resource_strategy())
    @settings(max_examples=100)
    def test_state_entry_has_non_empty_resource_type(
        self, resource: DiscoveredResource
    ):
        """Each state entry has a non-empty resource_type."""
        entry = self._build_single_entry(resource)

        assert entry.resource_type != "", (
            "State entry resource_type must not be empty"
        )
        assert entry.resource_type == resource.resource_type, (
            f"Expected resource_type '{resource.resource_type}', "
            f"got '{entry.resource_type}'"
        )

    @given(resource=mappable_resource_strategy())
    @settings(max_examples=100)
    def test_state_entry_has_non_empty_resource_name(
        self, resource: DiscoveredResource
    ):
        """Each state entry has a non-empty resource_name (sanitized)."""
        entry = self._build_single_entry(resource)

        assert entry.resource_name != "", (
            "State entry resource_name must not be empty"
        )
        # The name should be a sanitized version of the original
        expected_name = sanitize_identifier(resource.name)
        assert entry.resource_name == expected_name, (
            f"Expected resource_name '{expected_name}', "
            f"got '{entry.resource_name}'"
        )

    @given(resource=mappable_resource_strategy())
    @settings(max_examples=100)
    def test_state_entry_has_non_empty_provider_id(
        self, resource: DiscoveredResource
    ):
        """Each state entry has a non-empty provider_id matching the resource's unique_id."""
        entry = self._build_single_entry(resource)

        assert entry.provider_id != "", (
            "State entry provider_id must not be empty"
        )
        assert entry.provider_id == resource.unique_id, (
            f"Expected provider_id '{resource.unique_id}', "
            f"got '{entry.provider_id}'"
        )

    @given(resource=mappable_resource_strategy())
    @settings(max_examples=100)
    def test_state_entry_attributes_match_discovery_data(
        self, resource: DiscoveredResource
    ):
        """State entry attributes contain all attributes from the discovered resource."""
        entry = self._build_single_entry(resource)

        # All discovery attributes should be present in the state entry
        for key, value in resource.attributes.items():
            assert key in entry.attributes, (
                f"Discovery attribute '{key}' missing from state entry attributes. "
                f"State attrs: {list(entry.attributes.keys())}"
            )
            assert entry.attributes[key] == value, (
                f"Attribute '{key}' mismatch: discovery={value}, "
                f"state={entry.attributes[key]}"
            )

    @given(
        resource=mappable_resource_strategy(),
        provider_version=provider_version_strategy,
    )
    @settings(max_examples=100)
    def test_state_entry_schema_version_matches_provider_version(
        self, resource: DiscoveredResource, provider_version: str
    ):
        """State entry schema_version matches the major version from provider_version."""
        entry = self._build_single_entry(resource, provider_version)

        # Schema version should be the major version number
        expected_schema_version = int(provider_version.split(".")[0])
        assert entry.schema_version == expected_schema_version, (
            f"Expected schema_version={expected_schema_version} "
            f"(from provider_version='{provider_version}'), "
            f"got schema_version={entry.schema_version}"
        )

    @given(resource=resource_with_sensitive_attrs_strategy())
    @settings(max_examples=100)
    def test_state_entry_marks_sensitive_attributes(
        self, resource: DiscoveredResource
    ):
        """State entry identifies and marks sensitive attributes correctly."""
        entry = self._build_single_entry(resource)

        # Attribute keys containing any of these substrings count as sensitive.
        sensitive_patterns = ("password", "secret", "token", "key", "certificate")
        has_sensitive = any(
            pattern in attr_key.lower()
            for attr_key in resource.attributes
            for pattern in sensitive_patterns
        )

        # The strategy always injects a sensitive key. Assert that
        # precondition instead of guarding on it, so the property cannot
        # silently pass without checking anything if the strategy changes.
        assert has_sensitive, (
            f"Test precondition failed: no sensitive-looking key among "
            f"{list(resource.attributes.keys())}"
        )
        assert len(entry.sensitive_attributes) > 0, (
            f"Resource has sensitive-looking attributes "
            f"{list(resource.attributes.keys())} but sensitive_attributes "
            f"is empty"
        )

    @given(resources=multiple_mappable_resources_strategy())
    @settings(max_examples=100)
    def test_state_json_id_field_matches_provider_id(
        self, resources: list[DiscoveredResource]
    ):
        """In the serialized JSON, every instance has a non-empty attributes.id.

        NOTE(review): despite the test name, equality between attributes.id
        and the provider_id is not asserted here; only presence and
        non-emptiness are checked.
        """
        state_file = StateBuilder().build(
            make_code_generation_result(),
            make_dependency_graph(resources),
            "1.0.0",
        )
        parsed = json.loads(state_file.to_json())

        for i, entry in enumerate(parsed["resources"]):
            for instance in entry["instances"]:
                assert "id" in instance["attributes"], (
                    f"Resource entry {i} instance missing 'id' in attributes"
                )
                # The id should be non-empty
                assert instance["attributes"]["id"] != "", (
                    f"Resource entry {i} has empty 'id' attribute"
                )
|