"""Unit tests for Authentik authentication and discovery plugin.""" from unittest.mock import MagicMock, patch import pytest from iac_reverse.auth.authentik_auth import ( AuthenticationError, AuthentikAuthProvider, AuthentikConfig, AuthentikSession, ) from iac_reverse.auth.authentik_discovery import ( AuthentikDiscoveryError, AuthentikDiscoveryPlugin, ) from iac_reverse.models import CpuArchitecture, PlatformCategory, ScanProgress # --------------------------------------------------------------------------- # AuthentikAuthProvider tests # --------------------------------------------------------------------------- class TestAuthentikAuthProvider: """Tests for AuthentikAuthProvider SSO authentication.""" def setup_method(self): self.provider = AuthentikAuthProvider() self.config = AuthentikConfig( base_url="https://auth.internal.lab", client_id="iac-reverse-tool", client_secret="test-secret", ) @patch("iac_reverse.auth.authentik_auth.requests.post") @patch("iac_reverse.auth.authentik_auth.requests.get") def test_authenticate_user_success(self, mock_get, mock_post): """Successful authentication returns a valid session.""" mock_post.return_value = MagicMock( status_code=200, json=lambda: { "access_token": "access-123", "refresh_token": "refresh-456", }, ) mock_get.return_value = MagicMock( status_code=200, json=lambda: { "sub": "user-001", "groups": ["admins", "infra-team"], }, ) session = self.provider.authenticate_user(self.config) assert session.access_token == "access-123" assert session.refresh_token == "refresh-456" assert session.user_id == "user-001" assert session.groups == ["admins", "infra-team"] @patch("iac_reverse.auth.authentik_auth.requests.post") def test_authenticate_user_failure_status(self, mock_post): """Authentication failure raises AuthenticationError.""" mock_post.return_value = MagicMock( status_code=401, text="Invalid client credentials", ) with pytest.raises(AuthenticationError) as exc_info: self.provider.authenticate_user(self.config) assert "Authentik" in str(exc_info.value) assert "401" in str(exc_info.value) @patch("iac_reverse.auth.authentik_auth.requests.post") def test_authenticate_user_connection_error(self, mock_post): """Connection error raises AuthenticationError.""" import requests mock_post.side_effect = requests.ConnectionError("Connection refused") with pytest.raises(AuthenticationError) as exc_info: self.provider.authenticate_user(self.config) assert "Authentik" in str(exc_info.value) assert "failed to connect" in str(exc_info.value) @patch("iac_reverse.auth.authentik_auth.requests.post") @patch("iac_reverse.auth.authentik_auth.requests.get") def test_refresh_session_success(self, mock_get, mock_post): """Successful token refresh returns updated session.""" session = AuthentikSession( access_token="old-access", refresh_token="old-refresh", user_id="user-001", groups=["admins"], ) mock_post.return_value = MagicMock( status_code=200, json=lambda: { "access_token": "new-access-789", "refresh_token": "new-refresh-012", }, ) mock_get.return_value = MagicMock( status_code=200, json=lambda: { "sub": "user-001", "groups": ["admins", "new-group"], }, ) new_session = self.provider.refresh_session(self.config, session) assert new_session.access_token == "new-access-789" assert new_session.refresh_token == "new-refresh-012" assert new_session.user_id == "user-001" @patch("iac_reverse.auth.authentik_auth.requests.post") def test_refresh_session_failure(self, mock_post): """Failed refresh raises AuthenticationError.""" session = AuthentikSession( access_token="old-access", refresh_token="expired-refresh", user_id="user-001", groups=[], ) mock_post.return_value = MagicMock( status_code=400, text="Invalid refresh token", ) with pytest.raises(AuthenticationError) as exc_info: self.provider.refresh_session(self.config, session) assert "refresh failed" in str(exc_info.value) @patch("iac_reverse.auth.authentik_auth.requests.get") def test_validate_token_valid(self, mock_get): """Valid token returns True.""" mock_get.return_value = MagicMock(status_code=200) result = self.provider.validate_token(self.config, "valid-token") assert result is True @patch("iac_reverse.auth.authentik_auth.requests.get") def test_validate_token_invalid(self, mock_get): """Invalid token returns False.""" mock_get.return_value = MagicMock(status_code=401) result = self.provider.validate_token(self.config, "invalid-token") assert result is False @patch("iac_reverse.auth.authentik_auth.requests.get") def test_validate_token_connection_error(self, mock_get): """Connection error during validation returns False.""" import requests mock_get.side_effect = requests.ConnectionError("timeout") result = self.provider.validate_token(self.config, "some-token") assert result is False # --------------------------------------------------------------------------- # AuthentikDiscoveryPlugin tests # --------------------------------------------------------------------------- class TestAuthentikDiscoveryPlugin: """Tests for AuthentikDiscoveryPlugin resource discovery.""" def setup_method(self): self.plugin = AuthentikDiscoveryPlugin() self.credentials = { "base_url": "https://auth.internal.lab", "api_token": "test-api-token", } def test_get_platform_category(self): """Plugin returns CONTAINER_ORCHESTRATION category.""" assert self.plugin.get_platform_category() == PlatformCategory.CONTAINER_ORCHESTRATION def test_list_supported_resource_types(self): """Plugin lists all expected Authentik resource types.""" types = self.plugin.list_supported_resource_types() expected = [ "authentik_flow", "authentik_stage", "authentik_provider", "authentik_application", "authentik_outpost", "authentik_property_mapping", "authentik_certificate", "authentik_group", "authentik_source", ] assert types == expected def test_detect_architecture_defaults_to_amd64(self): """Architecture detection defaults to AMD64.""" arch = self.plugin.detect_architecture("https://auth.internal.lab") assert arch == CpuArchitecture.AMD64 def test_list_endpoints_before_auth(self): """Endpoints list is empty before authentication.""" assert self.plugin.list_endpoints() == [] @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_authenticate_success(self, mock_get): """Successful authentication sets internal state.""" mock_get.return_value = MagicMock(status_code=200, json=lambda: {"results": []}) self.plugin.authenticate(self.credentials) assert self.plugin.list_endpoints() == ["https://auth.internal.lab"] @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_authenticate_invalid_token(self, mock_get): """Invalid API token raises AuthentikDiscoveryError.""" mock_get.return_value = MagicMock(status_code=401) with pytest.raises(AuthentikDiscoveryError) as exc_info: self.plugin.authenticate(self.credentials) assert "invalid API token" in str(exc_info.value) def test_authenticate_missing_base_url(self): """Missing base_url raises AuthentikDiscoveryError.""" with pytest.raises(AuthentikDiscoveryError) as exc_info: self.plugin.authenticate({"api_token": "token"}) assert "base_url" in str(exc_info.value) def test_authenticate_missing_api_token(self): """Missing api_token raises AuthentikDiscoveryError.""" with pytest.raises(AuthentikDiscoveryError) as exc_info: self.plugin.authenticate({"base_url": "https://auth.lab"}) assert "api_token" in str(exc_info.value) @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_authenticate_connection_error(self, mock_get): """Connection error during auth raises AuthentikDiscoveryError.""" import requests mock_get.side_effect = requests.ConnectionError("refused") with pytest.raises(AuthentikDiscoveryError) as exc_info: self.plugin.authenticate(self.credentials) assert "failed to connect" in str(exc_info.value) def test_discover_resources_without_auth(self): """Discovering without authentication raises error.""" with pytest.raises(AuthentikDiscoveryError) as exc_info: self.plugin.discover_resources( ["https://auth.lab"], ["authentik_flow"], lambda p: None, ) assert "must authenticate" in str(exc_info.value) @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_discover_resources_flows(self, mock_get): """Discovers Authentik flows from the API.""" # First call is for authentication check auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) # Second call is for flow discovery flow_response = MagicMock( status_code=200, json=lambda: { "results": [ { "pk": "flow-uuid-1", "name": "default-authentication-flow", "slug": "default-authentication-flow", "stages": ["stage-1", "stage-2"], }, { "pk": "flow-uuid-2", "name": "default-enrollment-flow", "slug": "default-enrollment-flow", "stages": ["stage-3"], }, ], "pagination": {"next": 0, "count": 2}, }, ) mock_get.side_effect = [auth_response, flow_response] self.plugin.authenticate(self.credentials) progress_updates: list[ScanProgress] = [] result = self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_flow"], lambda p: progress_updates.append(p), ) assert len(result.resources) == 2 assert result.resources[0].resource_type == "authentik_flow" assert result.resources[0].name == "default-authentication-flow" assert result.resources[0].unique_id == "authentik/authentik_flow/flow-uuid-1" assert "stage-1" in result.resources[0].raw_references assert len(result.warnings) == 0 assert len(result.errors) == 0 @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_discover_resources_multiple_types(self, mock_get): """Discovers multiple resource types in one call.""" auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) app_response = MagicMock( status_code=200, json=lambda: { "results": [ { "pk": "app-1", "name": "grafana", "slug": "grafana", "provider": "provider-1", } ], "pagination": {"next": 0, "count": 1}, }, ) group_response = MagicMock( status_code=200, json=lambda: { "results": [ {"pk": "group-1", "name": "admins"}, {"pk": "group-2", "name": "users"}, ], "pagination": {"next": 0, "count": 2}, }, ) mock_get.side_effect = [auth_response, app_response, group_response] self.plugin.authenticate(self.credentials) result = self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_application", "authentik_group"], lambda p: None, ) assert len(result.resources) == 3 app_resources = [r for r in result.resources if r.resource_type == "authentik_application"] group_resources = [r for r in result.resources if r.resource_type == "authentik_group"] assert len(app_resources) == 1 assert len(group_resources) == 2 @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_discover_resources_unsupported_type_warning(self, mock_get): """Unsupported resource type produces a warning.""" auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) mock_get.side_effect = [auth_response] self.plugin.authenticate(self.credentials) result = self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_nonexistent"], lambda p: None, ) assert len(result.resources) == 0 assert len(result.warnings) == 1 assert "Unsupported" in result.warnings[0] @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_discover_resources_api_error(self, mock_get): """API error during discovery is captured in errors list.""" auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) error_response = MagicMock(status_code=500) mock_get.side_effect = [auth_response, error_response] self.plugin.authenticate(self.credentials) result = self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_flow"], lambda p: None, ) assert len(result.resources) == 0 assert len(result.errors) == 1 assert "authentik_flow" in result.errors[0] @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_discover_resources_pagination(self, mock_get): """Handles paginated API responses correctly.""" auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) page1_response = MagicMock( status_code=200, json=lambda: { "results": [{"pk": "cert-1", "name": "cert-one"}], "pagination": {"next": 2, "count": 2}, }, ) page2_response = MagicMock( status_code=200, json=lambda: { "results": [{"pk": "cert-2", "name": "cert-two"}], "pagination": {"next": 0, "count": 2}, }, ) mock_get.side_effect = [auth_response, page1_response, page2_response] self.plugin.authenticate(self.credentials) result = self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_certificate"], lambda p: None, ) assert len(result.resources) == 2 assert result.resources[0].name == "cert-one" assert result.resources[1].name == "cert-two" @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_discover_resources_progress_callback(self, mock_get): """Progress callback is invoked correctly during discovery.""" auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) empty_response = MagicMock( status_code=200, json=lambda: {"results": [], "pagination": {"next": 0, "count": 0}}, ) mock_get.side_effect = [auth_response, empty_response, empty_response] self.plugin.authenticate(self.credentials) progress_updates: list[ScanProgress] = [] self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_flow", "authentik_stage"], lambda p: progress_updates.append(p), ) # Should have progress for each type + final "complete" assert len(progress_updates) == 3 assert progress_updates[0].current_resource_type == "authentik_flow" assert progress_updates[1].current_resource_type == "authentik_stage" assert progress_updates[2].current_resource_type == "complete" @patch("iac_reverse.auth.authentik_discovery.requests.get") def test_extract_references_from_resource(self, mock_get): """References are extracted from API response fields.""" auth_response = MagicMock(status_code=200, json=lambda: {"results": []}) app_response = MagicMock( status_code=200, json=lambda: { "results": [ { "pk": "app-1", "name": "my-app", "provider": "provider-uuid-1", "group": "group-uuid-1", } ], "pagination": {"next": 0, "count": 1}, }, ) mock_get.side_effect = [auth_response, app_response] self.plugin.authenticate(self.credentials) result = self.plugin.discover_resources( ["https://auth.internal.lab"], ["authentik_application"], lambda p: None, ) refs = result.resources[0].raw_references assert "provider-uuid-1" in refs assert "group-uuid-1" in refs