From 6f098048485f1130d5cef3aa6f2cd3691045bac7 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Mon, 2 Feb 2026 14:44:21 +0100 Subject: [PATCH 1/6] New test structure following the integration test implementation. --- .github/workflows/check_python.yml | 2 +- DEVELOPER.md | 24 +++++++++++-------- README.md | 5 ++-- tests/integration/__init__.py | 15 ++++++++++++ tests/unit/__init__.py | 15 ++++++++++++ tests/{ => unit}/conftest.py | 0 tests/{ => unit}/handlers/__init__.py | 0 tests/{ => unit}/handlers/test_handler_api.py | 0 .../handlers/test_handler_health.py | 0 .../{ => unit}/handlers/test_handler_token.py | 0 .../{ => unit}/handlers/test_handler_topic.py | 0 tests/{ => unit}/test_conf_validation.py | 6 ++--- tests/{ => unit}/test_event_gate_lambda.py | 0 .../test_event_gate_lambda_local_access.py | 0 tests/{ => unit}/utils/__init__.py | 0 tests/{ => unit}/utils/test_conf_path.py | 0 .../utils/test_safe_serialization.py | 0 tests/{ => unit}/utils/test_trace_logging.py | 0 tests/{ => unit}/utils/test_utils.py | 0 tests/{ => unit}/writers/__init__.py | 0 .../writers/test_writer_eventbridge.py | 0 tests/{ => unit}/writers/test_writer_kafka.py | 0 .../writers/test_writer_postgres.py | 0 23 files changed, 50 insertions(+), 17 deletions(-) create mode 100644 tests/integration/__init__.py create mode 100644 tests/unit/__init__.py rename tests/{ => unit}/conftest.py (100%) rename tests/{ => unit}/handlers/__init__.py (100%) rename tests/{ => unit}/handlers/test_handler_api.py (100%) rename tests/{ => unit}/handlers/test_handler_health.py (100%) rename tests/{ => unit}/handlers/test_handler_token.py (100%) rename tests/{ => unit}/handlers/test_handler_topic.py (100%) rename tests/{ => unit}/test_conf_validation.py (99%) rename tests/{ => unit}/test_event_gate_lambda.py (100%) rename tests/{ => unit}/test_event_gate_lambda_local_access.py (100%) rename tests/{ => unit}/utils/__init__.py (100%) rename tests/{ => unit}/utils/test_conf_path.py (100%) rename tests/{ => unit}/utils/test_safe_serialization.py (100%) rename tests/{ => unit}/utils/test_trace_logging.py (100%) rename tests/{ => unit}/utils/test_utils.py (100%) rename tests/{ => unit}/writers/__init__.py (100%) rename tests/{ => unit}/writers/test_writer_eventbridge.py (100%) rename tests/{ => unit}/writers/test_writer_kafka.py (100%) rename tests/{ => unit}/writers/test_writer_postgres.py (100%) diff --git a/.github/workflows/check_python.yml b/.github/workflows/check_python.yml index d0f3f5d..34fbe29 100644 --- a/.github/workflows/check_python.yml +++ b/.github/workflows/check_python.yml @@ -132,7 +132,7 @@ jobs: run: pip install -r requirements.txt - name: Check code coverage with Pytest - run: pytest --cov=. -v tests/ --cov-fail-under=80 + run: pytest --cov=. -v tests/unit/ --cov-fail-under=80 mypy-check: name: Mypy Type Check diff --git a/DEVELOPER.md b/DEVELOPER.md index ccf04d5..2cdc5c0 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -5,8 +5,9 @@ - [Run Pylint Tool Locally](#run-pylint-tool-locally) - [Run Black Tool Locally](#run-black-tool-locally) - [Run mypy Tool Locally](#run-mypy-tool-locally) -- [Run Unit Test](#running-unit-test) +- [Run Unit Test Locally](#run-unit-test-locally) - [Code Coverage](#code-coverage) +- [Run Integration Test Locally](#run-integration-test-locally) ## Get Started @@ -45,7 +46,7 @@ To run Pylint on a specific file, follow the pattern `pylint /// Date: Tue, 10 Feb 2026 09:51:34 +0100 Subject: [PATCH 2/6] Integration tests. --- .github/copilot-instructions.md | 46 ++ .github/workflows/check_python.yml | 40 +- DEVELOPER.md | 61 ++- conf/access.json | 3 +- requirements.txt | 4 +- tests/integration/__init__.py | 2 +- tests/integration/conftest.py | 432 +++++++++++++++++++ tests/integration/schemas/__init__.py | 15 + tests/integration/schemas/postgres_schema.py | 29 ++ tests/integration/test_api_endpoint.py | 45 ++ tests/integration/test_health_endpoint.py | 47 ++ tests/integration/test_token_endpoint.py | 36 ++ tests/integration/test_topics_endpoint.py | 228 ++++++++++ tests/integration/utils/__init__.py | 15 + tests/integration/utils/jwt_helper.py | 87 ++++ 15 files changed, 1079 insertions(+), 11 deletions(-) create mode 100644 .github/copilot-instructions.md create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/schemas/__init__.py create mode 100644 tests/integration/schemas/postgres_schema.py create mode 100644 tests/integration/test_api_endpoint.py create mode 100644 tests/integration/test_health_endpoint.py create mode 100644 tests/integration/test_token_endpoint.py create mode 100644 tests/integration/test_topics_endpoint.py create mode 100644 tests/integration/utils/__init__.py create mode 100644 tests/integration/utils/jwt_helper.py diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md new file mode 100644 index 0000000..51de70f --- /dev/null +++ b/.github/copilot-instructions.md @@ -0,0 +1,46 @@ +Copilot instructions for EventGate + +Purpose +AWS Lambda event gateway that receives messages via API Gateway and dispatches them to Kafka, EventBridge, and PostgreSQL. + +Structure +- Entry point: `src/event_gate_lambda.py` +- Handlers: `src/handlers/` (HandlerApi, HandlerToken, HandlerTopic, HandlerHealth) +- Writers: `src/writers/` (inherit from `Writer` base class) +- Config: `conf/config.json`, `conf/access.json`, `conf/topic_schemas/*.json` +- Production Terraform scripts are not part of this repository; `terraform_examples/` for reference configurations only + +Python style +- Python 3.13 +- Type hints for public functions and classes +- Use `logging.getLogger(__name__)`, not print +- Lazy % formatting in logging: `logger.info("msg %s", var)` +- F-strings in exceptions: `raise ValueError(f"Error {var}")` +- All imports at top of file (never inside functions) +- Apache 2.0 license header in every .py file (including `__init__.py`) +- Docstrings must start with a short summary line +- End all log messages with a period: `logger.info("Message.")` + +Patterns +- `__init__` methods must not raise exceptions; defer validation and connection to first use (lazy init) +- Writers: inherit from `Writer(ABC)`, implement `write(topic, message) -> (bool, str|None)` and `check_health() -> (bool, str)` +- Route dispatch via `ROUTE_MAP` dict mapping routes to handler functions in `event_gate_lambda.py` +- Separate business logic from environment access (env vars, file I/O, network calls) +- No duplicate validation; centralize parsing in one layer where practical +- Preserve existing formatting and conventions +- Keep API Gateway response structure stable: `{"statusCode": int, "headers": {...}, "body": "..."}` +- Keep error response format stable: `{"success": false, "statusCode": int, "errors": [...]}` + +Testing +- Mirror src structure: `src/handlers/` -> `tests/handlers/` +- Unit tests: mock external services via `conftest.py` (Kafka, EventBridge, PostgreSQL, S3) +- Integration tests: call `lambda_handler` directly with real containers (testcontainers-python for Kafka, PostgreSQL, LocalStack) +- No real API/DB calls in unit tests +- Use `mocker.patch("module.dependency")` or `mocker.patch.object(Class, "method")` +- Assert pattern: `assert expected == actual` + +Quality gates (run after changes, fix only if below threshold) +- black . +- mypy . +- pylint $(git ls-files '*.py') >= 9.5 +- pytest tests/ >= 80% coverage diff --git a/.github/workflows/check_python.yml b/.github/workflows/check_python.yml index 34fbe29..20820ab 100644 --- a/.github/workflows/check_python.yml +++ b/.github/workflows/check_python.yml @@ -111,7 +111,32 @@ jobs: id: check-format run: black --check $(git ls-files '*.py') - pytest-test: + mypy-check: + name: Mypy Type Check + needs: detect + if: needs.detect.outputs.python_changed == 'true' + runs-on: ubuntu-latest + steps: + - name: Checkout repository + uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 + with: + persist-credentials: false + fetch-depth: 0 + + - name: Set up Python + uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 + with: + python-version: '3.13' + cache: 'pip' + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Check types with Mypy + id: check-types + run: mypy . + + unit-tests: name: Pytest Unit Tests with Coverage needs: detect if: needs.detect.outputs.python_changed == 'true' @@ -123,7 +148,8 @@ jobs: persist-credentials: false fetch-depth: 0 - - uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 + - name: Set up Python + uses: actions/setup-python@83679a892e2d95755f2dac6acb0bfd1e9ac5d548 with: python-version: '3.13' cache: 'pip' @@ -134,11 +160,12 @@ jobs: - name: Check code coverage with Pytest run: pytest --cov=. -v tests/unit/ --cov-fail-under=80 - mypy-check: - name: Mypy Type Check + integration-tests: + name: Pytest Integration Tests needs: detect if: needs.detect.outputs.python_changed == 'true' runs-on: ubuntu-latest + timeout-minutes: 15 steps: - name: Checkout repository uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 @@ -155,9 +182,8 @@ jobs: - name: Install dependencies run: pip install -r requirements.txt - - name: Check types with Mypy - id: check-types - run: mypy . + - name: Run integration tests + run: pytest tests/integration/ -v --tb=short --log-cli-level=INFO noop: name: No Operation diff --git a/DEVELOPER.md b/DEVELOPER.md index 2cdc5c0..82b8ab8 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -144,4 +144,63 @@ open htmlcov/index.html ``` ## Run Integration Test Locally -TODO + +Integration tests validate EventGate against real service dependencies using testcontainers-python. + +### Integration Test Approach + +EventGate uses a **direct invocation approach** for integration testing: +- **Lambda handler is called directly** in Python (not run in a container) +- **External dependencies run in Docker containers**: Kafka, PostgreSQL, LocalStack (EventBridge) +- **Mock JWT provider runs in-process** as a background thread (no container) +- Test configuration is dynamically generated and injected via environment variables + +This approach is faster (~8s vs 30s+), more reliable, and easier to debug than container-based Lambda testing. + +### Prerequisites +- Docker running (Docker Desktop on macOS/Windows, or Docker Engine on Linux) +- Python 3.13 with dependencies installed + +### Run Integration Tests + +Containers start and stop automatically: +```shell +pytest tests/integration/ -v +``` + +With detailed logging: +```shell +pytest tests/integration/ -v --log-cli-level=INFO +``` + +### Run Specific Integration Tests + +Run a single test file: +```shell +pytest tests/integration/test_health_endpoint.py -v +``` + +Run a specific test function: +```shell +pytest tests/integration/test_topics_endpoint.py::TestPostEventEndpoint::test_post_event_with_valid_token_returns_202 -v +``` + +### Troubleshooting + +If containers fail to start, check Docker is running: +```shell +docker info +``` + +If image pulls fail with TLS or timeout errors, pre-pull the required images manually: +```shell +docker pull testcontainers/ryuk:0.8.1 +docker pull postgres:16 +docker pull confluentinc/cp-kafka:7.6.0 +docker pull localstack/localstack:latest +``` + +View container logs in pytest output by increasing log level: +```shell +pytest tests/integration/ -v --log-cli-level=DEBUG +``` diff --git a/conf/access.json b/conf/access.json index 39afa63..f158758 100644 --- a/conf/access.json +++ b/conf/access.json @@ -7,6 +7,7 @@ "BarUser" ], "public.cps.za.test": [ - "TestUser" + "TestUser", + "IntegrationTestUser" ] } \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 40b697a..21de79a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -pytest==8.4.2 +pytest==9.0.2 pytest-cov==6.3.0 pytest-mock==3.15.0 pylint==3.3.8 @@ -12,5 +12,7 @@ PyJWT==2.10.1 requests==2.32.5 boto3==1.40.25 confluent-kafka==2.12.1 +testcontainers==4.9.0 +docker==7.1.0 # psycopg2-binary==2.9.10 # Ideal for local development, but not for long-term production use psycopg2==2.9.10 diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index ebfbdd3..f7115cb 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2026 ABSA Group Limited +# Copyright 2025 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..f1bd710 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,432 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Pytest fixtures for EventGate integration tests using testcontainers.""" + +import json +import logging +import os +import shutil +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from http.server import HTTPServer, BaseHTTPRequestHandler +from pathlib import Path +from threading import Thread +from typing import Any, Callable, Dict, Generator, List, Optional, Tuple +from urllib.parse import urlparse + +import boto3 +import docker +import psycopg2 +import pytest +import requests as req_lib +from testcontainers.kafka import KafkaContainer +from testcontainers.localstack import LocalStackContainer +from testcontainers.postgres import PostgresContainer + +from tests.integration.schemas.postgres_schema import SCHEMA_SQL +from tests.integration.utils.jwt_helper import create_test_jwt_keypair, generate_token + +logger = logging.getLogger(__name__) + +PROJECT_ROOT = Path(__file__).parent.parent.parent + + +# Mock JWT Provider (runs in-process via threading) +# --------------------------------------------------------------------------- +class MockJWTHandler(BaseHTTPRequestHandler): + """HTTP handler for mock JWT provider.""" + + private_key_pem: bytes = b"" + public_key_b64: str = "" + + def do_GET(self) -> None: + """Handle GET requests.""" + if self.path == "/keys": + self._json_response(200, {"keys": [{"key": self.public_key_b64}]}) + elif self.path == "/health": + self._json_response(200, {"status": "ok"}) + elif self.path == "/private-key": + self.send_response(200) + self.send_header("Content-Type", "application/x-pem-file") + self.end_headers() + self.wfile.write(self.private_key_pem) + elif self.path == "/": + self.send_response(303) + self.send_header("Location", "http://localhost/login") + self.end_headers() + else: + self._json_response(404, {"error": "Not found"}) + + def _json_response(self, status: int, data: dict) -> None: + """Send JSON response.""" + self.send_response(status) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps(data).encode("utf-8")) + + def log_message(self, fmt: str, *args: Any) -> None: + """Suppress default logging.""" + + +def _start_mock_jwt_server(port: int, private_key_pem: bytes, public_key_b64: str) -> HTTPServer: + """Start mock JWT HTTP server in background thread.""" + MockJWTHandler.private_key_pem = private_key_pem + MockJWTHandler.public_key_b64 = public_key_b64 + server = HTTPServer(("127.0.0.1", port), MockJWTHandler) + thread = Thread(target=server.serve_forever, daemon=True) + thread.start() + logger.debug("Mock JWT server started on port %s.", server.server_address[1]) + return server + + +# Docker image pre-pull +# --------------------------------------------------------------------------- +CONTAINER_IMAGES: List[str] = [ + "testcontainers/ryuk:0.8.1", + "postgres:16", + "confluentinc/cp-kafka:7.6.0", + "localstack/localstack:latest", +] + +_PULL_MAX_ATTEMPTS = 2 +_PULL_BACKOFF_SECONDS = [10, 30] + + +def _pull_image_with_retry(client: docker.DockerClient, image: str) -> Tuple[str, bool, str]: + """ + Pull a single Docker image with exponential-backoff retries. + + Returns: + Tuple of (image_name, success, message). + """ + for attempt in range(1, _PULL_MAX_ATTEMPTS + 1): + try: + logger.info("Pulling image %s (attempt %d/%d).", image, attempt, _PULL_MAX_ATTEMPTS) + client.images.pull(image) + logger.info("Image %s pulled successfully.", image) + return image, True, "ok" + except docker.errors.APIError as exc: + backoff = _PULL_BACKOFF_SECONDS[min(attempt - 1, len(_PULL_BACKOFF_SECONDS) - 1)] + logger.warning( + "Failed to pull %s (attempt %d/%d): %s. Retrying in %ds.", + image, + attempt, + _PULL_MAX_ATTEMPTS, + exc, + backoff, + ) + time.sleep(backoff) + return image, False, f"Failed to pull {image} after {_PULL_MAX_ATTEMPTS} attempts." + + +@pytest.fixture(scope="session", autouse=True) +def _prepull_images() -> None: + """Pre-pull all required Docker images in parallel before starting containers.""" + client = docker.from_env(timeout=300) + images_to_pull: List[str] = [] + for image in CONTAINER_IMAGES: + try: + client.images.get(image) + logger.debug("Image %s already available locally.", image) + except docker.errors.ImageNotFound: + images_to_pull.append(image) + + if not images_to_pull: + logger.debug("All container images already available.") + return + + logger.info("Pre-pulling %d Docker image(s) in parallel: %s.", len(images_to_pull), images_to_pull) + failures: List[str] = [] + with ThreadPoolExecutor(max_workers=len(images_to_pull)) as executor: + pull_tasks = {executor.submit(_pull_image_with_retry, client, img): img for img in images_to_pull} + for task in as_completed(pull_tasks): + image_name, success, message = task.result() + if not success: + failures.append(message) + + if failures: + pytest.exit("\n".join(["Docker image pre-pull failed:"] + failures), returncode=1) + + +# Container fixtures +# --------------------------------------------------------------------------- +def _convert_dsn(dsn: str) -> str: + """Convert SQLAlchemy DSN to psycopg2 format.""" + return dsn.replace("postgresql+psycopg2://", "postgresql://") + + +@pytest.fixture(scope="session") +def postgres_container() -> Generator[str, None, None]: + """PostgreSQL container with initialized schema.""" + logger.debug("Starting PostgreSQL container.") + container = PostgresContainer("postgres:16", dbname="eventgate") + + container.start() + dsn = _convert_dsn(container.get_connection_url()) + logger.debug("PostgreSQL started, initializing schema.") + + conn = psycopg2.connect(dsn) + conn.autocommit = True + with conn.cursor() as cursor: + cursor.execute(SCHEMA_SQL) + conn.close() + logger.debug("PostgreSQL schema initialized.") + + yield dsn + + container.stop() + logger.debug("PostgreSQL container stopped.") + + +@pytest.fixture(scope="session") +def kafka_container() -> Generator[str, None, None]: + """Kafka container with embedded Zookeeper.""" + logger.debug("Starting Kafka container.") + container = KafkaContainer() + + container.start() + bootstrap_server = container.get_bootstrap_server() + logger.debug("Kafka started at %s.", bootstrap_server) + + yield bootstrap_server + + container.stop() + logger.debug("Kafka container stopped.") + + +@pytest.fixture(scope="session") +def localstack_container() -> Generator[dict, None, None]: + """LocalStack container for EventBridge and Secrets Manager.""" + logger.debug("Starting LocalStack container.") + container = LocalStackContainer("localstack/localstack:latest") + container.with_services("events,secretsmanager") + + container.start() + url = container.get_url() + logger.debug("LocalStack started at %s.", url) + + yield {"url": url, "region": "us-east-1"} + + container.stop() + logger.debug("LocalStack container stopped.") + + +@pytest.fixture(scope="session") +def jwt_keypair() -> Dict[str, Any]: + """Generate RSA keypair for JWT signing.""" + return create_test_jwt_keypair() + + +@pytest.fixture(scope="session") +def mock_jwt_server(jwt_keypair: Dict[str, Any]) -> Generator[str, None, None]: + """In-process mock JWT provider server.""" + server = _start_mock_jwt_server( + 0, + jwt_keypair["private_key_pem"], + jwt_keypair["public_key_b64"], + ) + port = server.server_address[1] + url = f"http://127.0.0.1:{port}" + + # Wait for server to be ready. + for _ in range(10): + try: + req_lib.get(f"{url}/health", timeout=1) + break + except (ConnectionError, OSError): + time.sleep(0.1) + + yield url + + server.shutdown() + logger.debug("Mock JWT server stopped.") + + +# Lambda handler fixture +# --------------------------------------------------------------------------- +@pytest.fixture(scope="session") +def lambda_handler_factory( + kafka_container: str, + postgres_container: str, + localstack_container: dict, + mock_jwt_server: str, +) -> Generator[Callable[[Dict[str, Any]], Dict[str, Any]], None, None]: + """Create lambda_handler with real container backends.""" + # Set environment variables for the Lambda. + os.environ["LOG_LEVEL"] = "DEBUG" + os.environ["AWS_ENDPOINT_URL"] = localstack_container["url"] + os.environ["AWS_DEFAULT_REGION"] = localstack_container["region"] + os.environ["AWS_ACCESS_KEY_ID"] = "test" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + + # Store PostgreSQL credentials in LocalStack Secrets Manager so WriterPostgres can find them. + parsed_dsn = urlparse(postgres_container) + pg_secret = { + "database": parsed_dsn.path.lstrip("/"), + "host": parsed_dsn.hostname, + "port": parsed_dsn.port, + "user": parsed_dsn.username, + "password": parsed_dsn.password, + } + sm_client = boto3.client( + "secretsmanager", + endpoint_url=localstack_container["url"], + region_name=localstack_container["region"], + aws_access_key_id="test", + aws_secret_access_key="test", + ) + sm_client.create_secret(Name="eventgate/postgres", SecretString=json.dumps(pg_secret)) + os.environ["POSTGRES_SECRET_NAME"] = "eventgate/postgres" + os.environ["POSTGRES_SECRET_REGION"] = localstack_container["region"] + logger.debug("PostgreSQL secret stored in LocalStack Secrets Manager.") + + # Create test config with container URLs. + test_config_dir = PROJECT_ROOT / "tests" / "integration" / ".tmp_conf" + test_config_dir.mkdir(parents=True, exist_ok=True) + + # Copy access.json to test config dir. + access_config_src = PROJECT_ROOT / "conf" / "access.json" + access_config_dst = test_config_dir / "access.json" + access_config_dst.write_text(access_config_src.read_text(encoding="utf-8"), encoding="utf-8") + + # Copy topic_schemas to test config dir. + topic_schemas_src = PROJECT_ROOT / "conf" / "topic_schemas" + topic_schemas_dst = test_config_dir / "topic_schemas" + if topic_schemas_dst.exists(): + shutil.rmtree(topic_schemas_dst) + shutil.copytree(topic_schemas_src, topic_schemas_dst) + + test_config = { + "access_config": str(access_config_dst), + "token_provider_url": mock_jwt_server, + "token_public_keys_url": f"{mock_jwt_server}/keys", + "kafka_bootstrap_server": kafka_container, + "event_bus_arn": "arn:aws:events:us-east-1:000000000000:event-bus/default", + } + + test_config_file = test_config_dir / "config.json" + test_config_file.write_text(json.dumps(test_config, indent=2), encoding="utf-8") + + # Point CONF_DIR to our test config. + os.environ["CONF_DIR"] = str(test_config_dir) + + logger.debug("Test config written to %s.", test_config_file) + + # Import the lambda handler (this triggers initialization). + from src.event_gate_lambda import lambda_handler + + yield lambda_handler + + # Cleanup. + os.environ.pop("POSTGRES_SECRET_NAME", None) + os.environ.pop("POSTGRES_SECRET_REGION", None) + if test_config_file.exists(): + test_config_file.unlink() + if access_config_dst.exists(): + access_config_dst.unlink() + topic_schemas_dst = test_config_dir / "topic_schemas" + if topic_schemas_dst.exists(): + shutil.rmtree(topic_schemas_dst) + if test_config_dir.exists() and not any(test_config_dir.iterdir()): + test_config_dir.rmdir() + + +@pytest.fixture(scope="session") +def eventgate_client( + lambda_handler_factory: Callable[[Dict[str, Any]], Dict[str, Any]], +) -> "EventGateTestClient": + """EventGate test client that invokes lambda_handler directly.""" + return EventGateTestClient(lambda_handler_factory) + + +@pytest.fixture(scope="session") +def valid_token(jwt_keypair: Dict[str, Any]) -> str: + """Valid JWT token for IntegrationTestUser.""" + return generate_token(jwt_keypair["private_key_pem"], "IntegrationTestUser") + + +# Test client (direct lambda invocation) +# --------------------------------------------------------------------------- +class EventGateTestClient: + """Test client that invokes lambda_handler directly.""" + + def __init__(self, handler: Callable[[Dict[str, Any]], Dict[str, Any]]): + """Initialize with lambda handler function.""" + self._handler = handler + + def invoke( + self, + resource: str, + method: str = "GET", + body: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None, + path_parameters: Optional[Dict[str, str]] = None, + ) -> Dict[str, Any]: + """Invoke lambda_handler with API Gateway proxy event format.""" + event = { + "resource": resource, + "httpMethod": method, + "headers": headers or {}, + "body": json.dumps(body) if body else None, + "pathParameters": path_parameters, + } + logger.debug("Invoking Lambda: %s %s.", method, resource) + result = self._handler(event) + logger.debug("Lambda response: statusCode=%s.", result.get("statusCode")) + return result + + def get_api(self) -> Dict[str, Any]: + """Get OpenAPI specification.""" + return self.invoke("/api", "GET") + + def get_token(self) -> Dict[str, Any]: + """Get token provider info.""" + return self.invoke("/token", "GET") + + def get_health(self) -> Dict[str, Any]: + """Get health status.""" + return self.invoke("/health", "GET") + + def get_topics(self) -> Dict[str, Any]: + """Get list of topics.""" + return self.invoke("/topics", "GET") + + def get_topic_schema(self, topic_name: str) -> Dict[str, Any]: + """Get schema for a specific topic.""" + return self.invoke( + "/topics/{topic_name}", + "GET", + path_parameters={"topic_name": topic_name}, + ) + + def post_event( + self, + topic_name: str, + event_data: Dict[str, Any], + token: Optional[str] = None, + ) -> Dict[str, Any]: + """Post an event to a topic.""" + headers = {} + if token: + headers["Authorization"] = f"Bearer {token}" + return self.invoke( + "/topics/{topic_name}", + "POST", + body=event_data, + headers=headers, + path_parameters={"topic_name": topic_name}, + ) diff --git a/tests/integration/schemas/__init__.py b/tests/integration/schemas/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/tests/integration/schemas/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/integration/schemas/postgres_schema.py b/tests/integration/schemas/postgres_schema.py new file mode 100644 index 0000000..f9d13cc --- /dev/null +++ b/tests/integration/schemas/postgres_schema.py @@ -0,0 +1,29 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""PostgreSQL schema for integration tests.""" + +SCHEMA_SQL = """ +-- Test topic table matching WriterPostgres test configuration +CREATE TABLE IF NOT EXISTS public_cps_za_test ( + event_id VARCHAR(255) NOT NULL, + tenant_id VARCHAR(255) NOT NULL, + source_app VARCHAR(255) NOT NULL, + environment VARCHAR(255) NOT NULL, + timestamp_event BIGINT, + additional_info JSONB +); +""" diff --git a/tests/integration/test_api_endpoint.py b/tests/integration/test_api_endpoint.py new file mode 100644 index 0000000..734e7cc --- /dev/null +++ b/tests/integration/test_api_endpoint.py @@ -0,0 +1,45 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for GET /api endpoint.""" + +from tests.integration.conftest import EventGateTestClient + + +class TestApiEndpoint: + """Tests for the /api endpoint.""" + + def test_get_api_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /api returns successful response.""" + response = eventgate_client.get_api() + + assert 200 == response["statusCode"] + + def test_get_api_returns_openapi_content_type(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /api returns correct content type for OpenAPI.""" + response = eventgate_client.get_api() + + assert "Content-Type" in response["headers"] + assert "application/yaml" in response["headers"]["Content-Type"] + + def test_get_api_body_contains_openapi_spec(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /api returns valid OpenAPI specification.""" + response = eventgate_client.get_api() + + body = response["body"] + assert "openapi:" in body + assert "paths:" in body + assert "/topics" in body diff --git a/tests/integration/test_health_endpoint.py b/tests/integration/test_health_endpoint.py new file mode 100644 index 0000000..6d52c8d --- /dev/null +++ b/tests/integration/test_health_endpoint.py @@ -0,0 +1,47 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for GET /health endpoint.""" + +import json + +from tests.integration.conftest import EventGateTestClient + + +class TestHealthEndpoint: + """Tests for the /health endpoint.""" + + def test_get_health_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /health returns successful response.""" + response = eventgate_client.get_health() + + assert 200 == response["statusCode"] + + def test_get_health_status_ok(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /health returns ok status when all writers healthy.""" + response = eventgate_client.get_health() + + body = json.loads(response["body"]) + assert "ok" == body["status"] + + def test_get_health_includes_uptime(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /health includes uptime in response.""" + response = eventgate_client.get_health() + + body = json.loads(response["body"]) + assert "uptime_seconds" in body + assert isinstance(body["uptime_seconds"], (int, float)) + assert body["uptime_seconds"] >= 0 diff --git a/tests/integration/test_token_endpoint.py b/tests/integration/test_token_endpoint.py new file mode 100644 index 0000000..14c6b24 --- /dev/null +++ b/tests/integration/test_token_endpoint.py @@ -0,0 +1,36 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for GET /token endpoint.""" + +from tests.integration.conftest import EventGateTestClient + + +class TestTokenEndpoint: + """Tests for the /token endpoint.""" + + def test_get_token_returns_redirect(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /token returns 303 redirect.""" + response = eventgate_client.get_token() + + assert 303 == response["statusCode"] + + def test_get_token_has_location_header(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /token includes Location header.""" + response = eventgate_client.get_token() + + assert "Location" in response["headers"] + assert response["headers"]["Location"] diff --git a/tests/integration/test_topics_endpoint.py b/tests/integration/test_topics_endpoint.py new file mode 100644 index 0000000..2f65e46 --- /dev/null +++ b/tests/integration/test_topics_endpoint.py @@ -0,0 +1,228 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Integration tests for /topics endpoints.""" + +import json +import time +import uuid +from typing import Any, Dict + +import boto3 +import psycopg2 +import pytest +from confluent_kafka import Consumer + +from tests.integration.conftest import EventGateTestClient +from tests.integration.utils.jwt_helper import generate_token + + +class TestTopicsListEndpoint: + """Tests for GET /topics endpoint.""" + + def test_get_topics_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics returns successful response.""" + response = eventgate_client.get_topics() + + assert 200 == response["statusCode"] + + def test_get_topics_includes_list_of_test_topic(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics includes the test topic.""" + response = eventgate_client.get_topics() + + body = json.loads(response["body"]) + assert isinstance(body, list) + assert "public.cps.za.test" in body + + +class TestTopicSchemaEndpoint: + """Tests for GET /topics/{topic_name} endpoint.""" + + def test_get_topic_schema_returns_200(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics/{topic_name} returns schema.""" + response = eventgate_client.get_topic_schema("public.cps.za.test") + + assert 200 == response["statusCode"] + + def test_get_topic_schema_contains_properties(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics/{topic_name} returns valid JSON schema.""" + response = eventgate_client.get_topic_schema("public.cps.za.test") + + body = json.loads(response["body"]) + assert "object" == body["type"] + assert "properties" in body + assert "event_id" in body["properties"] + + def test_get_topic_schema_nonexistent_returns_404(self, eventgate_client: EventGateTestClient) -> None: + """Test GET /topics/{topic_name} returns 404 for nonexistent topic.""" + response = eventgate_client.get_topic_schema("nonexistent.topic") + + assert 404 == response["statusCode"] + + +class TestPostEventEndpoint: + """Tests for POST /topics/{topic_name} endpoint.""" + + @pytest.fixture + def valid_event(self) -> dict: + """Generate a valid test event.""" + return { + "event_id": str(uuid.uuid4()), + "tenant_id": "TEST", + "source_app": "integration-test", + "environment": "test", + "timestamp": int(time.time() * 1000), + } + + def test_post_event_without_token_returns_401( + self, eventgate_client: EventGateTestClient, valid_event: dict + ) -> None: + """Test POST without JWT returns 401.""" + response = eventgate_client.post_event( + "public.cps.za.test", + valid_event, + token=None, + ) + + assert 401 == response["statusCode"] + + def test_post_event_with_valid_token_returns_202( + self, eventgate_client: EventGateTestClient, valid_event: dict, valid_token: str + ) -> None: + """Test POST with valid JWT returns accepted status.""" + response = eventgate_client.post_event( + "public.cps.za.test", + valid_event, + token=valid_token, + ) + + assert 202 == response["statusCode"] + + def test_post_event_invalid_schema_returns_400( + self, eventgate_client: EventGateTestClient, valid_token: str + ) -> None: + """Test POST with invalid event schema returns 400.""" + invalid_event = {"invalid_field": "value"} + + response = eventgate_client.post_event( + "public.cps.za.test", + invalid_event, + token=valid_token, + ) + + assert 400 == response["statusCode"] + + def test_post_event_unauthorized_user_returns_403( + self, eventgate_client: EventGateTestClient, valid_event: dict, jwt_keypair: Dict[str, Any] + ) -> None: + """Test POST from unauthorized user returns 403.""" + unauthorized_token = generate_token(jwt_keypair["private_key_pem"], "UnauthorizedUser") + response = eventgate_client.post_event( + "public.cps.za.test", + valid_event, + token=unauthorized_token, + ) + + assert 403 == response["statusCode"] + + def test_post_event_nonexistent_topic_returns_404( + self, eventgate_client: EventGateTestClient, valid_event: dict, valid_token: str + ) -> None: + """Test POST to nonexistent topic returns 404.""" + response = eventgate_client.post_event( + "nonexistent.topic", + valid_event, + token=valid_token, + ) + + assert 404 == response["statusCode"] + + +class TestPostEventWriterVerification: + """Verify events are dispatched after a successful POST.""" + + @pytest.fixture(scope="class") + def posted_event(self, eventgate_client: EventGateTestClient, valid_token: str) -> Dict[str, Any]: + """Post a unique event and return its payload for downstream assertions.""" + event = { + "event_id": str(uuid.uuid4()), + "tenant_id": "VERIFY", + "source_app": "integration-test-verify", + "environment": "test", + "timestamp": int(time.time() * 1000), + } + response = eventgate_client.post_event("public.cps.za.test", event, token=valid_token) + assert 202 == response["statusCode"] + return event + + def test_event_received_by_kafka(self, posted_event: Dict[str, Any], kafka_container: str) -> None: + """Verify the posted event is consumable from the Kafka topic.""" + consumer = Consumer( + { + "bootstrap.servers": kafka_container, + "group.id": f"test-verify-{uuid.uuid4()}", + "auto.offset.reset": "earliest", + } + ) + consumer.subscribe(["public.cps.za.test"]) + + try: + messages = [] + deadline = time.time() + 10 + while time.time() < deadline: + msg = consumer.poll(timeout=1.0) + if msg is None: + continue + if msg.error(): + continue + messages.append(json.loads(msg.value().decode("utf-8"))) + if any(m["event_id"] == posted_event["event_id"] for m in messages): + break + + matched = [m for m in messages if m["event_id"] == posted_event["event_id"]] + assert 1 == len(matched) + assert posted_event["tenant_id"] == matched[0]["tenant_id"] + finally: + consumer.close() + + def test_event_received_by_eventbridge(self, posted_event: Dict[str, Any], localstack_container: dict) -> None: + """Verify EventBridge accepted the event without failures.""" + client = boto3.client( + "events", + endpoint_url=localstack_container["url"], + region_name=localstack_container["region"], + aws_access_key_id="test", + aws_secret_access_key="test", + ) + # Verify the event bus exists and is reachable. + response = client.describe_event_bus(Name="default") + assert "Arn" in response + + def test_event_received_by_postgres(self, posted_event: Dict[str, Any], postgres_container: str) -> None: + """Verify the posted event was inserted into the PostgreSQL test table.""" + conn = psycopg2.connect(postgres_container) + try: + with conn.cursor() as cursor: + cursor.execute( + "SELECT event_id, tenant_id FROM public_cps_za_test WHERE event_id = %s", + (posted_event["event_id"],), + ) + row = cursor.fetchone() + assert row is not None + assert posted_event["event_id"] == row[0] + assert posted_event["tenant_id"] == row[1] + finally: + conn.close() diff --git a/tests/integration/utils/__init__.py b/tests/integration/utils/__init__.py new file mode 100644 index 0000000..f7115cb --- /dev/null +++ b/tests/integration/utils/__init__.py @@ -0,0 +1,15 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/integration/utils/jwt_helper.py b/tests/integration/utils/jwt_helper.py new file mode 100644 index 0000000..c2afbef --- /dev/null +++ b/tests/integration/utils/jwt_helper.py @@ -0,0 +1,87 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""JWT token generation utilities for integration tests.""" + +import base64 +import logging +from datetime import datetime, timedelta, timezone +from typing import Any, Dict + +import jwt +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa + +logger = logging.getLogger(__name__) + + +def create_test_jwt_keypair() -> Dict[str, Any]: + """ + Generate RSA keypair for JWT signing. + + Returns: + Dictionary with private_key_pem (bytes) and public_key_b64 (str). + """ + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + public_key = private_key.public_key() + + private_key_pem = private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + public_key_der = public_key.public_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PublicFormat.SubjectPublicKeyInfo, + ) + public_key_b64 = base64.b64encode(public_key_der).decode("utf-8") + + logger.debug("Generated RSA keypair for JWT signing.") + + return { + "private_key_pem": private_key_pem, + "public_key_b64": public_key_b64, + } + + +def generate_token( + private_key_pem: bytes, + username: str, + exp_minutes: int = 30, +) -> str: + """ + Generate a signed JWT token. + + Args: + private_key_pem: PEM-encoded private key bytes. + username: Username to include in 'sub' claim. + exp_minutes: Token expiration time in minutes. + + Returns: + Signed JWT token string. + """ + now = datetime.now(timezone.utc) + payload = { + "sub": username, + "iat": now, + "exp": now + timedelta(minutes=exp_minutes), + } + + token = jwt.encode(payload, private_key_pem, algorithm="RS256") + logger.debug("Generated JWT token for user %s.", username) + + return token From 1f2af9fde443edd17081c6638aa90b3059fe25e2 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 17 Feb 2026 12:22:51 +0100 Subject: [PATCH 3/6] Code Rabbit comment fixes. --- .github/copilot-instructions.md | 2 +- DEVELOPER.md | 2 +- conf/access.json | 2 +- tests/integration/__init__.py | 2 +- tests/integration/conftest.py | 2 +- tests/integration/schemas/__init__.py | 2 +- tests/integration/schemas/postgres_schema.py | 2 +- tests/integration/test_api_endpoint.py | 2 +- tests/integration/test_health_endpoint.py | 2 +- tests/integration/test_token_endpoint.py | 2 +- tests/integration/test_topics_endpoint.py | 2 +- tests/integration/utils/__init__.py | 2 +- tests/integration/utils/jwt_helper.py | 2 +- 13 files changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 51de70f..ba96099 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -32,7 +32,7 @@ Patterns - Keep error response format stable: `{"success": false, "statusCode": int, "errors": [...]}` Testing -- Mirror src structure: `src/handlers/` -> `tests/handlers/` +- Mirror src structure: `src/handlers/` -> `tests/unit/handlers/` - Unit tests: mock external services via `conftest.py` (Kafka, EventBridge, PostgreSQL, S3) - Integration tests: call `lambda_handler` directly with real containers (testcontainers-python for Kafka, PostgreSQL, LocalStack) - No real API/DB calls in unit tests diff --git a/DEVELOPER.md b/DEVELOPER.md index 82b8ab8..94d88c4 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -112,7 +112,7 @@ Unit tests are written using pytest. To run the tests, use the following command pytest tests/unit/ ``` -This will execute all tests located in the tests directory. +This will execute all unit tests located in the tests/unit/ directory. ### Focused / Selective Test Runs Run a single test file: diff --git a/conf/access.json b/conf/access.json index f158758..76d167f 100644 --- a/conf/access.json +++ b/conf/access.json @@ -10,4 +10,4 @@ "TestUser", "IntegrationTestUser" ] -} \ No newline at end of file +} diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index f7115cb..ebfbdd3 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f1bd710..7b72e65 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/schemas/__init__.py b/tests/integration/schemas/__init__.py index f7115cb..ebfbdd3 100644 --- a/tests/integration/schemas/__init__.py +++ b/tests/integration/schemas/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/schemas/postgres_schema.py b/tests/integration/schemas/postgres_schema.py index f9d13cc..d3ccb47 100644 --- a/tests/integration/schemas/postgres_schema.py +++ b/tests/integration/schemas/postgres_schema.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/test_api_endpoint.py b/tests/integration/test_api_endpoint.py index 734e7cc..195f116 100644 --- a/tests/integration/test_api_endpoint.py +++ b/tests/integration/test_api_endpoint.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/test_health_endpoint.py b/tests/integration/test_health_endpoint.py index 6d52c8d..3e54464 100644 --- a/tests/integration/test_health_endpoint.py +++ b/tests/integration/test_health_endpoint.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/test_token_endpoint.py b/tests/integration/test_token_endpoint.py index 14c6b24..1083a91 100644 --- a/tests/integration/test_token_endpoint.py +++ b/tests/integration/test_token_endpoint.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/test_topics_endpoint.py b/tests/integration/test_topics_endpoint.py index 2f65e46..c9bc5ee 100644 --- a/tests/integration/test_topics_endpoint.py +++ b/tests/integration/test_topics_endpoint.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/utils/__init__.py b/tests/integration/utils/__init__.py index f7115cb..ebfbdd3 100644 --- a/tests/integration/utils/__init__.py +++ b/tests/integration/utils/__init__.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/tests/integration/utils/jwt_helper.py b/tests/integration/utils/jwt_helper.py index c2afbef..1d7780c 100644 --- a/tests/integration/utils/jwt_helper.py +++ b/tests/integration/utils/jwt_helper.py @@ -1,5 +1,5 @@ # -# Copyright 2025 ABSA Group Limited +# Copyright 2026 ABSA Group Limited # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. From b01e140301649346ca1622a3f67981c8c550024f Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 17 Feb 2026 13:44:12 +0100 Subject: [PATCH 4/6] Code Rabbit comment fixes. --- tests/integration/conftest.py | 17 ++++++++++++++--- tests/integration/test_topics_endpoint.py | 5 ++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 7b72e65..76d768d 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -249,6 +249,8 @@ def mock_jwt_server(jwt_keypair: Dict[str, Any]) -> Generator[str, None, None]: break except (ConnectionError, OSError): time.sleep(0.1) + else: + pytest.fail("Mock JWT server failed to start within 1 second.") yield url @@ -331,9 +333,18 @@ def lambda_handler_factory( yield lambda_handler - # Cleanup. - os.environ.pop("POSTGRES_SECRET_NAME", None) - os.environ.pop("POSTGRES_SECRET_REGION", None) + # Cleanup environment variables. + for key in ( + "LOG_LEVEL", + "AWS_ENDPOINT_URL", + "AWS_DEFAULT_REGION", + "AWS_ACCESS_KEY_ID", + "AWS_SECRET_ACCESS_KEY", + "POSTGRES_SECRET_NAME", + "POSTGRES_SECRET_REGION", + "CONF_DIR", + ): + os.environ.pop(key, None) if test_config_file.exists(): test_config_file.unlink() if access_config_dst.exists(): diff --git a/tests/integration/test_topics_endpoint.py b/tests/integration/test_topics_endpoint.py index c9bc5ee..f7dbb59 100644 --- a/tests/integration/test_topics_endpoint.py +++ b/tests/integration/test_topics_endpoint.py @@ -198,8 +198,8 @@ def test_event_received_by_kafka(self, posted_event: Dict[str, Any], kafka_conta finally: consumer.close() - def test_event_received_by_eventbridge(self, posted_event: Dict[str, Any], localstack_container: dict) -> None: - """Verify EventBridge accepted the event without failures.""" + def test_eventbridge_bus_reachable(self, localstack_container: dict) -> None: + """Verify EventBridge event bus exists and is reachable after event dispatch.""" client = boto3.client( "events", endpoint_url=localstack_container["url"], @@ -207,7 +207,6 @@ def test_event_received_by_eventbridge(self, posted_event: Dict[str, Any], local aws_access_key_id="test", aws_secret_access_key="test", ) - # Verify the event bus exists and is reachable. response = client.describe_event_bus(Name="default") assert "Arn" in response From 4f37c0ce54b75ac808d4bfbff1482460132ed002 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 17 Feb 2026 15:36:40 +0100 Subject: [PATCH 5/6] Comment implementation --- conf/access.json | 14 +--- requirements.txt | 2 +- tests/integration/schemas/postgres_schema.py | 20 +++++- tests/integration/test_topics_endpoint.py | 67 ++++++++++++-------- tests/integration/utils/utils.py | 48 ++++++++++++++ 5 files changed, 108 insertions(+), 43 deletions(-) create mode 100644 tests/integration/utils/utils.py diff --git a/conf/access.json b/conf/access.json index 76d167f..827bc3d 100644 --- a/conf/access.json +++ b/conf/access.json @@ -1,13 +1,5 @@ { - "public.cps.za.runs": [ - "FooBarUser" - ], - "public.cps.za.dlchange": [ - "FooUser", - "BarUser" - ], - "public.cps.za.test": [ - "TestUser", - "IntegrationTestUser" - ] + "public.cps.za.runs": ["FooBarUser", "IntegrationTestUser"], + "public.cps.za.dlchange": ["FooUser", "BarUser"], + "public.cps.za.test": ["TestUser"] } diff --git a/requirements.txt b/requirements.txt index 21de79a..7ccefe1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,7 @@ PyJWT==2.10.1 requests==2.32.5 boto3==1.40.25 confluent-kafka==2.12.1 -testcontainers==4.9.0 +testcontainers==4.14.1 docker==7.1.0 # psycopg2-binary==2.9.10 # Ideal for local development, but not for long-term production use psycopg2==2.9.10 diff --git a/tests/integration/schemas/postgres_schema.py b/tests/integration/schemas/postgres_schema.py index d3ccb47..482fabb 100644 --- a/tests/integration/schemas/postgres_schema.py +++ b/tests/integration/schemas/postgres_schema.py @@ -17,13 +17,27 @@ """PostgreSQL schema for integration tests.""" SCHEMA_SQL = """ --- Test topic table matching WriterPostgres test configuration -CREATE TABLE IF NOT EXISTS public_cps_za_test ( +-- Table matching WriterPostgres._postgres_run_write columns +CREATE TABLE IF NOT EXISTS public_cps_za_runs ( event_id VARCHAR(255) NOT NULL, + job_ref VARCHAR(255) NOT NULL, tenant_id VARCHAR(255) NOT NULL, source_app VARCHAR(255) NOT NULL, + source_app_version VARCHAR(255) NOT NULL, environment VARCHAR(255) NOT NULL, - timestamp_event BIGINT, + timestamp_start BIGINT, + timestamp_end BIGINT +); + +-- Table matching WriterPostgres._postgres_run_write job rows +CREATE TABLE IF NOT EXISTS public_cps_za_runs_jobs ( + event_id VARCHAR(255) NOT NULL, + country VARCHAR(255), + catalog_id VARCHAR(255) NOT NULL, + status VARCHAR(50) NOT NULL, + timestamp_start BIGINT, + timestamp_end BIGINT, + message TEXT, additional_info JSONB ); """ diff --git a/tests/integration/test_topics_endpoint.py b/tests/integration/test_topics_endpoint.py index f7dbb59..c730478 100644 --- a/tests/integration/test_topics_endpoint.py +++ b/tests/integration/test_topics_endpoint.py @@ -28,6 +28,7 @@ from tests.integration.conftest import EventGateTestClient from tests.integration.utils.jwt_helper import generate_token +from tests.integration.utils.utils import create_runs_event class TestTopicsListEndpoint: @@ -78,21 +79,21 @@ class TestPostEventEndpoint: @pytest.fixture def valid_event(self) -> dict: - """Generate a valid test event.""" - return { - "event_id": str(uuid.uuid4()), - "tenant_id": "TEST", - "source_app": "integration-test", - "environment": "test", - "timestamp": int(time.time() * 1000), - } + """Generate a valid run event.""" + return create_runs_event( + event_id=str(uuid.uuid4()), + job_ref="spark-app-001", + tenant_id="TEST", + source_app="integration-test", + catalog_id="db.schema.table", + ) def test_post_event_without_token_returns_401( self, eventgate_client: EventGateTestClient, valid_event: dict ) -> None: """Test POST without JWT returns 401.""" response = eventgate_client.post_event( - "public.cps.za.test", + "public.cps.za.runs", valid_event, token=None, ) @@ -104,7 +105,7 @@ def test_post_event_with_valid_token_returns_202( ) -> None: """Test POST with valid JWT returns accepted status.""" response = eventgate_client.post_event( - "public.cps.za.test", + "public.cps.za.runs", valid_event, token=valid_token, ) @@ -118,7 +119,7 @@ def test_post_event_invalid_schema_returns_400( invalid_event = {"invalid_field": "value"} response = eventgate_client.post_event( - "public.cps.za.test", + "public.cps.za.runs", invalid_event, token=valid_token, ) @@ -131,7 +132,7 @@ def test_post_event_unauthorized_user_returns_403( """Test POST from unauthorized user returns 403.""" unauthorized_token = generate_token(jwt_keypair["private_key_pem"], "UnauthorizedUser") response = eventgate_client.post_event( - "public.cps.za.test", + "public.cps.za.runs", valid_event, token=unauthorized_token, ) @@ -156,15 +157,15 @@ class TestPostEventWriterVerification: @pytest.fixture(scope="class") def posted_event(self, eventgate_client: EventGateTestClient, valid_token: str) -> Dict[str, Any]: - """Post a unique event and return its payload for downstream assertions.""" - event = { - "event_id": str(uuid.uuid4()), - "tenant_id": "VERIFY", - "source_app": "integration-test-verify", - "environment": "test", - "timestamp": int(time.time() * 1000), - } - response = eventgate_client.post_event("public.cps.za.test", event, token=valid_token) + """Post a unique runs event and return its payload for downstream assertions.""" + event = create_runs_event( + event_id=str(uuid.uuid4()), + job_ref="spark-verify-001", + tenant_id="VERIFY", + source_app="integration-test-verify", + catalog_id="db.schema.verify_table", + ) + response = eventgate_client.post_event("public.cps.za.runs", event, token=valid_token) assert 202 == response["statusCode"] return event @@ -177,7 +178,7 @@ def test_event_received_by_kafka(self, posted_event: Dict[str, Any], kafka_conta "auto.offset.reset": "earliest", } ) - consumer.subscribe(["public.cps.za.test"]) + consumer.subscribe(["public.cps.za.runs"]) try: messages = [] @@ -211,17 +212,27 @@ def test_eventbridge_bus_reachable(self, localstack_container: dict) -> None: assert "Arn" in response def test_event_received_by_postgres(self, posted_event: Dict[str, Any], postgres_container: str) -> None: - """Verify the posted event was inserted into the PostgreSQL test table.""" + """Verify the posted event was inserted into the PostgreSQL runs tables.""" conn = psycopg2.connect(postgres_container) try: with conn.cursor() as cursor: cursor.execute( - "SELECT event_id, tenant_id FROM public_cps_za_test WHERE event_id = %s", + "SELECT event_id, tenant_id FROM public_cps_za_runs WHERE event_id = %s", + (posted_event["event_id"],), + ) + run_row = cursor.fetchone() + assert run_row is not None + assert posted_event["event_id"] == run_row[0] + assert posted_event["tenant_id"] == run_row[1] + + cursor.execute( + "SELECT event_id, catalog_id, status FROM public_cps_za_runs_jobs WHERE event_id = %s", (posted_event["event_id"],), ) - row = cursor.fetchone() - assert row is not None - assert posted_event["event_id"] == row[0] - assert posted_event["tenant_id"] == row[1] + job_row = cursor.fetchone() + assert job_row is not None + assert posted_event["event_id"] == job_row[0] + assert posted_event["jobs"][0]["catalog_id"] == job_row[1] + assert posted_event["jobs"][0]["status"] == job_row[2] finally: conn.close() diff --git a/tests/integration/utils/utils.py b/tests/integration/utils/utils.py new file mode 100644 index 0000000..5dd4b1e --- /dev/null +++ b/tests/integration/utils/utils.py @@ -0,0 +1,48 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utility functions for the integration testing.""" +import time +from typing import Dict, Any + + +def create_runs_event( + event_id: str, + job_ref: str, + tenant_id: str, + source_app: str, + catalog_id: str, +) -> Dict[str, Any]: + """Create a runs event with standard structure.""" + now = int(time.time() * 1000) + return { + "event_id": event_id, + "job_ref": job_ref, + "tenant_id": tenant_id, + "source_app": source_app, + "source_app_version": "1.0.0", + "environment": "test", + "timestamp_start": now - 60000, + "timestamp_end": now, + "jobs": [ + { + "catalog_id": catalog_id, + "status": "succeeded", + "timestamp_start": now - 60000, + "timestamp_end": now, + } + ], + } From 9a56145a70dc11fe0c3df6fde7002a706f467951 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Tue, 17 Feb 2026 15:44:11 +0100 Subject: [PATCH 6/6] documentation update. --- DEVELOPER.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/DEVELOPER.md b/DEVELOPER.md index 94d88c4..b2b5b88 100644 --- a/DEVELOPER.md +++ b/DEVELOPER.md @@ -155,8 +155,6 @@ EventGate uses a **direct invocation approach** for integration testing: - **Mock JWT provider runs in-process** as a background thread (no container) - Test configuration is dynamically generated and injected via environment variables -This approach is faster (~8s vs 30s+), more reliable, and easier to debug than container-based Lambda testing. - ### Prerequisites - Docker running (Docker Desktop on macOS/Windows, or Docker Engine on Linux) - Python 3.13 with dependencies installed