791 lines
31 KiB
Python
791 lines
31 KiB
Python
"""Property-based tests for Incremental Scan Engine.
|
|
|
|
**Validates: Requirements 8.1, 8.2, 8.3, 8.5, 8.6**
|
|
|
|
Properties tested:
|
|
- Property 23: Change classification correctness
|
|
- Property 24: Incremental update scope
|
|
- Property 25: Removed resource exclusion
|
|
- Property 26: Snapshot retention
|
|
"""
|
|
|
|
import json
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from hypothesis import given, settings, assume
|
|
from hypothesis import strategies as st
|
|
|
|
from iac_reverse.incremental import ChangeDetector, IncrementalUpdater, SnapshotStore
|
|
from iac_reverse.models import (
|
|
ChangeSummary,
|
|
ChangeType,
|
|
CpuArchitecture,
|
|
DiscoveredResource,
|
|
PlatformCategory,
|
|
ProviderType,
|
|
ResourceChange,
|
|
ScanResult,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Hypothesis Strategies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
provider_strategy = st.sampled_from(list(ProviderType))
|
|
platform_strategy = st.sampled_from(list(PlatformCategory))
|
|
architecture_strategy = st.sampled_from(list(CpuArchitecture))
|
|
|
|
# Simple attribute values for resources
|
|
attribute_value_strategy = st.one_of(
|
|
st.text(min_size=1, max_size=20, alphabet="abcdefghijklmnopqrstuvwxyz0123456789"),
|
|
st.integers(min_value=0, max_value=1000),
|
|
st.booleans(),
|
|
)
|
|
|
|
attributes_strategy = st.dictionaries(
|
|
keys=st.text(min_size=1, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz_"),
|
|
values=attribute_value_strategy,
|
|
min_size=1,
|
|
max_size=5,
|
|
)
|
|
|
|
# Resource name strategy (valid identifiers)
|
|
resource_name_strategy = st.text(
|
|
min_size=1,
|
|
max_size=15,
|
|
alphabet="abcdefghijklmnopqrstuvwxyz_",
|
|
).filter(lambda s: s[0].isalpha())
|
|
|
|
# Resource type strategy
|
|
resource_type_strategy = st.sampled_from([
|
|
"docker_service",
|
|
"kubernetes_deployment",
|
|
"synology_shared_folder",
|
|
"harvester_virtualmachine",
|
|
"bare_metal_hardware",
|
|
"windows_service",
|
|
])
|
|
|
|
|
|
@st.composite
|
|
def discovered_resource_strategy(draw, uid=None):
|
|
"""Generate a DiscoveredResource with valid fields."""
|
|
resource_type = draw(resource_type_strategy)
|
|
unique_id = uid or draw(st.text(
|
|
min_size=5, max_size=30,
|
|
alphabet="abcdefghijklmnopqrstuvwxyz0123456789_-/",
|
|
).filter(lambda s: s[0].isalpha()))
|
|
name = draw(resource_name_strategy)
|
|
provider = draw(provider_strategy)
|
|
platform = draw(platform_strategy)
|
|
arch = draw(architecture_strategy)
|
|
endpoint = draw(st.text(min_size=3, max_size=20, alphabet="abcdefghijklmnopqrstuvwxyz."))
|
|
attributes = draw(attributes_strategy)
|
|
|
|
return DiscoveredResource(
|
|
resource_type=resource_type,
|
|
unique_id=unique_id,
|
|
name=name,
|
|
provider=provider,
|
|
platform_category=platform,
|
|
architecture=arch,
|
|
endpoint=endpoint,
|
|
attributes=attributes,
|
|
raw_references=[],
|
|
)
|
|
|
|
|
|
@st.composite
|
|
def scan_result_strategy(draw, min_resources=0, max_resources=8):
|
|
"""Generate a ScanResult with unique resource IDs."""
|
|
num_resources = draw(st.integers(min_value=min_resources, max_value=max_resources))
|
|
resources = []
|
|
seen_ids = set()
|
|
|
|
for i in range(num_resources):
|
|
uid = f"resource_{i}_{draw(st.text(min_size=3, max_size=8, alphabet='abcdefghijklmnopqrstuvwxyz'))}"
|
|
if uid in seen_ids:
|
|
uid = f"resource_{i}_fallback"
|
|
seen_ids.add(uid)
|
|
|
|
resource = draw(discovered_resource_strategy(uid=uid))
|
|
resources.append(resource)
|
|
|
|
return ScanResult(
|
|
resources=resources,
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp="2024-01-15T10:30:00Z",
|
|
profile_hash="test_profile_hash",
|
|
is_partial=False,
|
|
)
|
|
|
|
|
|
@st.composite
|
|
def scan_result_pair_strategy(draw):
|
|
"""Generate a pair of scan results with some overlap for meaningful diffs.
|
|
|
|
Creates a previous and current scan where:
|
|
- Some resources exist in both (potentially modified)
|
|
- Some resources only in previous (removed)
|
|
- Some resources only in current (added)
|
|
"""
|
|
# Shared resources (exist in both, may be modified)
|
|
num_shared = draw(st.integers(min_value=0, max_value=4))
|
|
# Resources only in previous (will be removed)
|
|
num_removed = draw(st.integers(min_value=0, max_value=3))
|
|
# Resources only in current (will be added)
|
|
num_added = draw(st.integers(min_value=0, max_value=3))
|
|
|
|
assume(num_shared + num_removed + num_added >= 1)
|
|
|
|
previous_resources = []
|
|
current_resources = []
|
|
|
|
# Generate shared resources
|
|
for i in range(num_shared):
|
|
uid = f"shared_{i}"
|
|
resource_type = draw(resource_type_strategy)
|
|
name = draw(resource_name_strategy)
|
|
provider = draw(provider_strategy)
|
|
platform = draw(platform_strategy)
|
|
arch = draw(architecture_strategy)
|
|
endpoint = draw(st.text(min_size=3, max_size=10, alphabet="abcdefghijklmnopqrstuvwxyz."))
|
|
prev_attrs = draw(attributes_strategy)
|
|
|
|
prev_resource = DiscoveredResource(
|
|
resource_type=resource_type,
|
|
unique_id=uid,
|
|
name=name,
|
|
provider=provider,
|
|
platform_category=platform,
|
|
architecture=arch,
|
|
endpoint=endpoint,
|
|
attributes=prev_attrs,
|
|
raw_references=[],
|
|
)
|
|
previous_resources.append(prev_resource)
|
|
|
|
# Possibly modify attributes for current version
|
|
modify = draw(st.booleans())
|
|
if modify:
|
|
curr_attrs = draw(attributes_strategy)
|
|
else:
|
|
curr_attrs = dict(prev_attrs)
|
|
|
|
curr_resource = DiscoveredResource(
|
|
resource_type=resource_type,
|
|
unique_id=uid,
|
|
name=name,
|
|
provider=provider,
|
|
platform_category=platform,
|
|
architecture=arch,
|
|
endpoint=endpoint,
|
|
attributes=curr_attrs,
|
|
raw_references=[],
|
|
)
|
|
current_resources.append(curr_resource)
|
|
|
|
# Generate removed resources (only in previous)
|
|
for i in range(num_removed):
|
|
uid = f"removed_{i}"
|
|
resource = draw(discovered_resource_strategy(uid=uid))
|
|
previous_resources.append(resource)
|
|
|
|
# Generate added resources (only in current)
|
|
for i in range(num_added):
|
|
uid = f"added_{i}"
|
|
resource = draw(discovered_resource_strategy(uid=uid))
|
|
current_resources.append(resource)
|
|
|
|
previous = ScanResult(
|
|
resources=previous_resources,
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp="2024-01-14T09:00:00Z",
|
|
profile_hash="test_profile",
|
|
is_partial=False,
|
|
)
|
|
current = ScanResult(
|
|
resources=current_resources,
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp="2024-01-15T10:30:00Z",
|
|
profile_hash="test_profile",
|
|
is_partial=False,
|
|
)
|
|
|
|
return previous, current
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 23: Change classification correctness
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestChangeClassificationCorrectness:
|
|
"""Property 23: Change classification correctness.
|
|
|
|
**Validates: Requirements 8.1, 8.5**
|
|
|
|
For any pair of scan results (previous and current), every resource
|
|
SHALL be classified exactly once as: added, removed, or modified.
|
|
The summary counts SHALL equal the actual number of resources in each
|
|
category.
|
|
"""
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100)
|
|
def test_every_resource_classified_exactly_once(self, data):
|
|
"""Every resource is classified as exactly one of: added, removed, or modified."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
prev_ids = {r.unique_id for r in previous.resources}
|
|
curr_ids = {r.unique_id for r in current.resources}
|
|
all_ids = prev_ids | curr_ids
|
|
|
|
# Each change should reference a resource from either scan
|
|
change_ids = [c.resource_id for c in summary.changes]
|
|
|
|
# No duplicates in changes
|
|
assert len(change_ids) == len(set(change_ids)), (
|
|
f"Duplicate resource IDs in changes: "
|
|
f"{[rid for rid in change_ids if change_ids.count(rid) > 1]}"
|
|
)
|
|
|
|
# Every changed resource must be from the union of both scans
|
|
for change in summary.changes:
|
|
assert change.resource_id in all_ids, (
|
|
f"Change references unknown resource: {change.resource_id}"
|
|
)
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100)
|
|
def test_added_resources_in_current_not_previous(self, data):
|
|
"""Resources classified as ADDED are in current but not in previous."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
prev_ids = {r.unique_id for r in previous.resources}
|
|
curr_ids = {r.unique_id for r in current.resources}
|
|
|
|
added_changes = [c for c in summary.changes if c.change_type == ChangeType.ADDED]
|
|
for change in added_changes:
|
|
assert change.resource_id in curr_ids, (
|
|
f"ADDED resource {change.resource_id} not in current scan"
|
|
)
|
|
assert change.resource_id not in prev_ids, (
|
|
f"ADDED resource {change.resource_id} exists in previous scan"
|
|
)
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100)
|
|
def test_removed_resources_in_previous_not_current(self, data):
|
|
"""Resources classified as REMOVED are in previous but not in current."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
prev_ids = {r.unique_id for r in previous.resources}
|
|
curr_ids = {r.unique_id for r in current.resources}
|
|
|
|
removed_changes = [c for c in summary.changes if c.change_type == ChangeType.REMOVED]
|
|
for change in removed_changes:
|
|
assert change.resource_id in prev_ids, (
|
|
f"REMOVED resource {change.resource_id} not in previous scan"
|
|
)
|
|
assert change.resource_id not in curr_ids, (
|
|
f"REMOVED resource {change.resource_id} exists in current scan"
|
|
)
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100)
|
|
def test_modified_resources_in_both_with_differing_attributes(self, data):
|
|
"""Resources classified as MODIFIED exist in both scans with differing attributes."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
prev_map = {r.unique_id: r for r in previous.resources}
|
|
curr_map = {r.unique_id: r for r in current.resources}
|
|
|
|
modified_changes = [c for c in summary.changes if c.change_type == ChangeType.MODIFIED]
|
|
for change in modified_changes:
|
|
assert change.resource_id in prev_map, (
|
|
f"MODIFIED resource {change.resource_id} not in previous scan"
|
|
)
|
|
assert change.resource_id in curr_map, (
|
|
f"MODIFIED resource {change.resource_id} not in current scan"
|
|
)
|
|
# Attributes must actually differ
|
|
assert prev_map[change.resource_id].attributes != curr_map[change.resource_id].attributes, (
|
|
f"MODIFIED resource {change.resource_id} has identical attributes"
|
|
)
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100)
|
|
def test_summary_counts_match_actual_changes(self, data):
|
|
"""Summary counts equal the actual number of resources in each category."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
actual_added = sum(1 for c in summary.changes if c.change_type == ChangeType.ADDED)
|
|
actual_removed = sum(1 for c in summary.changes if c.change_type == ChangeType.REMOVED)
|
|
actual_modified = sum(1 for c in summary.changes if c.change_type == ChangeType.MODIFIED)
|
|
|
|
assert summary.added_count == actual_added, (
|
|
f"added_count={summary.added_count} != actual={actual_added}"
|
|
)
|
|
assert summary.removed_count == actual_removed, (
|
|
f"removed_count={summary.removed_count} != actual={actual_removed}"
|
|
)
|
|
assert summary.modified_count == actual_modified, (
|
|
f"modified_count={summary.modified_count} != actual={actual_modified}"
|
|
)
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100)
|
|
def test_change_types_are_valid(self, data):
|
|
"""Every change has a valid ChangeType value."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
valid_types = {ChangeType.ADDED, ChangeType.REMOVED, ChangeType.MODIFIED}
|
|
for change in summary.changes:
|
|
assert change.change_type in valid_types, (
|
|
f"Invalid change_type: {change.change_type}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 24: Incremental update scope
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIncrementalUpdateScope:
|
|
"""Property 24: Incremental update scope.
|
|
|
|
**Validates: Requirements 8.2**
|
|
|
|
For any change set applied to existing IaC files, only files containing
|
|
added, modified, or removed resources SHALL be modified. Files containing
|
|
only unchanged resources SHALL remain identical.
|
|
"""
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100, deadline=None)
|
|
def test_only_changed_resource_files_are_modified(self, data):
|
|
"""Only .tf files for resource types with changes are modified."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
# Skip if no changes (nothing to test)
|
|
assume(len(summary.changes) > 0)
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
# Create initial .tf files for all resource types in previous scan
|
|
resource_types_in_previous = {r.resource_type for r in previous.resources}
|
|
# Also create a file for an "unchanged" resource type
|
|
unchanged_type = "unchanged_resource_type"
|
|
resource_types_in_previous.add(unchanged_type)
|
|
|
|
for rt in resource_types_in_previous:
|
|
tf_path = Path(tmp_dir) / f"{rt}.tf"
|
|
tf_path.write_text(f'# Placeholder for {rt}\n', encoding="utf-8")
|
|
|
|
# Record original content of the unchanged file
|
|
unchanged_path = Path(tmp_dir) / f"{unchanged_type}.tf"
|
|
original_unchanged_content = unchanged_path.read_text(encoding="utf-8")
|
|
|
|
# Build resource_attributes for added resources
|
|
resource_attributes = {}
|
|
for change in summary.changes:
|
|
if change.change_type == ChangeType.ADDED:
|
|
# Find the resource in current scan
|
|
for r in current.resources:
|
|
if r.unique_id == change.resource_id:
|
|
resource_attributes[change.resource_id] = r.attributes
|
|
break
|
|
|
|
# Apply incremental update
|
|
updater = IncrementalUpdater(
|
|
change_summary=summary,
|
|
output_dir=tmp_dir,
|
|
resource_attributes=resource_attributes,
|
|
)
|
|
updater.apply()
|
|
|
|
# The unchanged file should not be modified
|
|
assert unchanged_path.read_text(encoding="utf-8") == original_unchanged_content, (
|
|
"File for unchanged resource type was modified"
|
|
)
|
|
|
|
# Modified files should only be for resource types with changes
|
|
changed_resource_types = {c.resource_type for c in summary.changes}
|
|
for modified_file in updater.modified_files:
|
|
file_name = Path(modified_file).name
|
|
# Modified files should be .tf files for changed resource types
|
|
# or the state file
|
|
if file_name == "terraform.tfstate":
|
|
continue
|
|
assert file_name.endswith(".tf"), (
|
|
f"Unexpected modified file: {file_name}"
|
|
)
|
|
rt = file_name[:-3] # strip .tf
|
|
assert rt in changed_resource_types, (
|
|
f"File {file_name} was modified but resource type "
|
|
f"'{rt}' has no changes"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 25: Removed resource exclusion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRemovedResourceExclusion:
|
|
"""Property 25: Removed resource exclusion.
|
|
|
|
**Validates: Requirements 8.3**
|
|
|
|
For any resource classified as removed, the updated IaC output SHALL
|
|
not contain a resource block for that resource, AND the updated state
|
|
file SHALL not contain a state entry for that resource.
|
|
"""
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100, deadline=None)
|
|
def test_removed_resources_not_in_tf_files(self, data):
|
|
"""Removed resources do not appear in .tf files after update."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
removed_changes = [c for c in summary.changes if c.change_type == ChangeType.REMOVED]
|
|
assume(len(removed_changes) > 0)
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
# Create .tf files with resource blocks for previous resources
|
|
from iac_reverse.generator.sanitize import sanitize_identifier
|
|
|
|
resources_by_type: dict[str, list] = {}
|
|
for r in previous.resources:
|
|
resources_by_type.setdefault(r.resource_type, []).append(r)
|
|
|
|
for rt, resources in resources_by_type.items():
|
|
tf_path = Path(tmp_dir) / f"{rt}.tf"
|
|
lines = []
|
|
for r in resources:
|
|
tf_name = sanitize_identifier(r.name)
|
|
lines.append(f'# Source: {r.unique_id}')
|
|
lines.append(f'resource "{rt}" "{tf_name}" {{')
|
|
for k, v in r.attributes.items():
|
|
lines.append(f' {k} = "{v}"')
|
|
lines.append("}")
|
|
lines.append("")
|
|
tf_path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
# Build resource_attributes for added resources
|
|
resource_attributes = {}
|
|
for change in summary.changes:
|
|
if change.change_type == ChangeType.ADDED:
|
|
for r in current.resources:
|
|
if r.unique_id == change.resource_id:
|
|
resource_attributes[change.resource_id] = r.attributes
|
|
break
|
|
|
|
# Apply incremental update
|
|
updater = IncrementalUpdater(
|
|
change_summary=summary,
|
|
output_dir=tmp_dir,
|
|
resource_attributes=resource_attributes,
|
|
)
|
|
updater.apply()
|
|
|
|
# Verify removed resources are not in any .tf file
|
|
for change in removed_changes:
|
|
tf_path = Path(tmp_dir) / f"{change.resource_type}.tf"
|
|
if tf_path.exists():
|
|
content = tf_path.read_text(encoding="utf-8")
|
|
tf_name = sanitize_identifier(change.resource_name)
|
|
# The resource block should not exist
|
|
block_header = f'resource "{change.resource_type}" "{tf_name}"'
|
|
assert block_header not in content, (
|
|
f"Removed resource {change.resource_id} still has a "
|
|
f"resource block in {tf_path.name}"
|
|
)
|
|
|
|
@given(data=scan_result_pair_strategy())
|
|
@settings(max_examples=100, deadline=None)
|
|
def test_removed_resources_not_in_state_file(self, data):
|
|
"""Removed resources do not appear in the state file after update."""
|
|
previous, current = data
|
|
detector = ChangeDetector()
|
|
summary = detector.compare(current, previous)
|
|
|
|
removed_changes = [c for c in summary.changes if c.change_type == ChangeType.REMOVED]
|
|
assume(len(removed_changes) > 0)
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
from iac_reverse.generator.sanitize import sanitize_identifier
|
|
|
|
# Create initial state file with entries for previous resources
|
|
state = {
|
|
"version": 4,
|
|
"terraform_version": "1.7.0",
|
|
"serial": 1,
|
|
"lineage": "test-lineage",
|
|
"outputs": {},
|
|
"resources": [],
|
|
}
|
|
for r in previous.resources:
|
|
tf_name = sanitize_identifier(r.name)
|
|
state["resources"].append({
|
|
"mode": "managed",
|
|
"type": r.resource_type,
|
|
"name": tf_name,
|
|
"provider": f'provider["registry.terraform.io/hashicorp/{r.resource_type.split("_")[0]}"]',
|
|
"instances": [{
|
|
"schema_version": 0,
|
|
"attributes": {"id": r.unique_id, **r.attributes},
|
|
"sensitive_attributes": [],
|
|
"dependencies": [],
|
|
}],
|
|
})
|
|
|
|
state_path = Path(tmp_dir) / "terraform.tfstate"
|
|
state_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
|
|
|
|
# Create .tf files so updater can process removals
|
|
resources_by_type: dict[str, list] = {}
|
|
for r in previous.resources:
|
|
resources_by_type.setdefault(r.resource_type, []).append(r)
|
|
|
|
for rt, resources in resources_by_type.items():
|
|
tf_path = Path(tmp_dir) / f"{rt}.tf"
|
|
lines = []
|
|
for r in resources:
|
|
tf_name = sanitize_identifier(r.name)
|
|
lines.append(f'# Source: {r.unique_id}')
|
|
lines.append(f'resource "{rt}" "{tf_name}" {{')
|
|
for k, v in r.attributes.items():
|
|
lines.append(f' {k} = "{v}"')
|
|
lines.append("}")
|
|
lines.append("")
|
|
tf_path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
# Build resource_attributes for added resources
|
|
resource_attributes = {}
|
|
for change in summary.changes:
|
|
if change.change_type == ChangeType.ADDED:
|
|
for r in current.resources:
|
|
if r.unique_id == change.resource_id:
|
|
resource_attributes[change.resource_id] = r.attributes
|
|
break
|
|
|
|
# Apply incremental update
|
|
updater = IncrementalUpdater(
|
|
change_summary=summary,
|
|
output_dir=tmp_dir,
|
|
resource_attributes=resource_attributes,
|
|
)
|
|
updater.apply()
|
|
|
|
# Verify removed resources are not in state file
|
|
updated_state = json.loads(
|
|
state_path.read_text(encoding="utf-8")
|
|
)
|
|
state_entries = updated_state.get("resources", [])
|
|
|
|
for change in removed_changes:
|
|
tf_name = sanitize_identifier(change.resource_name)
|
|
matching = [
|
|
e for e in state_entries
|
|
if e.get("type") == change.resource_type
|
|
and e.get("name") == tf_name
|
|
]
|
|
assert len(matching) == 0, (
|
|
f"Removed resource {change.resource_id} still has a "
|
|
f"state entry (type={change.resource_type}, name={tf_name})"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Property 26: Snapshot retention
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestSnapshotRetention:
|
|
"""Property 26: Snapshot retention.
|
|
|
|
**Validates: Requirements 8.6**
|
|
|
|
For any sequence of N scans (N >= 2) for the same Scan_Profile, at
|
|
least the two most recent scan results SHALL be retained in storage
|
|
after each scan completes.
|
|
"""
|
|
|
|
@given(num_scans=st.integers(min_value=2, max_value=8))
|
|
@settings(max_examples=100)
|
|
def test_at_least_two_snapshots_retained(self, num_scans):
|
|
"""After N scans, at least 2 most recent snapshots are retained."""
|
|
from unittest.mock import patch
|
|
from datetime import datetime, timezone
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
store = SnapshotStore(base_dir=tmp_dir)
|
|
profile_hash = "retention_test_profile"
|
|
|
|
# Store N scan results with mocked timestamps to ensure unique filenames
|
|
for i in range(num_scans):
|
|
result = ScanResult(
|
|
resources=[
|
|
DiscoveredResource(
|
|
resource_type="docker_service",
|
|
unique_id=f"svc_{i}",
|
|
name=f"service_{i}",
|
|
provider=ProviderType.DOCKER_SWARM,
|
|
platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
|
|
architecture=CpuArchitecture.AMD64,
|
|
endpoint="localhost",
|
|
attributes={"version": str(i)},
|
|
raw_references=[],
|
|
)
|
|
],
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp=f"2024-01-{15 + i:02d}T10:00:00Z",
|
|
profile_hash=profile_hash,
|
|
is_partial=False,
|
|
)
|
|
# Mock datetime.now to return unique timestamps
|
|
mock_time = datetime(2024, 1, 15 + i, 10, 0, 0, tzinfo=timezone.utc)
|
|
with patch(
|
|
"iac_reverse.incremental.snapshot_store.datetime"
|
|
) as mock_dt:
|
|
mock_dt.now.return_value = mock_time
|
|
mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw)
|
|
store.store_snapshot(result, profile_hash)
|
|
|
|
# Count remaining snapshots
|
|
snapshot_files = list(store.snapshot_dir.glob(f"{profile_hash}_*.json"))
|
|
assert len(snapshot_files) >= 2, (
|
|
f"After {num_scans} scans, only {len(snapshot_files)} "
|
|
f"snapshots retained (expected >= 2)"
|
|
)
|
|
|
|
@given(num_scans=st.integers(min_value=2, max_value=8))
|
|
@settings(max_examples=100)
|
|
def test_most_recent_snapshot_is_loadable(self, num_scans):
|
|
"""The most recent snapshot can be loaded after multiple stores."""
|
|
from unittest.mock import patch
|
|
from datetime import datetime, timezone
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
store = SnapshotStore(base_dir=tmp_dir)
|
|
profile_hash = "loadable_test_profile"
|
|
|
|
last_resource_id = None
|
|
for i in range(num_scans):
|
|
last_resource_id = f"svc_{i}"
|
|
result = ScanResult(
|
|
resources=[
|
|
DiscoveredResource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id=last_resource_id,
|
|
name=f"deploy_{i}",
|
|
provider=ProviderType.KUBERNETES,
|
|
platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
|
|
architecture=CpuArchitecture.AARCH64,
|
|
endpoint="k8s-api.local",
|
|
attributes={"replicas": i + 1},
|
|
raw_references=[],
|
|
)
|
|
],
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp=f"2024-01-{15 + i:02d}T10:00:00Z",
|
|
profile_hash=profile_hash,
|
|
is_partial=False,
|
|
)
|
|
mock_time = datetime(2024, 1, 15 + i, 10, 0, 0, tzinfo=timezone.utc)
|
|
with patch(
|
|
"iac_reverse.incremental.snapshot_store.datetime"
|
|
) as mock_dt:
|
|
mock_dt.now.return_value = mock_time
|
|
mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw)
|
|
store.store_snapshot(result, profile_hash)
|
|
|
|
# Load the most recent snapshot
|
|
loaded = store.load_previous(profile_hash)
|
|
assert loaded is not None, "Could not load most recent snapshot"
|
|
assert len(loaded.resources) == 1
|
|
assert loaded.resources[0].unique_id == last_resource_id, (
|
|
f"Expected most recent resource '{last_resource_id}', "
|
|
f"got '{loaded.resources[0].unique_id}'"
|
|
)
|
|
|
|
@given(num_scans=st.integers(min_value=3, max_value=10))
|
|
@settings(max_examples=100)
|
|
def test_different_profiles_retain_independently(self, num_scans):
|
|
"""Snapshots for different profiles are retained independently."""
|
|
from unittest.mock import patch
|
|
from datetime import datetime, timezone
|
|
|
|
with tempfile.TemporaryDirectory() as tmp_dir:
|
|
store = SnapshotStore(base_dir=tmp_dir)
|
|
profile_a = "profile_alpha"
|
|
profile_b = "profile_beta"
|
|
|
|
scan_idx = 0
|
|
for i in range(num_scans):
|
|
for profile_hash in [profile_a, profile_b]:
|
|
result = ScanResult(
|
|
resources=[
|
|
DiscoveredResource(
|
|
resource_type="docker_service",
|
|
unique_id=f"{profile_hash}_svc_{i}",
|
|
name=f"svc_{i}",
|
|
provider=ProviderType.DOCKER_SWARM,
|
|
platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
|
|
architecture=CpuArchitecture.AMD64,
|
|
endpoint="localhost",
|
|
attributes={"idx": i},
|
|
raw_references=[],
|
|
)
|
|
],
|
|
warnings=[],
|
|
errors=[],
|
|
scan_timestamp=f"2024-01-{15 + i:02d}T10:00:00Z",
|
|
profile_hash=profile_hash,
|
|
is_partial=False,
|
|
)
|
|
# Use unique timestamps per store call
|
|
mock_time = datetime(2024, 1, 15, 10, scan_idx, 0, tzinfo=timezone.utc)
|
|
scan_idx += 1
|
|
with patch(
|
|
"iac_reverse.incremental.snapshot_store.datetime"
|
|
) as mock_dt:
|
|
mock_dt.now.return_value = mock_time
|
|
mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw)
|
|
store.store_snapshot(result, profile_hash)
|
|
|
|
# Both profiles should have at least 2 snapshots
|
|
snapshots_a = list(store.snapshot_dir.glob(f"{profile_a}_*.json"))
|
|
snapshots_b = list(store.snapshot_dir.glob(f"{profile_b}_*.json"))
|
|
|
|
assert len(snapshots_a) >= 2, (
|
|
f"Profile A has {len(snapshots_a)} snapshots (expected >= 2)"
|
|
)
|
|
assert len(snapshots_b) >= 2, (
|
|
f"Profile B has {len(snapshots_b)} snapshots (expected >= 2)"
|
|
)
|