"""Unit tests for the MultiProviderScanner.

Tests cover:
- All providers succeed: all resources collected
- One provider fails: others still complete, failed one reported
- Multiple providers fail: remaining still complete
- Error details include provider name and reason
"""
|
|
|
|
import pytest
|
|
|
|
from iac_reverse.models import (
|
|
CpuArchitecture,
|
|
DiscoveredResource,
|
|
PlatformCategory,
|
|
ProviderType,
|
|
ScanProfile,
|
|
ScanProgress,
|
|
ScanResult,
|
|
)
|
|
from iac_reverse.plugin_base import ProviderPlugin
|
|
from iac_reverse.scanner.multi_provider_scanner import (
|
|
MultiProviderScanner,
|
|
MultiProviderScanResult,
|
|
ProviderFailure,
|
|
ProviderScanEntry,
|
|
)
|
|
from iac_reverse.scanner.scanner import AuthenticationError
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers / Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def make_profile(provider: ProviderType = ProviderType.KUBERNETES) -> ScanProfile:
    """Build a ScanProfile for ``provider`` with fixed test credentials.

    Every profile uses the same token, a single endpoint, and no
    resource-type filter; only the provider varies.
    """
    profile_fields = {
        "provider": provider,
        "credentials": {"token": "test-token"},
        "endpoints": ["https://api.local:6443"],
        "resource_type_filters": None,
    }
    return ScanProfile(**profile_fields)
|
|
|
|
|
|
def make_resource(
    provider: ProviderType = ProviderType.KUBERNETES,
    resource_type: str = "kubernetes_deployment",
    name: str = "nginx",
) -> DiscoveredResource:
    """Build a sample DiscoveredResource for use in scanner tests.

    The unique id is composed from the provider value, the resource type and
    the name; all remaining fields are fixed sample values.
    """
    identifier = f"{provider.value}/{resource_type}/{name}"
    sample = DiscoveredResource(
        resource_type=resource_type,
        unique_id=identifier,
        name=name,
        provider=provider,
        platform_category=PlatformCategory.CONTAINER_ORCHESTRATION,
        architecture=CpuArchitecture.AARCH64,
        endpoint="https://api.local:6443",
        attributes={"replicas": 3},
        raw_references=[],
    )
    return sample
|
|
|
|
|
|
class SuccessPlugin(ProviderPlugin):
    """Stub provider plugin whose every operation succeeds.

    ``discover_resources`` hands back the resources supplied at construction
    time, so a test decides exactly what a successful scan yields.
    """

    def __init__(self, resources: list[DiscoveredResource] | None = None):
        # A falsy argument (None or an empty list) is swapped for a new list.
        self._resources = resources if resources else []

    def authenticate(self, credentials: dict[str, str]) -> None:
        """Accept any credentials without doing anything."""
        return None

    def get_platform_category(self) -> PlatformCategory:
        """Report the container-orchestration platform category."""
        return PlatformCategory.CONTAINER_ORCHESTRATION

    def list_endpoints(self) -> list[str]:
        """Return the single fixed API endpoint."""
        return ["https://api.local:6443"]

    def list_supported_resource_types(self) -> list[str]:
        """Return the two resource types this stub advertises."""
        return ["kubernetes_deployment", "kubernetes_service"]

    def detect_architecture(self, endpoint: str) -> CpuArchitecture:
        """Report AARCH64 regardless of the endpoint."""
        return CpuArchitecture.AARCH64

    def discover_resources(
        self,
        endpoints: list[str],
        resource_types: list[str],
        progress_callback=None,
    ) -> ScanResult:
        """Return the configured resources; the arguments are ignored."""
        scan_result = ScanResult(
            resources=self._resources,
            warnings=[],
            errors=[],
            scan_timestamp="",
            profile_hash="",
        )
        return scan_result
|
|
|
|
|
|
class FailingAuthPlugin(ProviderPlugin):
    """Stub provider plugin whose ``authenticate`` always raises.

    The remaining methods return empty or fixed values.
    """

    def __init__(self, error_message: str = "Invalid credentials"):
        # Text carried by the RuntimeError raised from authenticate().
        self._error_message = error_message

    def authenticate(self, credentials: dict[str, str]) -> None:
        """Raise RuntimeError carrying the configured message."""
        failure = RuntimeError(self._error_message)
        raise failure

    def get_platform_category(self) -> PlatformCategory:
        """Report the container-orchestration platform category."""
        return PlatformCategory.CONTAINER_ORCHESTRATION

    def list_endpoints(self) -> list[str]:
        """Return no endpoints."""
        return []

    def list_supported_resource_types(self) -> list[str]:
        """Return no resource types."""
        return []

    def detect_architecture(self, endpoint: str) -> CpuArchitecture:
        """Report AMD64 regardless of the endpoint."""
        return CpuArchitecture.AMD64

    def discover_resources(
        self,
        endpoints: list[str],
        resource_types: list[str],
        progress_callback=None,
    ) -> ScanResult:
        """Return an empty scan result; the arguments are ignored."""
        empty_result = ScanResult(
            resources=[],
            warnings=[],
            errors=[],
            scan_timestamp="",
            profile_hash="",
        )
        return empty_result
|
|
|
|
|
|
class FailingDiscoverPlugin(ProviderPlugin):
    """Stub provider plugin that authenticates but fails in discovery.

    ``discover_resources`` always raises ``ConnectionError``.
    """

    def __init__(self, error_message: str = "Connection refused"):
        # Text carried by the ConnectionError raised from discover_resources().
        self._error_message = error_message

    def authenticate(self, credentials: dict[str, str]) -> None:
        """Accept any credentials without doing anything."""
        return None

    def get_platform_category(self) -> PlatformCategory:
        """Report the storage-appliance platform category."""
        return PlatformCategory.STORAGE_APPLIANCE

    def list_endpoints(self) -> list[str]:
        """Return the single fixed NAS endpoint."""
        return ["https://nas.local:5001"]

    def list_supported_resource_types(self) -> list[str]:
        """Return the one resource type this stub advertises."""
        return ["synology_shared_folder"]

    def detect_architecture(self, endpoint: str) -> CpuArchitecture:
        """Report AMD64 regardless of the endpoint."""
        return CpuArchitecture.AMD64

    def discover_resources(
        self,
        endpoints: list[str],
        resource_types: list[str],
        progress_callback=None,
    ) -> ScanResult:
        """Raise ConnectionError carrying the configured message."""
        failure = ConnectionError(self._error_message)
        raise failure
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: All providers succeed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAllProvidersSucceed:
    """Happy-path scenarios in which every configured provider scans cleanly."""

    def test_single_provider_all_resources_collected(self):
        """A lone successful provider contributes all of its resources."""
        discovered = [
            make_resource(name=label) for label in ("deploy-1", "deploy-2")
        ]
        only_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.KUBERNETES),
            plugin=SuccessPlugin(resources=discovered),
        )

        outcome = MultiProviderScanner([only_entry]).scan()

        assert len(outcome.resources) == 2
        assert len(outcome.failed_providers) == 0
        assert "kubernetes" in outcome.successful_providers

    def test_multiple_providers_all_resources_merged(self):
        """Resources from two successful providers end up in one result."""
        kubernetes_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.KUBERNETES),
            plugin=SuccessPlugin(
                resources=[
                    make_resource(
                        provider=ProviderType.KUBERNETES,
                        resource_type="kubernetes_deployment",
                        name="nginx",
                    ),
                ]
            ),
        )
        swarm_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.DOCKER_SWARM),
            plugin=SuccessPlugin(
                resources=[
                    make_resource(
                        provider=ProviderType.DOCKER_SWARM,
                        resource_type="docker_service",
                        name="web",
                    ),
                ]
            ),
        )

        outcome = MultiProviderScanner([kubernetes_entry, swarm_entry]).scan()

        assert len(outcome.resources) == 2
        assert len(outcome.failed_providers) == 0
        assert len(outcome.successful_providers) == 2

    def test_empty_entries_returns_empty_result(self):
        """Scanning with no entries yields an entirely empty result."""
        outcome = MultiProviderScanner([]).scan()

        assert len(outcome.resources) == 0
        assert len(outcome.failed_providers) == 0
        assert len(outcome.successful_providers) == 0

    def test_scan_timestamp_is_set(self):
        """The aggregated result carries a non-empty scan timestamp."""
        only_entry = ProviderScanEntry(
            profile=make_profile(),
            plugin=SuccessPlugin(resources=[]),
        )

        outcome = MultiProviderScanner([only_entry]).scan()

        assert outcome.scan_timestamp != ""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: One provider fails, others succeed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestOneProviderFails:
    """Partial-failure scenarios: one provider fails while the rest finish."""

    def test_failed_provider_does_not_block_others(self):
        """A failing Synology entry does not stop the Kubernetes scan."""
        healthy_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.KUBERNETES),
            plugin=SuccessPlugin(
                resources=[make_resource(ProviderType.KUBERNETES, name="nginx")]
            ),
        )
        broken_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.SYNOLOGY),
            plugin=FailingAuthPlugin("Invalid API key"),
        )

        outcome = MultiProviderScanner([healthy_entry, broken_entry]).scan()

        # The Kubernetes resource must still have been collected.
        assert len(outcome.resources) == 1
        assert outcome.resources[0].name == "nginx"
        # Synology must show up as the one failed provider.
        assert len(outcome.failed_providers) == 1
        assert outcome.failed_providers[0].provider_name == "synology"

    def test_failed_provider_reported_with_error_details(self):
        """The failure record carries provider name, message and error type."""
        healthy_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.KUBERNETES),
            plugin=SuccessPlugin(resources=[]),
        )
        broken_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.SYNOLOGY),
            plugin=FailingAuthPlugin("Token expired at 2024-01-15"),
        )

        outcome = MultiProviderScanner([healthy_entry, broken_entry]).scan()

        first_failure = outcome.failed_providers[0]
        assert first_failure.provider_name == "synology"
        assert "Token expired" in first_failure.error_message
        assert first_failure.error_type == "AuthenticationError"

    def test_successful_providers_listed_correctly(self):
        """Only the provider that completed is listed as successful."""
        healthy_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.KUBERNETES),
            plugin=SuccessPlugin(resources=[]),
        )
        broken_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.DOCKER_SWARM),
            plugin=FailingAuthPlugin("Connection refused"),
        )

        outcome = MultiProviderScanner([healthy_entry, broken_entry]).scan()

        succeeded = outcome.successful_providers
        assert "kubernetes" in succeeded
        assert "docker_swarm" not in succeeded

    def test_order_does_not_matter_failed_first(self):
        """Even if the first provider fails, subsequent ones still run."""
        broken_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.SYNOLOGY),
            plugin=FailingAuthPlugin("Auth failed"),
        )
        healthy_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.DOCKER_SWARM),
            plugin=SuccessPlugin(
                resources=[
                    make_resource(ProviderType.DOCKER_SWARM, "docker_service", "web")
                ]
            ),
        )

        outcome = MultiProviderScanner([broken_entry, healthy_entry]).scan()

        assert len(outcome.resources) == 1
        assert outcome.resources[0].name == "web"
        assert len(outcome.failed_providers) == 1
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Multiple providers fail
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestMultipleProvidersFail:
    """Scenarios in which more than one provider fails in the same scan."""

    def test_multiple_failures_remaining_still_complete(self):
        """A successful provider between two failing ones still completes."""
        synology_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.SYNOLOGY),
            plugin=FailingAuthPlugin("Synology auth failed"),
        )
        kubernetes_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.KUBERNETES),
            plugin=SuccessPlugin(
                resources=[make_resource(ProviderType.KUBERNETES, name="app")]
            ),
        )
        harvester_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.HARVESTER),
            plugin=FailingAuthPlugin("Harvester unreachable"),
        )
        scanner = MultiProviderScanner(
            [synology_entry, kubernetes_entry, harvester_entry]
        )

        outcome = scanner.scan()

        assert len(outcome.resources) == 1
        assert outcome.resources[0].name == "app"
        assert len(outcome.failed_providers) == 2
        assert len(outcome.successful_providers) == 1

    def test_all_providers_fail_returns_empty_resources(self):
        """When every provider fails, no resources are collected."""
        synology_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.SYNOLOGY),
            plugin=FailingAuthPlugin("Auth error 1"),
        )
        harvester_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.HARVESTER),
            plugin=FailingAuthPlugin("Auth error 2"),
        )

        outcome = MultiProviderScanner([synology_entry, harvester_entry]).scan()

        assert len(outcome.resources) == 0
        assert len(outcome.failed_providers) == 2
        assert len(outcome.successful_providers) == 0

    def test_each_failure_has_distinct_error_details(self):
        """Each failed provider keeps its own error message."""
        synology_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.SYNOLOGY),
            plugin=FailingAuthPlugin("Invalid API key"),
        )
        harvester_entry = ProviderScanEntry(
            profile=make_profile(ProviderType.HARVESTER),
            plugin=FailingAuthPlugin("Certificate expired"),
        )

        outcome = MultiProviderScanner([synology_entry, harvester_entry]).scan()

        failed_names = [failure.provider_name for failure in outcome.failed_providers]
        assert "synology" in failed_names
        assert "harvester" in failed_names

        def first_failure_for(wanted_name):
            # First failure record reported under the given provider name.
            return next(
                failure
                for failure in outcome.failed_providers
                if failure.provider_name == wanted_name
            )

        synology_failure = first_failure_for("synology")
        harvester_failure = first_failure_for("harvester")
        assert "Invalid API key" in synology_failure.error_message
        assert "Certificate expired" in harvester_failure.error_message
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: Error details include provider name and reason
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestErrorDetails:
    """Tests that error details contain provider name and failure reason."""

    def test_auth_error_includes_provider_name(self):
        """The failure record names the provider whose authentication failed."""
        entries = [
            ProviderScanEntry(
                profile=make_profile(ProviderType.DOCKER_SWARM),
                plugin=FailingAuthPlugin("Bad token"),
            ),
        ]
        scanner = MultiProviderScanner(entries)

        result = scanner.scan()

        # Check the count first so a missing failure is reported as an
        # assertion failure instead of an IndexError on the next line.
        assert len(result.failed_providers) == 1
        assert result.failed_providers[0].provider_name == "docker_swarm"

    def test_auth_error_includes_reason(self):
        """The failure record contains the message raised by the plugin."""
        entries = [
            ProviderScanEntry(
                profile=make_profile(ProviderType.DOCKER_SWARM),
                plugin=FailingAuthPlugin("Token revoked by admin"),
            ),
        ]
        scanner = MultiProviderScanner(entries)

        result = scanner.scan()

        assert len(result.failed_providers) == 1
        assert "Token revoked by admin" in result.failed_providers[0].error_message

    def test_connection_error_includes_error_type(self):
        """A discovery-time connection failure is reported with its type."""
        entries = [
            ProviderScanEntry(
                profile=make_profile(ProviderType.SYNOLOGY),
                plugin=FailingDiscoverPlugin("Connection timed out"),
            ),
        ]
        scanner = MultiProviderScanner(entries)

        result = scanner.scan()

        # ConnectionError is raised during discover; Scanner wraps it in
        # ConnectionLostError. The error_type reflects the wrapped exception.
        assert len(result.failed_providers) == 1
        failure = result.failed_providers[0]
        assert failure.provider_name == "synology"
        assert failure.error_type == "ConnectionLostError"
        assert "Connection lost" in failure.error_message

    def test_validation_error_includes_details(self):
        """A provider with invalid profile still reports correctly."""
        # Create a profile with empty credentials to trigger ValueError
        bad_profile = ScanProfile(
            provider=ProviderType.BARE_METAL,
            credentials={},
            endpoints=["https://bmc.local"],
        )
        entries = [
            ProviderScanEntry(
                profile=bad_profile,
                plugin=SuccessPlugin(resources=[]),
            ),
        ]
        scanner = MultiProviderScanner(entries)

        result = scanner.scan()

        assert len(result.failed_providers) == 1
        failure = result.failed_providers[0]
        assert failure.provider_name == "bare_metal"
        assert failure.error_type == "ValueError"
        assert "credentials" in failure.error_message.lower()

    def test_progress_callback_invoked_for_successful_providers(self):
        """Supplying a progress callback does not disturb a successful scan."""
        resources = [make_resource(name="test")]
        entries = [
            ProviderScanEntry(
                profile=make_profile(ProviderType.KUBERNETES),
                plugin=SuccessPlugin(resources=resources),
            ),
        ]
        scanner = MultiProviderScanner(entries)

        progress_updates = []
        result = scanner.scan(progress_callback=progress_updates.append)

        # SuccessPlugin never calls progress_callback, so the contents of
        # progress_updates are not asserted. The scan outcome is checked
        # instead, because the entry count alone is true before scan() runs.
        assert len(scanner.entries) == 1
        assert len(result.failed_providers) == 0
        assert len(result.resources) == 1
        assert "kubernetes" in result.successful_providers
|