"""Snapshot storage and retrieval for incremental scan comparison.

Stores scan results as timestamped JSON files in `.iac-reverse/snapshots/`
and provides retrieval of previous snapshots for change detection.
"""

import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from iac_reverse.models import (
    CpuArchitecture,
    DiscoveredResource,
    PlatformCategory,
    ProviderType,
    ScanResult,
)

# Default directory for snapshot storage
SNAPSHOT_DIR = os.path.join(".iac-reverse", "snapshots")

# Number of most recent snapshots kept per profile when pruning
MIN_RETAINED_SNAPSHOTS = 2

# strftime format of the timestamp embedded in snapshot filenames. It is
# fixed-width, so sorting filenames lexicographically sorts them by time.
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H-%M-%SZ"

# Matches exactly the text produced by _TIMESTAMP_FORMAT.
_TIMESTAMP_PATTERN = re.compile(
    r"[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}-[0-9]{2}-[0-9]{2}Z"
)

# File extension of stored snapshots.
_SNAPSHOT_SUFFIX = ".json"


def _serialize_scan_result(result: ScanResult) -> dict:
    """Serialize a ScanResult to a JSON-compatible dictionary.

    Args:
        result: The scan result to convert.

    Returns:
        A dictionary holding the scan metadata and a list of serialized
        resources under the "resources" key.
    """
    return {
        "scan_timestamp": result.scan_timestamp,
        "profile_hash": result.profile_hash,
        "is_partial": result.is_partial,
        "warnings": result.warnings,
        "errors": result.errors,
        "resources": [_serialize_resource(r) for r in result.resources],
    }


def _serialize_resource(resource: DiscoveredResource) -> dict:
    """Serialize a DiscoveredResource to a JSON-compatible dictionary.

    The provider, platform category and architecture are stored by their
    `.value` so they can be rebuilt by `_deserialize_resource`.

    Args:
        resource: The discovered resource to convert.

    Returns:
        A dictionary with one entry per serialized resource field.
    """
    return {
        "resource_type": resource.resource_type,
        "unique_id": resource.unique_id,
        "name": resource.name,
        "provider": resource.provider.value,
        "platform_category": resource.platform_category.value,
        "architecture": resource.architecture.value,
        "endpoint": resource.endpoint,
        "attributes": resource.attributes,
        "raw_references": resource.raw_references,
    }


def _deserialize_scan_result(data: dict) -> ScanResult:
    """Deserialize a dictionary into a ScanResult.

    Args:
        data: Dictionary in the shape produced by `_serialize_scan_result`.
            A missing "is_partial" key is treated as False.

    Returns:
        The reconstructed ScanResult.

    Raises:
        KeyError: If a required key is missing from `data`.
    """
    resources = [_deserialize_resource(r) for r in data["resources"]]
    return ScanResult(
        resources=resources,
        warnings=data["warnings"],
        errors=data["errors"],
        scan_timestamp=data["scan_timestamp"],
        profile_hash=data["profile_hash"],
        is_partial=data.get("is_partial", False),
    )


def _deserialize_resource(data: dict) -> DiscoveredResource:
    """Deserialize a dictionary into a DiscoveredResource.

    Args:
        data: Dictionary in the shape produced by `_serialize_resource`.
            A missing "raw_references" key is treated as an empty list.

    Returns:
        The reconstructed DiscoveredResource.

    Raises:
        KeyError: If a required key is missing from `data`.
    """
    return DiscoveredResource(
        resource_type=data["resource_type"],
        unique_id=data["unique_id"],
        name=data["name"],
        provider=ProviderType(data["provider"]),
        platform_category=PlatformCategory(data["platform_category"]),
        architecture=CpuArchitecture(data["architecture"]),
        endpoint=data["endpoint"],
        attributes=data["attributes"],
        raw_references=data.get("raw_references", []),
    )


class SnapshotStore:
    """Manages storage and retrieval of scan result snapshots.

    Stores scan results as timestamped JSON files in a configurable
    directory (defaults to `.iac-reverse/snapshots/`). Supports retrieval of
    the most recent snapshot for a given profile hash and automatic pruning
    of old snapshots.
    """

    def __init__(self, base_dir: Optional[str] = None) -> None:
        """Initialize the snapshot store.

        Args:
            base_dir: Base directory for snapshot storage. Defaults to
                `.iac-reverse/snapshots/`.
        """
        self._snapshot_dir = Path(base_dir) if base_dir else Path(SNAPSHOT_DIR)

    @property
    def snapshot_dir(self) -> Path:
        """Return the snapshot directory path."""
        return self._snapshot_dir

    def store_snapshot(self, result: ScanResult, profile_hash: str) -> None:
        """Store a scan result as a timestamped JSON snapshot.

        The snapshot is saved with filename format
        `{profile_hash}_{timestamp}.json`, where timestamp is the current UTC
        time in ISO format with colons replaced by dashes. The timestamp has
        one-second resolution, so storing twice within the same second for
        the same profile replaces the earlier file.

        After storing, old snapshots are pruned so that only the
        MIN_RETAINED_SNAPSHOTS most recent files for `profile_hash` remain.

        Args:
            result: The scan result to store.
            profile_hash: Hash identifying the scan profile.
        """
        self._snapshot_dir.mkdir(parents=True, exist_ok=True)

        timestamp = datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT)
        filename = f"{profile_hash}_{timestamp}{_SNAPSHOT_SUFFIX}"
        filepath = self._snapshot_dir / filename

        data = _serialize_scan_result(result)

        # Write to a temporary sibling first and move it into place, so a
        # failure during serialization never leaves a truncated ".json" file
        # that load_previous() would pick up as the newest snapshot. The
        # ".tmp" suffix keeps the partial file out of _list_snapshots().
        tmp_path = self._snapshot_dir / f"{filename}.tmp"
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_path, filepath)
        finally:
            # No-op after a successful replace; removes the partial file
            # when the write failed.
            tmp_path.unlink(missing_ok=True)

        self._prune_snapshots(profile_hash)

    def load_previous(self, profile_hash: str) -> Optional[ScanResult]:
        """Load the most recent snapshot for a given profile hash.

        Args:
            profile_hash: Hash identifying the scan profile.

        Returns:
            The most recent ScanResult for the profile, or None if no
            snapshot exists.
        """
        snapshots = self._list_snapshots(profile_hash)
        if not snapshots:
            return None

        # The listing is sorted oldest first, so the last entry is newest.
        most_recent = snapshots[-1]
        with open(most_recent, "r", encoding="utf-8") as f:
            data = json.load(f)

        return _deserialize_scan_result(data)

    def _list_snapshots(self, profile_hash: str) -> list[Path]:
        """List all snapshot files for a given profile hash, oldest first.

        A file belongs to the profile only when its name is exactly
        `{profile_hash}_{timestamp}.json`. Checking the timestamp part, and
        not just the prefix, keeps files of a profile such as `abc_def` from
        being returned for profile `abc`.

        Args:
            profile_hash: Hash identifying the scan profile.

        Returns:
            Matching snapshot paths sorted by filename, or an empty list if
            the snapshot directory does not exist.
        """
        if not self._snapshot_dir.exists():
            return []

        prefix = f"{profile_hash}_"
        matches = []
        for path in self._snapshot_dir.iterdir():
            name = path.name
            if not (name.startswith(prefix) and name.endswith(_SNAPSHOT_SUFFIX)):
                continue
            stamp = name[len(prefix):-len(_SNAPSHOT_SUFFIX)]
            if _TIMESTAMP_PATTERN.fullmatch(stamp) and path.is_file():
                matches.append(path)

        # All paths share one directory and a fixed-width timestamp, so
        # sorting the paths orders them chronologically.
        matches.sort()
        return matches

    def _prune_snapshots(self, profile_hash: str) -> None:
        """Remove old snapshots, keeping the MIN_RETAINED_SNAPSHOTS newest.

        Args:
            profile_hash: Hash identifying the scan profile to prune.
        """
        snapshots = self._list_snapshots(profile_hash)
        excess = len(snapshots) - MIN_RETAINED_SNAPSHOTS
        if excess <= 0:
            return

        # The listing is oldest first, so the leading entries are removed.
        for snapshot_path in snapshots[:excess]:
            # Tolerate a file that has already been removed.
            snapshot_path.unlink(missing_ok=True)