246 lines
9.3 KiB
Python
246 lines
9.3 KiB
Python
"""Unit tests for the SnapshotStore class."""
|
|
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from iac_reverse.incremental.snapshot_store import SnapshotStore
|
|
from iac_reverse.models import (
|
|
CpuArchitecture,
|
|
DiscoveredResource,
|
|
PlatformCategory,
|
|
ProviderType,
|
|
ScanResult,
|
|
)
|
|
|
|
|
|
def _make_scan_result(
|
|
profile_hash: str = "abc123",
|
|
resource_name: str = "test-resource",
|
|
) -> ScanResult:
|
|
"""Create a sample ScanResult for testing."""
|
|
resource = DiscoveredResource(
|
|
resource_type="kubernetes_deployment",
|
|
unique_id="apps/v1/deployments/default/nginx",
|
|
name=resource_name,
|
|
provider=ProviderType.KUBERNETES,
|
|
platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
|
|
architecture=CpuArchitecture.AARCH64,
|
|
endpoint="https://k8s-api.internal.lab:6443",
|
|
attributes={"replicas": 3, "image": "nginx:1.25"},
|
|
raw_references=["default/services/nginx-svc"],
|
|
)
|
|
return ScanResult(
|
|
resources=[resource],
|
|
warnings=["some warning"],
|
|
errors=[],
|
|
scan_timestamp="2024-01-15T10:30:00Z",
|
|
profile_hash=profile_hash,
|
|
is_partial=False,
|
|
)
|
|
|
|
|
|
class TestStoreSnapshot:
|
|
"""Tests for storing snapshots."""
|
|
|
|
def test_store_creates_file_in_correct_directory(self, tmp_path: Path) -> None:
|
|
"""Storing a snapshot creates a JSON file in the snapshot directory."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
result = _make_scan_result(profile_hash="prof1")
|
|
|
|
store.store_snapshot(result, "prof1")
|
|
|
|
files = list(snapshot_dir.iterdir())
|
|
assert len(files) == 1
|
|
assert files[0].name.startswith("prof1_")
|
|
assert files[0].name.endswith(".json")
|
|
|
|
def test_store_creates_directory_if_not_exists(self, tmp_path: Path) -> None:
|
|
"""Storing a snapshot creates the snapshot directory if it doesn't exist."""
|
|
snapshot_dir = tmp_path / "nested" / "deep" / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
result = _make_scan_result()
|
|
|
|
store.store_snapshot(result, "abc123")
|
|
|
|
assert snapshot_dir.exists()
|
|
assert len(list(snapshot_dir.iterdir())) == 1
|
|
|
|
def test_stored_file_contains_valid_json(self, tmp_path: Path) -> None:
|
|
"""The stored snapshot file contains valid JSON with expected fields."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
result = _make_scan_result(profile_hash="prof1")
|
|
|
|
store.store_snapshot(result, "prof1")
|
|
|
|
files = list(snapshot_dir.iterdir())
|
|
with open(files[0], "r") as f:
|
|
data = json.load(f)
|
|
|
|
assert data["profile_hash"] == "prof1"
|
|
assert data["scan_timestamp"] == "2024-01-15T10:30:00Z"
|
|
assert len(data["resources"]) == 1
|
|
assert data["resources"][0]["resource_type"] == "kubernetes_deployment"
|
|
assert data["resources"][0]["provider"] == "kubernetes"
|
|
assert data["resources"][0]["architecture"] == "aarch64"
|
|
|
|
|
|
class TestLoadPrevious:
|
|
"""Tests for loading previous snapshots."""
|
|
|
|
def test_load_returns_correct_scan_result(self, tmp_path: Path) -> None:
|
|
"""Loading a previous snapshot returns the correct ScanResult."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
original = _make_scan_result(profile_hash="prof1", resource_name="nginx")
|
|
|
|
store.store_snapshot(original, "prof1")
|
|
loaded = store.load_previous("prof1")
|
|
|
|
assert loaded is not None
|
|
assert loaded.profile_hash == "prof1"
|
|
assert loaded.scan_timestamp == "2024-01-15T10:30:00Z"
|
|
assert loaded.is_partial is False
|
|
assert loaded.warnings == ["some warning"]
|
|
assert loaded.errors == []
|
|
assert len(loaded.resources) == 1
|
|
|
|
resource = loaded.resources[0]
|
|
assert resource.resource_type == "kubernetes_deployment"
|
|
assert resource.unique_id == "apps/v1/deployments/default/nginx"
|
|
assert resource.name == "nginx"
|
|
assert resource.provider == ProviderType.KUBERNETES
|
|
assert resource.platform_category == PlatformCategory.CONTAINER_ORCHESTRATION
|
|
assert resource.architecture == CpuArchitecture.AARCH64
|
|
assert resource.endpoint == "https://k8s-api.internal.lab:6443"
|
|
assert resource.attributes == {"replicas": 3, "image": "nginx:1.25"}
|
|
assert resource.raw_references == ["default/services/nginx-svc"]
|
|
|
|
def test_load_returns_none_when_no_snapshot_exists(self, tmp_path: Path) -> None:
|
|
"""Loading when no snapshot exists returns None."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
result = store.load_previous("nonexistent")
|
|
|
|
assert result is None
|
|
|
|
def test_load_returns_none_when_directory_does_not_exist(
|
|
self, tmp_path: Path
|
|
) -> None:
|
|
"""Loading when the snapshot directory doesn't exist returns None."""
|
|
snapshot_dir = tmp_path / "does_not_exist"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
result = store.load_previous("prof1")
|
|
|
|
assert result is None
|
|
|
|
def test_load_returns_most_recent_snapshot(self, tmp_path: Path) -> None:
|
|
"""When multiple snapshots exist, load returns the most recent one."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
# Store first snapshot
|
|
result1 = _make_scan_result(profile_hash="prof1", resource_name="first")
|
|
store.store_snapshot(result1, "prof1")
|
|
time.sleep(1.1) # Ensure different timestamp
|
|
|
|
# Store second snapshot
|
|
result2 = _make_scan_result(profile_hash="prof1", resource_name="second")
|
|
store.store_snapshot(result2, "prof1")
|
|
|
|
loaded = store.load_previous("prof1")
|
|
assert loaded is not None
|
|
assert loaded.resources[0].name == "second"
|
|
|
|
|
|
class TestRetention:
|
|
"""Tests for snapshot retention/pruning."""
|
|
|
|
def test_retains_at_least_two_most_recent_snapshots(self, tmp_path: Path) -> None:
|
|
"""Only the 2 most recent snapshots are kept per profile."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
# Store 4 snapshots with different timestamps
|
|
for i in range(4):
|
|
result = _make_scan_result(
|
|
profile_hash="prof1", resource_name=f"resource-{i}"
|
|
)
|
|
store.store_snapshot(result, "prof1")
|
|
time.sleep(1.1) # Ensure different timestamps
|
|
|
|
# Should only have 2 files remaining
|
|
files = list(snapshot_dir.iterdir())
|
|
assert len(files) == 2
|
|
|
|
# The most recent should be loadable
|
|
loaded = store.load_previous("prof1")
|
|
assert loaded is not None
|
|
assert loaded.resources[0].name == "resource-3"
|
|
|
|
def test_two_snapshots_are_not_pruned(self, tmp_path: Path) -> None:
|
|
"""Exactly 2 snapshots are retained without pruning."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
store.store_snapshot(_make_scan_result(profile_hash="prof1"), "prof1")
|
|
time.sleep(1.1)
|
|
store.store_snapshot(_make_scan_result(profile_hash="prof1"), "prof1")
|
|
|
|
files = list(snapshot_dir.iterdir())
|
|
assert len(files) == 2
|
|
|
|
|
|
class TestMultipleProfiles:
|
|
"""Tests for multiple profile isolation."""
|
|
|
|
def test_multiple_profiles_do_not_interfere(self, tmp_path: Path) -> None:
|
|
"""Snapshots from different profiles don't interfere with each other."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
result_a = _make_scan_result(profile_hash="profile_a", resource_name="res-a")
|
|
result_b = _make_scan_result(profile_hash="profile_b", resource_name="res-b")
|
|
|
|
store.store_snapshot(result_a, "profile_a")
|
|
store.store_snapshot(result_b, "profile_b")
|
|
|
|
loaded_a = store.load_previous("profile_a")
|
|
loaded_b = store.load_previous("profile_b")
|
|
|
|
assert loaded_a is not None
|
|
assert loaded_a.resources[0].name == "res-a"
|
|
assert loaded_b is not None
|
|
assert loaded_b.resources[0].name == "res-b"
|
|
|
|
def test_pruning_only_affects_matching_profile(self, tmp_path: Path) -> None:
|
|
"""Pruning for one profile does not remove snapshots from another."""
|
|
snapshot_dir = tmp_path / "snapshots"
|
|
store = SnapshotStore(base_dir=str(snapshot_dir))
|
|
|
|
# Store 4 snapshots for profile_a (should prune to 2)
|
|
for i in range(4):
|
|
result = _make_scan_result(
|
|
profile_hash="profile_a", resource_name=f"a-{i}"
|
|
)
|
|
store.store_snapshot(result, "profile_a")
|
|
time.sleep(1.1)
|
|
|
|
# Store 1 snapshot for profile_b
|
|
result_b = _make_scan_result(profile_hash="profile_b", resource_name="b-0")
|
|
store.store_snapshot(result_b, "profile_b")
|
|
|
|
# profile_a should have 2 files, profile_b should have 1
|
|
all_files = list(snapshot_dir.iterdir())
|
|
a_files = [f for f in all_files if f.name.startswith("profile_a_")]
|
|
b_files = [f for f in all_files if f.name.startswith("profile_b_")]
|
|
|
|
assert len(a_files) == 2
|
|
assert len(b_files) == 1
|