diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 06ad036..ba96099 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -19,6 +19,7 @@ Python style - All imports at top of file (never inside functions) - Apache 2.0 license header in every .py file (including `__init__.py`) - Docstrings must start with a short summary line +- End all log messages with a period: `logger.info("Message.")` Patterns - `__init__` methods must not raise exceptions; defer validation and connection to first use (lazy init) @@ -31,8 +32,9 @@ Patterns - Keep error response format stable: `{"success": false, "statusCode": int, "errors": [...]}` Testing -- Mirror src structure: `src/handlers/` -> `tests/handlers/` -- Mock external services via `conftest.py`: Kafka, EventBridge, PostgreSQL, S3 +- Mirror src structure: `src/handlers/` -> `tests/unit/handlers/` +- Unit tests: mock external services via `conftest.py` (Kafka, EventBridge, PostgreSQL, S3) +- Integration tests: call `lambda_handler` directly with real containers (testcontainers-python for Kafka, PostgreSQL, LocalStack) - No real API/DB calls in unit tests - Use `mocker.patch("module.dependency")` or `mocker.patch.object(Class, "method")` - Assert pattern: `assert expected == actual` diff --git a/.github/workflows/check_python.yml b/.github/workflows/check_python.yml index d0f3f5d..20820ab 100644 --- a/.github/workflows/check_python.yml +++ b/.github/workflows/check_python.yml @@ -111,7 +111,32 @@ jobs: id: check-format run: black --check $(git ls-files '*.py') - pytest-test: + mypy-check: + name: Mypy Type Check + needs: detect + if: needs.detect.outputs.python_changed == 'true' + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 + with: + persist-credentials: false + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 + with: + python-version: '3.13' + cache: 'pip' + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Check types with Mypy + id: check-types + run: mypy . + + unit-tests: name: Pytest Unit Tests with Coverage needs: detect if: needs.detect.outputs.python_changed == 'true' @@ -123,7 +148,8 @@ jobs: persist-credentials: false fetch-depth: 0 - - uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 + - name: Set up Python + uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 with: python-version: '3.13' cache: 'pip' @@ -132,13 +158,14 @@ jobs: run: pip install -r requirements.txt - name: Check code coverage with Pytest - run: pytest --cov=. -v tests/ --cov-fail-under=80 + run: pytest --cov=. -v tests/unit/ --cov-fail-under=80 - mypy-check: - name: Mypy Type Check + integration-tests: + name: Pytest Integration Tests needs: detect if: needs.detect.outputs.python_changed == 'true' runs-on: ubuntu-latest + timeout-minutes: 15 steps: - name: Checkout repository uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 @@ -155,9 +182,8 @@ jobs: - name: Install dependencies run: pip install -r requirements.txt - - name: Check types with Mypy - id: check-types - run: mypy . 
+ - name: Run integration tests + run: pytest tests/integration/ -v --tb=short --log-cli-level=INFO noop: name: No Operation diff --git a/DEVELOPER.md b/DEVELOPER.md index ccf04d5..b2b5b88 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -5,8 +5,9 @@ - [Run Pylint Tool Locally](#run-pylint-tool-locally) - [Run Black Tool Locally](#run-black-tool-locally) - [Run mypy Tool Locally](#run-mypy-tool-locally) -- [Run Unit Test](#running-unit-test) +- [Run Unit Test Locally](#run-unit-test-locally) - [Code Coverage](#code-coverage) +- [Run Integration Test Locally](#run-integration-test-locally) ## Get Started @@ -45,7 +46,7 @@ To run Pylint on a specific file, follow the pattern `pylint /// None: + """Handle GET requests.""" + if self.path == "/keys": + self._json_response(200, {"keys": [{"key": self.public_key_b64}]}) + elif self.path == "/health": + self._json_response(200, {"status": "ok"}) + elif self.path == "/private-key": + self.send_response(200) + self.send_header("Content-Type", "application/x-pem-file") + self.end_headers() + self.wfile.write(self.private_key_pem) + elif self.path == "/": + self.send_response(303) + self.send_header("Location", "http://localhost/login") + self.end_headers() + else: + self._json_response(404, {"error": "Not found"}) + + def _json_response(self, status: int, data: dict) -> None: + """Send JSON response.""" + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(data).encode("utf-8")) + + def log_message(self, fmt: str, *args: Any) -> None: + """Suppress default logging.""" + + +def _start_mock_jwt_server(port: int, private_key_pem: bytes, public_key_b64: str) -> HTTPServer: + """Start mock JWT HTTP server in background thread.""" + MockJWTHandler.private_key_pem = private_key_pem + MockJWTHandler.public_key_b64 = public_key_b64 + server = HTTPServer(("127.0.0.1", port), MockJWTHandler) + thread = Thread(target=server.serve_forever, daemon=True) + thread.start() + logger.debug("Mock JWT server started on port %s.", server.server_address[1]) + return server + + +# Docker image pre-pull +# --------------------------------------------------------------------------- +CONTAINER_IMAGES: List[str] = [ + "testcontainers/ryuk:0.8.1", + "postgres:16", + "confluentinc/cp-kafka:7.6.0", + "localstack/localstack:latest", +] + +_PULL_MAX_ATTEMPTS = 2 +_PULL_BACKOFF_SECONDS = [10, 30] + + +def _pull_image_with_retry(client: docker.DockerClient, image: str) -> Tuple[str, bool, str]: + """ + Pull a single Docker image with exponential-backoff retries. + + Returns: + Tuple of (image_name, success, message). + """ + for attempt in range(1, _PULL_MAX_ATTEMPTS + 1): + try: + logger.info("Pulling image %s (attempt %d/%d).", image, attempt, _PULL_MAX_ATTEMPTS) + client.images.pull(image) + logger.info("Image %s pulled successfully.", image) + return image, True, "ok" + except docker.errors.APIError as exc: + backoff = _PULL_BACKOFF_SECONDS[min(attempt - 1, len(_PULL_BACKOFF_SECONDS) - 1)] + logger.warning( + "Failed to pull %s (attempt %d/%d): %s. Retrying in %ds.", + image, + attempt, + _PULL_MAX_ATTEMPTS, + exc, + backoff, + ) + time.sleep(backoff) + return image, False, f"Failed to pull {image} after {_PULL_MAX_ATTEMPTS} attempts." 
+ + +@pytest.fixture(scope="session", autouse=True) +def _prepull_images() -> None: + """Pre-pull all required Docker images in parallel before starting containers.""" + client = docker.from_env(timeout=300) + images_to_pull: List[str] = [] + for image in CONTAINER_IMAGES: + try: + client.images.get(image) + logger.debug("Image %s already available locally.", image) + except docker.errors.ImageNotFound: + images_to_pull.append(image) + + if not images_to_pull: + logger.debug("All container images already available.") + return + + logger.info("Pre-pulling %d Docker image(s) in parallel: %s.", len(images_to_pull), images_to_pull) + failures: List[str] = [] + with ThreadPoolExecutor(max_workers=len(images_to_pull)) as executor: + pull_tasks = {executor.submit(_pull_image_with_retry, client, img): img for img in images_to_pull} + for task in as_completed(pull_tasks): + image_name, success, message = task.result() + if not success: + failures.append(message) + + if failures: + pytest.exit("\n".join(["Docker image pre-pull failed:"] + failures), returncode=1) + + +# Container fixtures +# --------------------------------------------------------------------------- +def _convert_dsn(dsn: str) -> str: + """Convert SQLAlchemy DSN to psycopg2 format.""" + return dsn.replace("postgresql+psycopg2://", "postgresql://") + + +@pytest.fixture(scope="session") +def postgres_container() -> Generator[str, None, None]: + """PostgreSQL container with initialized schema.""" + logger.debug("Starting PostgreSQL container.") + container = PostgresContainer("postgres:16", dbname="eventgate") + + container.start() + dsn = _convert_dsn(container.get_connection_url()) + logger.debug("PostgreSQL started, initializing schema.") + + conn = psycopg2.connect(dsn) + conn.autocommit = True + with conn.cursor() as cursor: + cursor.execute(SCHEMA_SQL) + conn.close() + logger.debug("PostgreSQL schema initialized.") + + yield dsn + + container.stop() + logger.debug("PostgreSQL container stopped.") + + +@pytest.fixture(scope="session") +def kafka_container() -> Generator[str, None, None]: + """Kafka container with embedded Zookeeper.""" + logger.debug("Starting Kafka container.") + container = KafkaContainer() + + container.start() + bootstrap_server = container.get_bootstrap_server() + logger.debug("Kafka started at %s.", bootstrap_server) + + yield bootstrap_server + + container.stop() + logger.debug("Kafka container stopped.") + + +@pytest.fixture(scope="session") +def localstack_container() -> Generator[dict, None, None]: + """LocalStack container for EventBridge and Secrets Manager.""" + logger.debug("Starting LocalStack container.") + container = LocalStackContainer("localstack/localstack:latest") + container.with_services("events,secretsmanager") + + container.start() + url = container.get_url() + logger.debug("LocalStack started at %s.", url) + + yield {"url": url, "region": "us-east-1"} + + container.stop() + logger.debug("LocalStack container stopped.") + + +@pytest.fixture(scope="session") +def jwt_keypair() -> Dict[str, Any]: + """Generate RSA keypair for JWT signing.""" + return create_test_jwt_keypair() + + +@pytest.fixture(scope="session") +def mock_jwt_server(jwt_keypair: Dict[str, Any]) -> Generator[str, None, None]: + """In-process mock JWT provider server.""" + server = _start_mock_jwt_server( + 0, + jwt_keypair["private_key_pem"], + jwt_keypair["public_key_b64"], + ) + port = server.server_address[1] + url = f"http://127.0.0.1:{port}" + + # Wait for server to be ready. 
+ for _ in range(10): + try: + req_lib.get(f"{url}/health", timeout=1) + break + except (ConnectionError, OSError): + time.sleep(0.1) + else: + pytest.fail("Mock JWT server failed to start within 1 second.") + + yield url + + server.shutdown() + logger.debug("Mock JWT server stopped.") + + +# Lambda handler fixture +# --------------------------------------------------------------------------- +@pytest.fixture(scope="session") +def lambda_handler_factory( + kafka_container: str, + postgres_container: str, + localstack_container: dict, + mock_jwt_server: str, +) -> Generator[Callable[[Dict[str, Any]], Dict[str, Any]], None, None]: + """Create lambda_handler with real container backends.""" + # Set environment variables for the Lambda. + os.environ["LOG_LEVEL"] = "DEBUG" + os.environ["AWS_ENDPOINT_URL"] = localstack_container["url"] + os.environ["AWS_DEFAULT_REGION"] = localstack_container["region"] + os.environ["AWS_ACCESS_KEY_ID"] = "test" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + + # Store PostgreSQL credentials in LocalStack Secrets Manager so WriterPostgres can find them. + parsed_dsn = urlparse(postgres_container) + pg_secret = { + "database": parsed_dsn.path.lstrip("/"), + "host": parsed_dsn.hostname, + "port": parsed_dsn.port, + "user": parsed_dsn.username, + "password": parsed_dsn.password, + } + sm_client = boto3.client( + "secretsmanager", + endpoint_url=localstack_container["url"], + region_name=localstack_container["region"], + aws_access_key_id="test", + aws_secret_access_key="test", + ) + sm_client.create_secret(Name="eventgate/postgres", SecretString=json.dumps(pg_secret)) + os.environ["POSTGRES_SECRET_NAME"] = "eventgate/postgres" + os.environ["POSTGRES_SECRET_REGION"] = localstack_container["region"] + logger.debug("PostgreSQL secret stored in LocalStack Secrets Manager.") + + # Create test config with container URLs. + test_config_dir = PROJECT_ROOT / "tests" / "integration" / ".tmp_conf" + test_config_dir.mkdir(parents=True, exist_ok=True) + + # Copy access.json to test config dir. + access_config_src = PROJECT_ROOT / "conf" / "access.json" + access_config_dst = test_config_dir / "access.json" + access_config_dst.write_text(access_config_src.read_text(encoding="utf-8"), encoding="utf-8") + + # Copy topic_schemas to test config dir. + topic_schemas_src = PROJECT_ROOT / "conf" / "topic_schemas" + topic_schemas_dst = test_config_dir / "topic_schemas" + if topic_schemas_dst.exists(): + shutil.rmtree(topic_schemas_dst) + shutil.copytree(topic_schemas_src, topic_schemas_dst) + + test_config = { + "access_config": str(access_config_dst), + "token_provider_url": mock_jwt_server, + "token_public_keys_url": f"{mock_jwt_server}/keys", + "kafka_bootstrap_server": kafka_container, + "event_bus_arn": "arn:aws:events:us-east-1:000000000000:event-bus/default", + } + + test_config_file = test_config_dir / "config.json" + test_config_file.write_text(json.dumps(test_config, indent=2), encoding="utf-8") + + # Point CONF_DIR to our test config. + os.environ["CONF_DIR"] = str(test_config_dir) + + logger.debug("Test config written to %s.", test_config_file) + + # Import the lambda handler (this triggers initialization). + from src.event_gate_lambda import lambda_handler + + yield lambda_handler + + # Cleanup environment variables. 
+ for key in ( + "LOG_LEVEL", + "AWS_ENDPOINT_URL", + "AWS_DEFAULT_REGION", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "POSTGRES_SECRET_NAME", + "POSTGRES_SECRET_REGION", + "CONF_DIR", + ): + os.environ.pop(key, None) + if test_config_file.exists(): + test_config_file.unlink() + if access_config_dst.exists(): + access_config_dst.unlink() + topic_schemas_dst = test_config_dir / "topic_schemas" + if topic_schemas_dst.exists(): + shutil.rmtree(topic_schemas_dst) + if test_config_dir.exists() and not any(test_config_dir.iterdir()): + test_config_dir.rmdir() + + +@pytest.fixture(scope="session") +def eventgate_client( + lambda_handler_factory: Callable[[Dict[str, Any]], Dict[str, Any]], +) -> "EventGateTestClient": + """EventGate test client that invokes lambda_handler directly.""" + return EventGateTestClient(lambda_handler_factory) + + +@pytest.fixture(scope="session") +def valid_token(jwt_keypair: Dict[str, Any]) -> str: + """Valid JWT token for IntegrationTestUser.""" + return generate_token(jwt_keypair["private_key_pem"], "IntegrationTestUser") + + +# Test client (direct lambda invocation) +# --------------------------------------------------------------------------- +class EventGateTestClient: + """Test client that invokes lambda_handler directly.""" + + def __init__(self, handler: Callable[[Dict[str, Any]], Dict[str, Any]]): + """Initialize with lambda handler function.""" + self._handler = handler + + def invoke( + self, + resource: str, + method: str = "GET", + body: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + path_parameters: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + """Invoke lambda_handler with API Gateway proxy event format.""" + event = { + "resource": resource, + "httpMethod": method, + "headers": headers or {}, + "body": json.dumps(body) if body else None, + "pathParameters": path_parameters, + } + logger.debug("Invoking Lambda: %s %s.", method, resource) + result = self._handler(event) + logger.debug("Lambda response: statusCode=%s.", result.get("statusCode")) + return result + + def get_api(self) -> Dict[str, Any]: + """Get OpenAPI specification.""" + return self.invoke("/api", "GET") + + def get_token(self) -> Dict[str, Any]: + """Get token provider info.""" + return self.invoke("/token", "GET") + + def get_health(self) -> Dict[str, Any]: + """Get health status.""" + return self.invoke("/health", "GET") + + def get_topics(self) -> Dict[str, Any]: + """Get list of topics.""" + return self.invoke("/topics", "GET") + + def get_topic_schema(self, topic_name: str) -> Dict[str, Any]: + """Get schema for a specific topic.""" + return self.invoke( + "/topics/{topic_name}", + "GET", + path_parameters={"topic_name": topic_name}, + ) + + def post_event( + self, + topic_name: str, + event_data: Dict[str, Any], + token: Optional[str] = None, + ) -> Dict[str, Any]: + """Post an event to a topic.""" + headers = {} + if token: + headers["Authorization"] = f"Bearer {token}" + return self.invoke( + "/topics/{topic_name}", + "POST", + body=event_data, + headers=headers, + path_parameters={"topic_name": topic_name}, + ) diff --git a/tests/integration/schemas/__init__.py b/tests/integration/schemas/__init__.py new file mode 100644 index 0000000..ebfbdd3 --- /dev/null +++ b/tests/integration/schemas/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/integration/schemas/postgres_schema.py b/tests/integration/schemas/postgres_schema.py new file mode 100644 index 0000000..482fabb --- /dev/null +++ b/tests/integration/schemas/postgres_schema.py @@ -0,0 +1,43 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PostgreSQL schema for integration tests.""" + +SCHEMA_SQL = """ +-- Table matching WriterPostgres._postgres_run_write columns +CREATE TABLE IF NOT EXISTS public_cps_za_runs ( + event_id VARCHAR(255) NOT NULL, + job_ref VARCHAR(255) NOT NULL, + tenant_id VARCHAR(255) NOT NULL, + source_app VARCHAR(255) NOT NULL, + source_app_version VARCHAR(255) NOT NULL, + environment VARCHAR(255) NOT NULL, + timestamp_start BIGINT, + timestamp_end BIGINT +); + +-- Table matching WriterPostgres._postgres_run_write job rows +CREATE TABLE IF NOT EXISTS public_cps_za_runs_jobs ( + event_id VARCHAR(255) NOT NULL, + country VARCHAR(255), + catalog_id VARCHAR(255) NOT NULL, + status VARCHAR(50) NOT NULL, + timestamp_start BIGINT, + timestamp_end BIGINT, + message TEXT, + additional_info JSONB +); +""" diff --git a/tests/integration/test_api_endpoint.py b/tests/integration/test_api_endpoint.py new file mode 100644 index 0000000..195f116 --- /dev/null +++ b/tests/integration/test_api_endpoint.py @@ -0,0 +1,45 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Integration tests for GET /api endpoint.""" + +from tests.integration.conftest import EventGateTestClient + + +class TestApiEndpoint: + """Tests for the /api endpoint.""" + + def test_get_api_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /api returns successful response.""" + response = eventgate_client.get_api() + + assert 200 == response["statusCode"] + + def test_get_api_returns_openapi_content_type(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /api returns correct content type for OpenAPI.""" + response = eventgate_client.get_api() + + assert "Content-Type" in response["headers"] + assert "application/yaml" in response["headers"]["Content-Type"] + + def test_get_api_body_contains_openapi_spec(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /api returns valid OpenAPI specification.""" + response = eventgate_client.get_api() + + body = response["body"] + assert "openapi:" in body + assert "paths:" in body + assert "/topics" in body diff --git a/tests/integration/test_health_endpoint.py b/tests/integration/test_health_endpoint.py new file mode 100644 index 0000000..3e54464 --- /dev/null +++ b/tests/integration/test_health_endpoint.py @@ -0,0 +1,47 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for GET /health endpoint.""" + +import json + +from tests.integration.conftest import EventGateTestClient + + +class TestHealthEndpoint: + """Tests for the /health endpoint.""" + + def test_get_health_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /health returns successful response.""" + response = eventgate_client.get_health() + + assert 200 == response["statusCode"] + + def test_get_health_status_ok(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /health returns ok status when all writers healthy.""" + response = eventgate_client.get_health() + + body = json.loads(response["body"]) + assert "ok" == body["status"] + + def test_get_health_includes_uptime(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /health includes uptime in response.""" + response = eventgate_client.get_health() + + body = json.loads(response["body"]) + assert "uptime_seconds" in body + assert isinstance(body["uptime_seconds"], (int, float)) + assert body["uptime_seconds"] >= 0 diff --git a/tests/integration/test_token_endpoint.py b/tests/integration/test_token_endpoint.py new file mode 100644 index 0000000..1083a91 --- /dev/null +++ b/tests/integration/test_token_endpoint.py @@ -0,0 +1,36 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for GET /token endpoint.""" + +from tests.integration.conftest import EventGateTestClient + + +class TestTokenEndpoint: + """Tests for the /token endpoint.""" + + def test_get_token_returns_redirect(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /token returns 303 redirect.""" + response = eventgate_client.get_token() + + assert 303 == response["statusCode"] + + def test_get_token_has_location_header(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /token includes Location header.""" + response = eventgate_client.get_token() + + assert "Location" in response["headers"] + assert response["headers"]["Location"] diff --git a/tests/integration/test_topics_endpoint.py b/tests/integration/test_topics_endpoint.py new file mode 100644 index 0000000..c730478 --- /dev/null +++ b/tests/integration/test_topics_endpoint.py @@ -0,0 +1,238 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Integration tests for /topics endpoints.""" + +import json +import time +import uuid +from typing import Any, Dict + +import boto3 +import psycopg2 +import pytest +from confluent_kafka import Consumer + +from tests.integration.conftest import EventGateTestClient +from tests.integration.utils.jwt_helper import generate_token +from tests.integration.utils.utils import create_runs_event + + +class TestTopicsListEndpoint: + """Tests for GET /topics endpoint.""" + + def test_get_topics_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics returns successful response.""" + response = eventgate_client.get_topics() + + assert 200 == response["statusCode"] + + def test_get_topics_includes_list_of_test_topic(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics includes the test topic.""" + response = eventgate_client.get_topics() + + body = json.loads(response["body"]) + assert isinstance(body, list) + assert "public.cps.za.test" in body + + +class TestTopicSchemaEndpoint: + """Tests for GET /topics/{topic_name} endpoint.""" + + def test_get_topic_schema_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics/{topic_name} returns schema.""" + response = eventgate_client.get_topic_schema("public.cps.za.test") + + assert 200 == response["statusCode"] + + def test_get_topic_schema_contains_properties(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics/{topic_name} returns valid JSON schema.""" + response = eventgate_client.get_topic_schema("public.cps.za.test") + + body = json.loads(response["body"]) + assert "object" == body["type"] + assert "properties" in body + assert "event_id" in body["properties"] + + def test_get_topic_schema_nonexistent_returns_404(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics/{topic_name} returns 404 for nonexistent topic.""" + response = eventgate_client.get_topic_schema("nonexistent.topic") + + assert 404 == response["statusCode"] + + +class TestPostEventEndpoint: + """Tests for POST /topics/{topic_name} endpoint.""" + + @pytest.fixture + def valid_event(self) -> dict: + """Generate a valid run event.""" + return create_runs_event( + event_id=str(uuid.uuid4()), + job_ref="spark-app-001", + tenant_id="TEST", + source_app="integration-test", + catalog_id="db.schema.table", + ) + + def test_post_event_without_token_returns_401( + self, eventgate_client: EventGateTestClient, valid_event: dict + ) -> None: + """Test POST without JWT returns 401.""" + response = eventgate_client.post_event( + "public.cps.za.runs", + valid_event, + token=None, + ) + + assert 401 == response["statusCode"] + + def test_post_event_with_valid_token_returns_202( + self, eventgate_client: EventGateTestClient, valid_event: dict, valid_token: str + ) -> None: + """Test POST with valid JWT returns accepted status.""" + response = eventgate_client.post_event( + "public.cps.za.runs", + valid_event, + token=valid_token, + ) + + assert 202 == response["statusCode"] + + def test_post_event_invalid_schema_returns_400( + self, eventgate_client: EventGateTestClient, valid_token: str + ) -> None: + """Test POST with invalid event schema returns 400.""" + invalid_event = {"invalid_field": "value"} + + response = eventgate_client.post_event( + "public.cps.za.runs", + invalid_event, + token=valid_token, + ) + + assert 400 == response["statusCode"] + + def test_post_event_unauthorized_user_returns_403( + self, eventgate_client: EventGateTestClient, valid_event: dict, 
jwt_keypair: Dict[str, Any] + ) -> None: + """Test POST from unauthorized user returns 403.""" + unauthorized_token = generate_token(jwt_keypair["private_key_pem"], "UnauthorizedUser") + response = eventgate_client.post_event( + "public.cps.za.runs", + valid_event, + token=unauthorized_token, + ) + + assert 403 == response["statusCode"] + + def test_post_event_nonexistent_topic_returns_404( + self, eventgate_client: EventGateTestClient, valid_event: dict, valid_token: str + ) -> None: + """Test POST to nonexistent topic returns 404.""" + response = eventgate_client.post_event( + "nonexistent.topic", + valid_event, + token=valid_token, + ) + + assert 404 == response["statusCode"] + + +class TestPostEventWriterVerification: + """Verify events are dispatched after a successful POST.""" + + @pytest.fixture(scope="class") + def posted_event(self, eventgate_client: EventGateTestClient, valid_token: str) -> Dict[str, Any]: + """Post a unique runs event and return its payload for downstream assertions.""" + event = create_runs_event( + event_id=str(uuid.uuid4()), + job_ref="spark-verify-001", + tenant_id="VERIFY", + source_app="integration-test-verify", + catalog_id="db.schema.verify_table", + ) + response = eventgate_client.post_event("public.cps.za.runs", event, token=valid_token) + assert 202 == response["statusCode"] + return event + + def test_event_received_by_kafka(self, posted_event: Dict[str, Any], kafka_container: str) -> None: + """Verify the posted event is consumable from the Kafka topic.""" + consumer = Consumer( + { + "bootstrap.servers": kafka_container, + "group.id": f"test-verify-{uuid.uuid4()}", + "auto.offset.reset": "earliest", + } + ) + consumer.subscribe(["public.cps.za.runs"]) + + try: + messages = [] + deadline = time.time() + 10 + while time.time() < deadline: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + continue + messages.append(json.loads(msg.value().decode("utf-8"))) + if any(m["event_id"] == posted_event["event_id"] for m in messages): + break + + matched = [m for m in messages if m["event_id"] == posted_event["event_id"]] + assert 1 == len(matched) + assert posted_event["tenant_id"] == matched[0]["tenant_id"] + finally: + consumer.close() + + def test_eventbridge_bus_reachable(self, localstack_container: dict) -> None: + """Verify EventBridge event bus exists and is reachable after event dispatch.""" + client = boto3.client( + "events", + endpoint_url=localstack_container["url"], + region_name=localstack_container["region"], + aws_access_key_id="test", + aws_secret_access_key="test", + ) + response = client.describe_event_bus(Name="default") + assert "Arn" in response + + def test_event_received_by_postgres(self, posted_event: Dict[str, Any], postgres_container: str) -> None: + """Verify the posted event was inserted into the PostgreSQL runs tables.""" + conn = psycopg2.connect(postgres_container) + try: + with conn.cursor() as cursor: + cursor.execute( + "SELECT event_id, tenant_id FROM public_cps_za_runs WHERE event_id = %s", + (posted_event["event_id"],), + ) + run_row = cursor.fetchone() + assert run_row is not None + assert posted_event["event_id"] == run_row[0] + assert posted_event["tenant_id"] == run_row[1] + + cursor.execute( + "SELECT event_id, catalog_id, status FROM public_cps_za_runs_jobs WHERE event_id = %s", + (posted_event["event_id"],), + ) + job_row = cursor.fetchone() + assert job_row is not None + assert posted_event["event_id"] == job_row[0] + assert posted_event["jobs"][0]["catalog_id"] == job_row[1] + 
assert posted_event["jobs"][0]["status"] == job_row[2] + finally: + conn.close() diff --git a/tests/integration/utils/__init__.py b/tests/integration/utils/__init__.py new file mode 100644 index 0000000..ebfbdd3 --- /dev/null +++ b/tests/integration/utils/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/integration/utils/jwt_helper.py b/tests/integration/utils/jwt_helper.py new file mode 100644 index 0000000..1d7780c --- /dev/null +++ b/tests/integration/utils/jwt_helper.py @@ -0,0 +1,87 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""JWT token generation utilities for integration tests.""" + +import base64 +import logging +from datetime import datetime, timedelta, timezone +from typing import Any, Dict + +import jwt +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +logger = logging.getLogger(__name__) + + +def create_test_jwt_keypair() -> Dict[str, Any]: + """ + Generate RSA keypair for JWT signing. + + Returns: + Dictionary with private_key_pem (bytes) and public_key_b64 (str). + """ + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + public_key = private_key.public_key() + + private_key_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + public_key_der = public_key.public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + public_key_b64 = base64.b64encode(public_key_der).decode("utf-8") + + logger.debug("Generated RSA keypair for JWT signing.") + + return { + "private_key_pem": private_key_pem, + "public_key_b64": public_key_b64, + } + + +def generate_token( + private_key_pem: bytes, + username: str, + exp_minutes: int = 30, +) -> str: + """ + Generate a signed JWT token. + + Args: + private_key_pem: PEM-encoded private key bytes. + username: Username to include in 'sub' claim. + exp_minutes: Token expiration time in minutes. + + Returns: + Signed JWT token string. 
+ """ + now = datetime.now(timezone.utc) + payload = { + "sub": username, + "iat": now, + "exp": now + timedelta(minutes=exp_minutes), + } + + token = jwt.encode(payload, private_key_pem, algorithm="RS256") + logger.debug("Generated JWT token for user %s.", username) + + return token diff --git a/tests/integration/utils/utils.py b/tests/integration/utils/utils.py new file mode 100644 index 0000000..5dd4b1e --- /dev/null +++ b/tests/integration/utils/utils.py @@ -0,0 +1,48 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utility functions for the integration testing.""" +import time +from typing import Dict, Any + + +def create_runs_event( + event_id: str, + job_ref: str, + tenant_id: str, + source_app: str, + catalog_id: str, +) -> Dict[str, Any]: + """Create a runs event with standard structure.""" + now = int(time.time() * 1000) + return { + "event_id": event_id, + "job_ref": job_ref, + "tenant_id": tenant_id, + "source_app": source_app, + "source_app_version": "1.0.0", + "environment": "test", + "timestamp_start": now - 60000, + "timestamp_end": now, + "jobs": [ + { + "catalog_id": catalog_id, + "status": "succeeded", + "timestamp_start": now - 60000, + "timestamp_end": now, + } + ], + } diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..ebfbdd3 --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# diff --git a/tests/conftest.py b/tests/unit/conftest.py similarity index 100% rename from tests/conftest.py rename to tests/unit/conftest.py diff --git a/tests/handlers/__init__.py b/tests/unit/handlers/__init__.py similarity index 100% rename from tests/handlers/__init__.py rename to tests/unit/handlers/__init__.py diff --git a/tests/handlers/test_handler_api.py b/tests/unit/handlers/test_handler_api.py similarity index 100% rename from tests/handlers/test_handler_api.py rename to tests/unit/handlers/test_handler_api.py diff --git a/tests/handlers/test_handler_health.py b/tests/unit/handlers/test_handler_health.py similarity index 100% rename from tests/handlers/test_handler_health.py rename to tests/unit/handlers/test_handler_health.py diff --git a/tests/handlers/test_handler_token.py b/tests/unit/handlers/test_handler_token.py similarity index 100% rename from tests/handlers/test_handler_token.py rename to tests/unit/handlers/test_handler_token.py diff --git a/tests/handlers/test_handler_topic.py b/tests/unit/handlers/test_handler_topic.py similarity index 100% rename from tests/handlers/test_handler_topic.py rename to tests/unit/handlers/test_handler_topic.py diff --git a/tests/test_conf_validation.py b/tests/unit/test_conf_validation.py similarity index 99% rename from tests/test_conf_validation.py rename to tests/unit/test_conf_validation.py index da001b1..232e968 100644 --- a/tests/test_conf_validation.py +++ b/tests/unit/test_conf_validation.py @@ -1,5 +1,3 @@ -import os - # # Copyright 2025 ABSA Group Limited # @@ -15,11 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # + import json +import os from glob import glob import pytest -CONF_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "conf") +CONF_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), "../conf") REQUIRED_CONFIG_KEYS = { "access_config", diff --git a/tests/test_event_gate_lambda.py b/tests/unit/test_event_gate_lambda.py similarity index 100% rename from tests/test_event_gate_lambda.py rename to tests/unit/test_event_gate_lambda.py diff --git a/tests/test_event_gate_lambda_local_access.py b/tests/unit/test_event_gate_lambda_local_access.py similarity index 100% rename from tests/test_event_gate_lambda_local_access.py rename to tests/unit/test_event_gate_lambda_local_access.py diff --git a/tests/utils/__init__.py b/tests/unit/utils/__init__.py similarity index 100% rename from tests/utils/__init__.py rename to tests/unit/utils/__init__.py diff --git a/tests/utils/test_conf_path.py b/tests/unit/utils/test_conf_path.py similarity index 100% rename from tests/utils/test_conf_path.py rename to tests/unit/utils/test_conf_path.py diff --git a/tests/utils/test_safe_serialization.py b/tests/unit/utils/test_safe_serialization.py similarity index 100% rename from tests/utils/test_safe_serialization.py rename to tests/unit/utils/test_safe_serialization.py diff --git a/tests/utils/test_trace_logging.py b/tests/unit/utils/test_trace_logging.py similarity index 100% rename from tests/utils/test_trace_logging.py rename to tests/unit/utils/test_trace_logging.py diff --git a/tests/utils/test_utils.py b/tests/unit/utils/test_utils.py similarity index 100% rename from tests/utils/test_utils.py rename to tests/unit/utils/test_utils.py diff --git a/tests/writers/__init__.py b/tests/unit/writers/__init__.py similarity index 100% rename from tests/writers/__init__.py rename to tests/unit/writers/__init__.py diff --git 
a/tests/writers/test_writer_eventbridge.py b/tests/unit/writers/test_writer_eventbridge.py similarity index 100% rename from tests/writers/test_writer_eventbridge.py rename to tests/unit/writers/test_writer_eventbridge.py diff --git a/tests/writers/test_writer_kafka.py b/tests/unit/writers/test_writer_kafka.py similarity index 100% rename from tests/writers/test_writer_kafka.py rename to tests/unit/writers/test_writer_kafka.py diff --git a/tests/writers/test_writer_postgres.py b/tests/unit/writers/test_writer_postgres.py similarity index 100% rename from tests/writers/test_writer_postgres.py rename to tests/unit/writers/test_writer_postgres.py
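
The `Patterns` section of `.github/copilot-instructions.md` above requires that `__init__` methods never raise and that validation and connections are deferred to first use. A minimal sketch of that convention, under the assumption of an invented `LazyWriter` class (not code from this repository), backed by `psycopg2`:

```python
"""Illustrative lazy-init sketch; the class name and DSN handling are invented for this example."""

from typing import Optional

import psycopg2


class LazyWriter:
    """Writer whose __init__ never raises; connection errors surface on first use."""

    def __init__(self, dsn: str) -> None:
        # Only store configuration here -- no I/O, no validation.
        self._dsn = dsn
        self._connection: Optional[psycopg2.extensions.connection] = None

    def _get_connection(self) -> psycopg2.extensions.connection:
        # Connect lazily: a bad DSN or an unreachable database fails here,
        # on the first write, not when the Lambda module is imported.
        if self._connection is None:
            self._connection = psycopg2.connect(self._dsn)
        return self._connection

    def write(self, query: str, params: tuple) -> None:
        """Execute a write, opening the connection on first call."""
        connection = self._get_connection()
        with connection.cursor() as cursor:
            cursor.execute(query, params)
        connection.commit()
```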
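
The Testing bullets in `.github/copilot-instructions.md` (mock external services, `mocker.patch(...)`, `assert expected == actual`) can be read as the following self-contained pytest sketch. It uses `pytest-mock` and a toy `time.time` dependency rather than any real module from `src/`, so the function under test is purely illustrative:

```python
"""Self-contained sketch of the unit-test conventions; the code under test is invented."""

import time

from pytest_mock import MockerFixture


def current_millis() -> int:
    """Toy code under test, standing in for a handler that calls an external service."""
    return int(time.time() * 1000)


def test_current_millis_uses_patched_clock(mocker: MockerFixture) -> None:
    """Patch the external dependency and compare with the expected-first assert style."""
    mocker.patch("time.time", return_value=1.0)

    assert 1000 == current_millis()
```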
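
For the new `tests/integration/utils/jwt_helper.py`, a short usage sketch showing that a token produced by `generate_token` verifies against the base64-encoded DER public key the mock JWT server publishes under `/keys`. The verification code here is illustrative (PyJWT plus `cryptography`), not the Lambda's actual validation path:

```python
"""Round-trip check for the JWT helpers; the verification shown here is illustrative only."""

import base64

import jwt  # PyJWT
from cryptography.hazmat.primitives import serialization

from tests.integration.utils.jwt_helper import create_test_jwt_keypair, generate_token

keypair = create_test_jwt_keypair()
token = generate_token(keypair["private_key_pem"], "IntegrationTestUser")

# The mock JWT server exposes the public key as base64-encoded DER under /keys.
public_key = serialization.load_der_public_key(base64.b64decode(keypair["public_key_b64"]))
claims = jwt.decode(token, public_key, algorithms=["RS256"])

assert "IntegrationTestUser" == claims["sub"]
```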