diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index c6603535..6765cff4 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -12,7 +12,7 @@ env: # Remote Docker host (set via GitHub repository secrets or variables; leave unset for local Docker) DOCKER_HOST: ${{ secrets.DOCKER_HOST }} DOCKER_HOST_IP: ${{ secrets.DOCKER_HOST_IP }} - # S3 backend configuration (optional; S3 tests are skipped when not set) + # S3+Elasticsearch backend configuration (optional; s3-elasticsearch tests are skipped when not set) HFS_S3_BUCKET: ${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }} jobs: @@ -50,9 +50,12 @@ jobs: name: Inferno US Core ${{ matrix.version_label }} Tests (${{ matrix.backend }}) needs: build runs-on: [self-hosted, Linux] + permissions: + id-token: write + contents: read strategy: fail-fast: false - max-parallel: 3 + max-parallel: 2 matrix: suite_id: [ @@ -63,7 +66,7 @@ jobs: us_core_v700, us_core_v800, ] - backend: [sqlite, sqlite-elasticsearch, postgres, s3] + backend: [sqlite, sqlite-elasticsearch, postgres, s3-elasticsearch] include: - { suite_id: us_core_v311, version_label: "v3.1.1" } - { suite_id: us_core_v400, version_label: "v4.0.0" } @@ -140,7 +143,7 @@ jobs: echo "OMITTED_TESTS=[${OMITTED}]" >> $GITHUB_ENV - name: Start Elasticsearch - if: matrix.backend == 'sqlite-elasticsearch' + if: matrix.backend == 'sqlite-elasticsearch' || matrix.backend == 's3-elasticsearch' run: | ES_CONTAINER="es-${{ matrix.suite_id }}-${{ matrix.backend }}" docker rm -f $ES_CONTAINER 2>/dev/null || true @@ -186,9 +189,12 @@ jobs: for i in {1..30}; do if docker exec $PG_CONTAINER pg_isready -U helios > /dev/null 2>&1; then PG_PORT=$(docker port $PG_CONTAINER 5432 | head -1 | sed 's/.*://') - echo "PostgreSQL is ready on port $PG_PORT" - echo "PG_PORT=$PG_PORT" >> $GITHUB_ENV - exit 0 + # Verify external TCP connectivity to the mapped port, not just internal readiness + if timeout 2 bash -c "cat < /dev/null > 
/dev/tcp/$DOCKER_HOST_IP/$PG_PORT" 2>/dev/null; then + echo "PostgreSQL is ready on port $PG_PORT" + echo "PG_PORT=$PG_PORT" >> $GITHUB_ENV + exit 0 + fi fi echo "Attempt $i/30: PostgreSQL not ready yet..." sleep 2 @@ -197,8 +203,38 @@ jobs: docker logs $PG_CONTAINER exit 1 + - name: Configure AWS credentials + if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_ROLE_ARN || vars.AWS_ROLE_ARN }} + aws-region: ${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} + + - name: Install AWS CLI + if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' + run: | + if ! command -v aws &>/dev/null; then + if ! /tmp/aws-bin/aws --version &>/dev/null; then + curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip + unzip -qo /tmp/awscliv2.zip -d /tmp + /tmp/aws/install --install-dir /tmp/aws-cli --bin-dir /tmp/aws-bin + rm -rf /tmp/awscliv2.zip /tmp/aws + fi + echo "/tmp/aws-bin" >> $GITHUB_PATH + fi + aws --version 2>/dev/null || /tmp/aws-bin/aws --version + + - name: Empty S3 prefix + if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' + run: | + BUCKET="${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }}" + PREFIX="ci/${{ matrix.suite_id }}/" + echo "Emptying s3://$BUCKET/$PREFIX" + aws s3 rm "s3://$BUCKET/$PREFIX" --recursive + echo "Prefix emptied" + - name: Start HFS server - if: matrix.backend != 's3' || env.HFS_S3_BUCKET != '' + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | HFS_LOG="/tmp/hfs-${{ matrix.suite_id }}-${{ matrix.backend }}.log" echo "HFS_LOG=$HFS_LOG" >> $GITHUB_ENV @@ -215,11 +251,12 @@ jobs: HFS_PG_USER=helios \ HFS_PG_PASSWORD=helios \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & - elif [ "${{ matrix.backend }}" = "s3" ]; then - HFS_STORAGE_BACKEND=s3 \ + elif [ "${{ matrix.backend }}" = "s3-elasticsearch" ]; 
then + HFS_STORAGE_BACKEND=s3-elasticsearch \ HFS_S3_BUCKET=${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }} \ + HFS_S3_PREFIX=ci/${{ matrix.suite_id }}/ \ HFS_S3_VALIDATE_BUCKETS=false \ - AWS_REGION=${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} \ + HFS_ELASTICSEARCH_NODES=http://$DOCKER_HOST_IP:$ES_PORT \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & else ./target/debug/hfs --database-url :memory: --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & @@ -228,6 +265,7 @@ jobs: echo "HFS_PID=$(cat /tmp/hfs.pid)" >> $GITHUB_ENV - name: Wait for HFS to be ready + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | echo "Waiting for HFS to start..." for i in {1..30}; do @@ -253,10 +291,17 @@ jobs: exit 1 - name: Load Inferno test data into HFS + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | - ./crates/hfs/tests/inferno/install.sh + ./crates/hfs/tests/inferno/install.sh || { + echo "=== HFS log output ===" + cat "$HFS_LOG" + echo "=== end of log ===" + exit 1 + } - name: Wait for Inferno to be ready + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | echo "Waiting for persistent Inferno container to respond..." for i in {1..30}; do @@ -271,9 +316,11 @@ jobs: exit 1 - name: Create results directory + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: mkdir -p inferno-results - name: Run Inferno tests + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | # Create a test session for US Core ${{ matrix.version_label }} # Retry to handle Inferno's SQLite "database is locked" errors under concurrent load @@ -387,6 +434,7 @@ jobs: done - name: Check test results + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | if [ ! 
-f inferno-results/results.json ]; then echo "No results file found" diff --git a/CLAUDE.md b/CLAUDE.md index 2e503ddf..e961f75c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -194,6 +194,9 @@ HFS_STORAGE_BACKEND=postgres HFS_DATABASE_URL="postgresql://user:pass@localhost/ # With SQLite + Elasticsearch HFS_STORAGE_BACKEND=sqlite-es HFS_ELASTICSEARCH_NODES="http://localhost:9200" cargo run --bin hfs +# With S3 (requires --features s3) +HFS_STORAGE_BACKEND=s3 HFS_S3_BUCKET=my-bucket cargo run --bin hfs --features s3 + # With environment overrides HFS_SERVER_PORT=3000 HFS_LOG_LEVEL=debug cargo run --bin hfs ``` @@ -260,6 +263,23 @@ HFS_SERVER_PORT=3000 HFS_LOG_LEVEL=debug cargo run --bin hfs | SQLite + Elasticsearch | `sqlite-elasticsearch` or `sqlite-es` | SQLite for CRUD, ES for search | | PostgreSQL | `postgres` or `pg` or `postgresql` | PostgreSQL only | | PostgreSQL + Elasticsearch | `postgres-elasticsearch` or `pg-es` | PG for CRUD, ES for search | +| S3 | `s3` | AWS S3 object storage for CRUD, versioning, history, bulk ops (no search) | +| S3 + Elasticsearch | `s3-elasticsearch` or `s3-es` | S3 for CRUD, ES for search | + +#### S3 Backend +Requires building with the `s3` feature: +```bash +cargo build -p helios-hfs --features s3 +HFS_STORAGE_BACKEND=s3 HFS_S3_BUCKET=my-bucket HFS_S3_REGION=us-east-1 cargo run --bin hfs --features s3 +``` + +| Variable | Default | Description | +|----------|---------|-------------| +| `HFS_S3_BUCKET` | `hfs` | S3 bucket name (prefix-per-tenant mode) | +| `HFS_S3_REGION` | (AWS chain) | AWS region override | +| `HFS_S3_VALIDATE_BUCKETS` | `true` | Validate bucket existence on startup | + +Standard AWS credential chain applies (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, instance profiles, etc.). For S3-compatible endpoints (e.g., MinIO), configure `S3BackendConfig` directly with `endpoint_url` and `force_path_style`. 
### Multi-tenancy ```bash diff --git a/Cargo.lock b/Cargo.lock index c9e6c27a..686e02b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2798,6 +2798,7 @@ dependencies = [ "helios-persistence", "helios-rest", "openssl", + "parking_lot", "reqwest", "tokio", "tracing", diff --git a/README.md b/README.md index 7a337a79..5b5709e8 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,7 @@ The Helios FHIR Server supports multiple storage backend configurations. Choose | **PostgreSQL** | Built-in full-text search (tsvector/tsquery) | Production OLTP deployments | | **PostgreSQL + Elasticsearch** | Elasticsearch-powered search with PostgreSQL CRUD | Production deployments needing RDBMS + robust search | | **S3** | Object storage for CRUD, versioning, history, and bulk operations (no search) | Archival, bulk analytics, cost-effective storage | +| **S3 + Elasticsearch** | Elasticsearch-powered search with S3 CRUD | Large-scale storage with full FHIR search | ### Running the Server @@ -200,6 +201,14 @@ HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ HFS_STORAGE_BACKEND=s3 \ HFS_S3_BUCKET=my-fhir-bucket \ AWS_PROFILE=your-aws-profile \ +AWS_REGION=us-east-1 \ + ./hfs + +# S3 + Elasticsearch +HFS_STORAGE_BACKEND=s3-elasticsearch \ +HFS_S3_BUCKET=my-fhir-bucket \ +HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ +AWS_PROFILE=your-aws-profile \ AWS_REGION=us-east-1 \ ./hfs ``` @@ -208,7 +217,7 @@ AWS_REGION=us-east-1 \ | Variable | Default | Description | |---|---|---| -| `HFS_STORAGE_BACKEND` | `sqlite` | Backend mode: `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, or `s3` | +| `HFS_STORAGE_BACKEND` | `sqlite` | Backend mode: `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, `s3`, or `s3-elasticsearch` | | `HFS_SERVER_PORT` | `8080` | Server port | | `HFS_SERVER_HOST` | `127.0.0.1` | Host to bind | | `HFS_DATABASE_URL` | `fhir.db` | Database URL (SQLite path or PostgreSQL connection string) | @@ -220,6 +229,7 @@ AWS_REGION=us-east-1 \ | 
`HFS_ELASTICSEARCH_PASSWORD` | *(none)* | ES basic auth password | | `HFS_S3_BUCKET` | `hfs` | S3 bucket name (prefix-per-tenant mode) | | `HFS_S3_REGION` | *(AWS provider chain)* | AWS region override | +| `HFS_S3_PREFIX` | *(none)* | Optional key prefix prepended to all S3 object keys | | `HFS_S3_VALIDATE_BUCKETS` | `true` | Validate bucket access on startup | For detailed backend setup instructions (building from source, Docker commands, and search offloading architecture), see the [persistence crate documentation](crates/persistence/README.md#building--running-storage-backends). diff --git a/crates/hfs/Cargo.toml b/crates/hfs/Cargo.toml index af4fa221..e970ae50 100644 --- a/crates/hfs/Cargo.toml +++ b/crates/hfs/Cargo.toml @@ -53,6 +53,9 @@ clap = { version = "4.0", features = ["derive", "env"] } # Logging tracing = "0.1" +# Sync primitives (used by build_search_registry for the elasticsearch composite backend) +parking_lot = "0.12" + # Error handling anyhow = "1.0" diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index 3ad07723..a5bff953 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -11,8 +11,9 @@ //! | PostgreSQL | `postgres` | Full-featured RDBMS with JSONB storage and tsvector search | //! | PostgreSQL + Elasticsearch | `postgres,elasticsearch` | PostgreSQL for CRUD, Elasticsearch for search | //! | S3 | `s3` | AWS S3 object storage for CRUD, versioning, history, and bulk ops (no search) | +//! | S3 + Elasticsearch | `s3,elasticsearch` | S3 for CRUD/history, Elasticsearch for search | //! -//! Set `HFS_STORAGE_BACKEND` to `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, or `s3`. +//! Set `HFS_STORAGE_BACKEND` to `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, `s3`, or `s3-elasticsearch`. 
use clap::Parser; use helios_rest::{ServerConfig, StorageBackendMode, create_app_with_config, init_logging}; @@ -92,6 +93,9 @@ async fn main() -> anyhow::Result<()> { StorageBackendMode::S3 => { start_s3(config).await?; } + StorageBackendMode::S3Elasticsearch => { + start_s3_elasticsearch(config).await?; + } } Ok(()) @@ -381,6 +385,7 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { let bucket = std::env::var("HFS_S3_BUCKET").unwrap_or_else(|_| "hfs".to_string()); let region = std::env::var("HFS_S3_REGION").ok(); + let prefix = std::env::var("HFS_S3_PREFIX").ok(); let validate_buckets = std::env::var("HFS_S3_VALIDATE_BUCKETS") .map(|s| s.to_lowercase() != "false" && s != "0") .unwrap_or(true); @@ -388,6 +393,7 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { info!( bucket = %bucket, region = ?region, + prefix = ?prefix, validate_buckets = validate_buckets, "Initializing S3 backend" ); @@ -397,11 +403,12 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { bucket: bucket.clone(), }, region, + prefix, validate_buckets_on_startup: validate_buckets, ..Default::default() }; - let backend = S3Backend::new(s3_config).map_err(|e| { + let backend = S3Backend::from_env_async(s3_config).await.map_err(|e| { anyhow::anyhow!( "Failed to initialize S3 backend (bucket={}, region={:?}): {}", bucket, @@ -423,6 +430,167 @@ async fn start_s3(_config: ServerConfig) -> anyhow::Result<()> { ) } +/// Builds a search parameter registry independently (for backends that don't own one). 
+#[cfg(feature = "elasticsearch")] +fn build_search_registry( + fhir_version: helios_fhir::FhirVersion, + data_dir: Option<&std::path::Path>, +) -> std::sync::Arc<parking_lot::RwLock<helios_persistence::search::SearchParameterRegistry>> { + use helios_persistence::search::{SearchParameterLoader, SearchParameterRegistry}; + + let registry = std::sync::Arc::new(parking_lot::RwLock::new(SearchParameterRegistry::new())); + let loader = SearchParameterLoader::new(fhir_version); + { + let mut reg = registry.write(); + if let Ok(params) = loader.load_embedded() { + for p in params { + let _ = reg.register(p); + } + } + let dir = data_dir.unwrap_or_else(|| std::path::Path::new("./data")); + if let Ok(params) = loader.load_from_spec_file(dir) { + for p in params { + let _ = reg.register(p); + } + } + } + registry +} + +/// Starts the server with S3 + Elasticsearch composite backend. +#[cfg(all(feature = "s3", feature = "elasticsearch"))] +async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { + use std::collections::HashMap; + use std::sync::Arc; + + use helios_persistence::backends::elasticsearch::{ + ElasticsearchAuth, ElasticsearchBackend, ElasticsearchConfig, + }; + use helios_persistence::backends::s3::{S3Backend, S3BackendConfig, S3TenancyMode}; + use helios_persistence::composite::{CompositeConfig, CompositeStorage}; + use helios_persistence::core::BackendKind; + + // --- S3 backend (primary) --- + let bucket = std::env::var("HFS_S3_BUCKET").unwrap_or_else(|_| "hfs".to_string()); + let region = std::env::var("HFS_S3_REGION").ok(); + let prefix = std::env::var("HFS_S3_PREFIX").ok(); + let validate_buckets = std::env::var("HFS_S3_VALIDATE_BUCKETS") + .map(|s| s.to_lowercase() != "false" && s != "0") + .unwrap_or(true); + + info!( + bucket = %bucket, + region = ?region, + prefix = ?prefix, + validate_buckets = validate_buckets, + "Initializing S3 backend (primary)" + ); + + let s3_config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: bucket.clone(), + }, + region, + prefix, + 
validate_buckets_on_startup: validate_buckets, + ..Default::default() + }; + + let s3 = Arc::new(S3Backend::from_env_async(s3_config).await.map_err(|e| { + anyhow::anyhow!( + "Failed to initialize S3 backend (bucket={}, region={:?}): {}", + bucket, + std::env::var("AWS_REGION").ok(), + e + ) + })?); + + // --- Elasticsearch backend (search) --- + let es_nodes: Vec<String> = config + .elasticsearch_nodes + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + let es_auth = match ( + &config.elasticsearch_username, + &config.elasticsearch_password, + ) { + (Some(username), Some(password)) => Some(ElasticsearchAuth::Basic { + username: username.clone(), + password: password.clone(), + }), + _ => None, + }; + + let es_config = ElasticsearchConfig { + nodes: es_nodes.clone(), + index_prefix: config.elasticsearch_index_prefix.clone(), + auth: es_auth, + fhir_version: config.default_fhir_version, + ..Default::default() + }; + + info!( + nodes = ?es_nodes, + index_prefix = %config.elasticsearch_index_prefix, + "Initializing Elasticsearch backend (search)" + ); + + // Build search registry independently — S3 has no internal registry + let search_registry = + build_search_registry(config.default_fhir_version, config.data_dir.as_deref()); + let es = Arc::new(ElasticsearchBackend::with_shared_registry( + es_config, + search_registry, + )?); + + // --- Composite wiring --- + let composite_config = CompositeConfig::builder() + .primary("s3", BackendKind::S3) + .search_backend("es", BackendKind::Elasticsearch) + .build()?; + + let mut backends = HashMap::new(); + backends.insert( + "s3".to_string(), + s3.clone() as helios_persistence::composite::DynStorage, + ); + backends.insert( + "es".to_string(), + es.clone() as helios_persistence::composite::DynStorage, + ); + + let mut search_providers = HashMap::new(); + search_providers.insert( + "s3".to_string(), + s3.clone() as helios_persistence::composite::DynSearchProvider, + ); + 
search_providers.insert( + "es".to_string(), + es.clone() as helios_persistence::composite::DynSearchProvider, + ); + + let composite = CompositeStorage::new(composite_config, backends)? + .with_search_providers(search_providers) + .with_full_primary(s3); + + info!("Composite storage initialized: S3 (primary) + Elasticsearch (search)"); + + let app = create_app_with_config(composite, config.clone()); + serve(app, &config).await +} + +/// Fallback when s3+elasticsearch features are not both enabled. +#[cfg(not(all(feature = "s3", feature = "elasticsearch")))] +async fn start_s3_elasticsearch(_config: ServerConfig) -> anyhow::Result<()> { + anyhow::bail!( + "The s3-elasticsearch backend requires both 's3' and 'elasticsearch' features. \ + Build with: cargo build -p helios-hfs --features s3,elasticsearch" + ) +} + #[cfg(not(any( feature = "sqlite", feature = "postgres", @@ -430,3 +598,88 @@ async fn start_s3(_config: ServerConfig) -> anyhow::Result<()> { feature = "s3" )))] compile_error!("At least one database backend feature must be enabled"); + +#[cfg(test)] +mod tests { + use super::*; + use helios_rest::ServerConfig; + + // ── create_sqlite_backend() ─────────────────────────────────── + + #[cfg(feature = "sqlite")] + #[test] + fn test_create_sqlite_backend_memory() { + let config = ServerConfig { + database_url: Some(":memory:".to_string()), + ..Default::default() + }; + let result = create_sqlite_backend(&config); + assert!(result.is_ok(), "in-memory SQLite backend should succeed"); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_create_sqlite_backend_temp_file() { + let dir = std::env::temp_dir(); + let path = dir.join(format!( + "hfs_test_{}.db", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .subsec_nanos() + )); + let config = ServerConfig { + database_url: Some(path.to_string_lossy().to_string()), + ..Default::default() + }; + let result = create_sqlite_backend(&config); + // Clean up the file even if the 
assertion fails. + let _ = std::fs::remove_file(&path); + assert!(result.is_ok(), "file-based SQLite backend should succeed"); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_create_sqlite_backend_default_url() { + // When database_url is None, defaults to "fhir.db" in the current dir. + // We redirect to a temp path to avoid creating persistent side-effects. + let dir = std::env::temp_dir(); + let path = dir.join(format!( + "hfs_test_default_{}.db", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .subsec_nanos() + )); + // Override the default path via the env var that ServerConfig reads. + let config = ServerConfig { + database_url: Some(path.to_string_lossy().to_string()), + ..Default::default() + }; + let result = create_sqlite_backend(&config); + let _ = std::fs::remove_file(&path); + assert!(result.is_ok()); + } + + // ── build_search_registry() ─────────────────────────────────── + + #[cfg(feature = "elasticsearch")] + #[test] + fn test_build_search_registry_returns_registry() { + use helios_fhir::FhirVersion; + let registry = build_search_registry(FhirVersion::R4, None); + // Registry should be a valid Arc> and not panic when read. 
+ let _guard = registry.read(); + } + + // ── ServerConfig validation is exercised at startup ────────── + + #[test] + fn test_server_config_default_is_valid() { + let config = ServerConfig::default(); + assert!( + config.validate().is_ok(), + "default ServerConfig should be valid" + ); + } +} diff --git a/crates/persistence/Cargo.toml b/crates/persistence/Cargo.toml index d40c7144..3ea09dd3 100644 --- a/crates/persistence/Cargo.toml +++ b/crates/persistence/Cargo.toml @@ -95,7 +95,7 @@ tokio-test = "0.4" tempfile = "3" criterion = { version = "0.5", features = ["async_tokio"] } testcontainers = "0.26" -testcontainers-modules = { version = "0.14", features = ["postgres", "elastic_search"] } +testcontainers-modules = { version = "0.14", features = ["postgres", "elastic_search", "minio"] } paste = "1.0" # Configuration advisor binary diff --git a/crates/persistence/README.md b/crates/persistence/README.md index 56cb49ef..2b5fa022 100644 --- a/crates/persistence/README.md +++ b/crates/persistence/README.md @@ -382,7 +382,7 @@ Backends can serve as primary (CRUD, versioning, transactions) or secondary (opt | Cassandra + Elasticsearch | Cassandra | Elasticsearch (search) | Planned | Write-heavy + search | | MongoDB alone | MongoDB | — | Planned | Document-centric | | S3 alone | S3 | — | ✓ Implemented (storage-focused) | Archival/bulk/history storage | -| S3 + Elasticsearch | S3 | Elasticsearch (search) | Planned | Large-scale + search | +| S3 + Elasticsearch | S3 | Elasticsearch (search) | ✓ Implemented | Large-scale + search | ### Backend Selection Guide @@ -519,12 +519,116 @@ HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ ./target/release/hfs ``` +### S3 + Elasticsearch + +S3 handles CRUD, versioning, history, and bulk operations. Elasticsearch handles all search operations. Combines S3's cost-effective, durable object storage with Elasticsearch's search capabilities for large-scale deployments. 
+ +- CRUD persistence via S3 objects (current pointer + immutable history versions) +- Versioning (`vread`, optimistic locking via version checks) +- Instance, type, and system history via immutable history objects +- Batch bundles and best-effort transaction bundles +- Bulk export (NDJSON parts + manifest in S3) +- Bulk submit with rollback change log +- Full-text search with relevance scoring (`_text`, `_content`) via Elasticsearch +- All FHIR search parameter types (string, token, date, number, quantity, reference, URI, composite) +- Advanced text search with stemming, boolean operators, and proximity matching (`:text-advanced`) +- Tenant isolation (`PrefixPerTenant` or `BucketPerTenant`) + +**Prerequisites:** An AWS S3 bucket (or S3-compatible service) and a running Elasticsearch 8.x instance. + +```bash +# Build with S3 and Elasticsearch support +cargo build --bin hfs --features s3,elasticsearch --release + +# Start Elasticsearch (example using Docker) +docker run -d --name es -p 9200:9200 \ + -e "discovery.type=single-node" \ + -e "xpack.security.enabled=false" \ + elasticsearch:8.15.0 + +# Start the server (AWS S3) +HFS_STORAGE_BACKEND=s3-elasticsearch \ +HFS_S3_BUCKET=my-fhir-bucket \ +HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ + ./target/release/hfs +``` + +#### S3 Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `HFS_S3_BUCKET` | `hfs` | S3 bucket name | +| `HFS_S3_REGION` | (provider chain) | AWS region override | +| `HFS_S3_PREFIX` | (none) | Optional global key prefix | +| `HFS_S3_VALIDATE_BUCKETS` | `true` | Validate buckets on startup via `HeadBucket` | + +AWS credentials are resolved via the standard AWS provider chain (`AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY`, EC2 instance metadata, shared credentials file, SSO, etc.). + +#### S3-Compatible Endpoints (MinIO, etc.) 
+ +For S3-compatible services, configure the endpoint programmatically via `S3BackendConfig`: + +```rust +use helios_persistence::backends::s3::{S3BackendConfig, S3TenancyMode}; + +let config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: "minio-bucket".to_string(), + }, + endpoint_url: Some("http://127.0.0.1:9000".to_string()), + allow_http: true, + force_path_style: true, + ..Default::default() +}; +``` + +When `endpoint_url` is set, the backend automatically defaults `force_path_style` to `true` and `region` to `us-east-1` if not otherwise specified. + +#### Key Differences from SQLite/PG + ES + +Unlike SQLite and PostgreSQL, the S3 backend has no built-in search parameter registry. When composing S3 + Elasticsearch, the ES backend creates its own standalone registry with minimal embedded search parameters (`_id`, `_lastUpdated`, `_tag`, `_profile`, `_security`). For full search capability, use `with_shared_registry()` with parameters loaded from spec files. 
+ +```rust +use std::collections::HashMap; +use std::sync::Arc; +use helios_persistence::backends::elasticsearch::{ElasticsearchBackend, ElasticsearchConfig}; +use helios_persistence::backends::s3::{S3Backend, S3BackendConfig}; +use helios_persistence::composite::{CompositeConfig, CompositeStorage, DynStorage, DynSearchProvider}; +use helios_persistence::core::BackendKind; + +// Create S3 backend +let s3_config = S3BackendConfig::default(); +let s3 = Arc::new(S3Backend::from_env_async(s3_config).await?); + +// Create ES backend (standalone registry — S3 has no registry to share) +let es_config = ElasticsearchConfig::default(); +let es = Arc::new(ElasticsearchBackend::new(es_config)?); + +// Build composite +let composite_config = CompositeConfig::builder() + .primary("s3", BackendKind::S3) + .search_backend("es", BackendKind::Elasticsearch) + .build()?; + +let mut backends = HashMap::new(); +backends.insert("s3".to_string(), s3.clone() as DynStorage); +backends.insert("es".to_string(), es.clone() as DynStorage); + +let mut search_providers = HashMap::new(); +search_providers.insert("s3".to_string(), s3.clone() as DynSearchProvider); +search_providers.insert("es".to_string(), es.clone() as DynSearchProvider); + +let composite = CompositeStorage::new(composite_config, backends)? + .with_search_providers(search_providers) + .with_full_primary(s3); +``` + ### How Search Offloading Works -When `HFS_STORAGE_BACKEND` is set to `sqlite-elasticsearch` or `postgres-elasticsearch`, the server: +When `HFS_STORAGE_BACKEND` is set to `sqlite-elasticsearch`, `postgres-elasticsearch`, or `s3-elasticsearch`, the server: -1. Creates the primary backend (SQLite or PostgreSQL) with search indexing **disabled** -2. Creates an Elasticsearch backend sharing the primary backend's search parameter registry +1. Creates the primary backend (SQLite, PostgreSQL, or S3). For SQLite/PG, search indexing is **disabled**; S3 has no search indexing to disable. +2. Creates an Elasticsearch backend. 
For SQLite/PG, it shares the primary backend's search parameter registry; for S3, it creates its own standalone registry. 3. Wraps both in a `CompositeStorage` that routes: - All **writes** (create, update, delete, conditional ops, transactions) → primary backend, then syncs to ES - All **reads** (read, vread, history) → primary backend @@ -1017,7 +1121,7 @@ The composite storage layer enables polyglot persistence by coordinating multipl | PostgreSQL-only | PostgreSQL | None | ✓ Implemented | Production OLTP | | PostgreSQL + ES | PostgreSQL | Elasticsearch | ✓ Implemented | OLTP + advanced search | | PostgreSQL + Neo4j | PostgreSQL | Neo4j | Planned | Graph-heavy queries | -| S3 + ES | S3 | Elasticsearch | Planned | Large-scale, cheap storage | +| S3 + ES | S3 | Elasticsearch | ✓ Implemented | Large-scale, cheap storage | ### Quick Start diff --git a/crates/persistence/src/backends/s3/client.rs b/crates/persistence/src/backends/s3/client.rs index 04188702..3a9f5244 100644 --- a/crates/persistence/src/backends/s3/client.rs +++ b/crates/persistence/src/backends/s3/client.rs @@ -396,10 +396,16 @@ where .to_string(), ), 404 => S3ClientError::NotFound, - _ if message.is_empty() => S3ClientError::Internal(format!( - "S3 error (HTTP {status}, code={code})" - )), - _ => S3ClientError::Internal(message), + _ => { + let detail = if message.is_empty() { + format!("{:?}", service_err.err()) + } else { + message + }; + S3ClientError::Internal(format!( + "S3 error (HTTP {status}, code={code}): {detail}" + )) + } } } } diff --git a/crates/persistence/src/composite/storage.rs b/crates/persistence/src/composite/storage.rs index 778f3753..689b737d 100644 --- a/crates/persistence/src/composite/storage.rs +++ b/crates/persistence/src/composite/storage.rs @@ -478,7 +478,8 @@ impl CompositeStorage { } // Collect results - let mut primary_result = None; + let mut primary_result: Option<SearchResult> = None; + let mut primary_unsupported = false; let mut auxiliary_results = Vec::new(); while let 
Some(result) = tasks.join_next().await { @@ -491,11 +492,22 @@ impl CompositeStorage { ); if id == primary_id { - primary_result = Some(search_result?); + match search_result { + Ok(r) => primary_result = Some(r), + Err(StorageError::Backend(BackendError::UnsupportedCapability { + .. + })) => { + // Primary doesn't support this search feature (e.g. S3 has no + // full-text search). An auxiliary backend (e.g. Elasticsearch) + // will handle it — promote its result to primary below. + primary_unsupported = true; + } + Err(e) => return Err(e), + } } else if let Ok(res) = search_result { auxiliary_results.push((id, res)); } - // Ignore auxiliary failures - graceful degradation + // Ignore other auxiliary failures - graceful degradation } Err(e) => { warn!(error = %e, "Task join error during parallel search"); @@ -503,6 +515,19 @@ impl CompositeStorage { } } + // When primary lacks search capability and auxiliary has results, promote the + // first auxiliary result to primary so the merger can return it directly. + if primary_unsupported && primary_result.is_none() { + if !auxiliary_results.is_empty() { + let (_, promoted) = auxiliary_results.remove(0); + return Ok((promoted, auxiliary_results)); + } + return Err(StorageError::Backend(BackendError::UnsupportedCapability { + backend_name: primary_id, + capability: "search".to_string(), + })); + } + let primary = primary_result.ok_or_else(|| { StorageError::Backend(BackendError::ConnectionFailed { backend_name: primary_id, @@ -1581,7 +1606,122 @@ impl CapabilityProvider for CompositeStorage { #[cfg(test)] mod tests { use super::*; - use crate::core::BackendKind; + use crate::core::{BackendKind, CapabilityProvider}; + use crate::tenant::{TenantContext, TenantId, TenantPermissions}; + + /// Minimal mock storage for unit testing CompositeStorage. 
+ struct MockStorage; + + #[async_trait::async_trait] + impl ResourceStorage for MockStorage { + fn backend_name(&self) -> &'static str { + "mock" + } + + async fn create( + &self, + tenant: &TenantContext, + resource_type: &str, + resource: Value, + fhir_version: FhirVersion, + ) -> StorageResult<StoredResource> { + let id = uuid::Uuid::new_v4().to_string(); + Ok(StoredResource::new( + resource_type, + &id, + tenant.tenant_id().clone(), + resource, + fhir_version, + )) + } + + async fn create_or_update( + &self, + tenant: &TenantContext, + resource_type: &str, + id: &str, + resource: Value, + fhir_version: FhirVersion, + ) -> StorageResult<(StoredResource, bool)> { + Ok(( + StoredResource::new( + resource_type, + id, + tenant.tenant_id().clone(), + resource, + fhir_version, + ), + true, + )) + } + + async fn read( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + ) -> StorageResult<Option<StoredResource>> { + Ok(None) + } + + async fn update( + &self, + tenant: &TenantContext, + current: &StoredResource, + resource: Value, + ) -> StorageResult<StoredResource> { + Ok(StoredResource::new( + current.resource_type(), + current.id(), + tenant.tenant_id().clone(), + resource, + current.fhir_version(), + )) + } + + async fn delete( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + ) -> StorageResult<()> { + Ok(()) + } + + async fn count( + &self, + _tenant: &TenantContext, + _resource_type: Option<&str>, + ) -> StorageResult<u64> { + Ok(0) + } + } + + fn make_tenant() -> TenantContext { + TenantContext::new(TenantId::new("test"), TenantPermissions::full_access()) + } + + fn make_composite_no_secondary() -> CompositeStorage { + let config = CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .build() + .unwrap(); + let mut backends = HashMap::new(); + backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage); + CompositeStorage::new(config, backends).unwrap() + } + + fn make_composite_with_secondary() -> CompositeStorage { + let config = 
CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .search_backend("es", BackendKind::Elasticsearch) + .build() + .unwrap(); + let mut backends = HashMap::new(); + backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage); + backends.insert("es".to_string(), Arc::new(MockStorage) as DynStorage); + CompositeStorage::new(config, backends).unwrap() + } fn test_config() -> CompositeConfig { CompositeConfig::builder() @@ -1591,18 +1731,483 @@ mod tests { .unwrap() } + // ── BackendHealth ────────────────────────────────────────────── + #[test] fn test_backend_health_default() { let health = BackendHealth::default(); assert!(health.healthy); assert_eq!(health.failure_count, 0); assert!(health.last_error.is_none()); + assert!(health.last_success.is_none()); } + #[test] + fn test_backend_health_clone() { + let health = BackendHealth { + healthy: false, + last_success: None, + failure_count: 5, + last_error: Some("timeout".to_string()), + }; + let cloned = health.clone(); + assert!(!cloned.healthy); + assert_eq!(cloned.failure_count, 5); + assert_eq!(cloned.last_error.as_deref(), Some("timeout")); + } + + // ── CompositeConfig ──────────────────────────────────────────── + #[test] fn test_composite_config() { let config = test_config(); assert_eq!(config.primary_id(), Some("sqlite")); assert_eq!(config.secondaries().count(), 1); } + + // ── CompositeStorage::new() ──────────────────────────────────── + + #[test] + fn test_new_success() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.backend_name(), "composite"); + } + + #[test] + fn test_new_missing_primary_backend_in_map() { + let config = CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .build() + .unwrap(); + // Deliberately omit the primary from the backends map + let backends: HashMap = HashMap::new(); + let result = CompositeStorage::new(config, backends); + assert!(result.is_err()); + } + + #[test] + fn test_new_with_secondary() { 
+ let composite = make_composite_with_secondary(); + assert!(composite.secondary("es").is_some()); + assert!(composite.secondary("nonexistent").is_none()); + } + + // ── Accessors ───────────────────────────────────────────────── + + #[test] + fn test_config_accessor() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.config().primary_id(), Some("primary")); + } + + #[test] + fn test_primary_accessor() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.primary().backend_name(), "mock"); + } + + #[test] + fn test_secondaries_accessor() { + let composite = make_composite_with_secondary(); + assert_eq!(composite.secondaries().len(), 1); + assert!(composite.secondaries().contains_key("es")); + } + + #[test] + fn test_secondaries_empty_when_no_secondary() { + let composite = make_composite_no_secondary(); + assert!(composite.secondaries().is_empty()); + } + + #[test] + fn test_secondary_accessor_present() { + let composite = make_composite_with_secondary(); + assert!(composite.secondary("es").is_some()); + } + + #[test] + fn test_secondary_accessor_absent() { + let composite = make_composite_no_secondary(); + assert!(composite.secondary("missing").is_none()); + } + + // ── Health tracking ─────────────────────────────────────────── + + #[test] + fn test_backend_health_initially_healthy() { + let composite = make_composite_no_secondary(); + let health = composite.backend_health("primary").unwrap(); + assert!(health.healthy); + } + + #[test] + fn test_backend_health_missing_id_returns_none() { + let composite = make_composite_no_secondary(); + assert!(composite.backend_health("nonexistent").is_none()); + } + + #[test] + fn test_is_backend_healthy_true() { + let composite = make_composite_no_secondary(); + assert!(composite.is_backend_healthy("primary")); + } + + #[test] + fn test_is_backend_healthy_unknown_returns_false() { + let composite = make_composite_no_secondary(); + 
assert!(!composite.is_backend_healthy("nonexistent")); + } + + #[test] + fn test_update_health_success_resets_failures() { + let composite = make_composite_no_secondary(); + // First, record a few failures + composite.update_health("primary", false, Some("err1".to_string())); + composite.update_health("primary", false, Some("err2".to_string())); + let health = composite.backend_health("primary").unwrap(); + assert_eq!(health.failure_count, 2); + + // Now record a success — should reset + composite.update_health("primary", true, None); + let health = composite.backend_health("primary").unwrap(); + assert!(health.healthy); + assert_eq!(health.failure_count, 0); + assert!(health.last_error.is_none()); + assert!(health.last_success.is_some()); + } + + #[test] + fn test_update_health_failure_increments_count() { + let composite = make_composite_no_secondary(); + composite.update_health("primary", false, Some("timeout".to_string())); + let health = composite.backend_health("primary").unwrap(); + assert_eq!(health.failure_count, 1); + assert_eq!(health.last_error.as_deref(), Some("timeout")); + } + + #[test] + fn test_update_health_marks_unhealthy_after_threshold() { + let composite = make_composite_no_secondary(); + // Default failure_threshold is 3 + let threshold = composite.config.health_config.failure_threshold; + for i in 0..threshold { + composite.update_health("primary", false, Some(format!("error {}", i))); + } + let health = composite.backend_health("primary").unwrap(); + assert!(!health.healthy); + assert_eq!(health.failure_count, threshold); + } + + #[test] + fn test_update_health_ignores_unknown_backend() { + let composite = make_composite_no_secondary(); + // Should not panic on unknown backend ID + composite.update_health("nonexistent", false, Some("err".to_string())); + } + + // ── sync_to_secondaries when sync_manager is None ───────────── + + #[tokio::test] + async fn test_sync_to_secondaries_no_sync_manager_returns_ok() { + let composite = 
make_composite_no_secondary(); + // No sync manager (no secondaries) → should return Ok immediately + let result = composite + .sync_to_secondaries(super::super::sync::SyncEvent::Delete { + resource_type: "Patient".to_string(), + resource_id: "1".to_string(), + tenant_id: TenantId::new("test"), + }) + .await; + assert!(result.is_ok()); + } + + // ── routing_error_to_storage_error ──────────────────────────── + + #[test] + fn test_routing_error_no_primary_backend() { + let composite = make_composite_no_secondary(); + let err = composite.routing_error_to_storage_error(RoutingError::NoPrimaryBackend); + match err { + StorageError::Backend(BackendError::Unavailable { backend_name, .. }) => { + assert_eq!(backend_name, "primary"); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[test] + fn test_routing_error_no_capable_backend() { + use super::super::analyzer::QueryFeature; + let composite = make_composite_no_secondary(); + let err = composite.routing_error_to_storage_error(RoutingError::NoCapableBackend { + feature: QueryFeature::FullTextSearch, + }); + match err { + StorageError::Backend(BackendError::UnsupportedCapability { + backend_name, + capability, + }) => { + assert_eq!(backend_name, "composite"); + assert!(!capability.is_empty()); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[test] + fn test_routing_error_backend_unavailable() { + let composite = make_composite_no_secondary(); + let err = composite.routing_error_to_storage_error(RoutingError::BackendUnavailable { + backend_id: "my-backend".to_string(), + }); + match err { + StorageError::Backend(BackendError::ConnectionFailed { backend_name, .. 
}) => { + assert_eq!(backend_name, "my-backend"); + } + other => panic!("unexpected error: {:?}", other), + } + } + + // ── CapabilityProvider ──────────────────────────────────────── + + #[test] + fn test_capabilities_backend_name() { + let composite = make_composite_no_secondary(); + let caps = composite.capabilities(); + assert_eq!(caps.backend_name, "composite"); + } + + #[test] + fn test_backend_name_is_composite() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.backend_name(), "composite"); + } + + // ── "No capability" error paths ─────────────────────────────── + + #[tokio::test] + async fn test_conditional_create_no_capability() { + use crate::core::ConditionalStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite + .conditional_create( + &tenant, + "Patient", + serde_json::json!({}), + "identifier=foo", + FhirVersion::default(), + ) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_versioned_storage_vread_no_capability() { + use crate::core::VersionedStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.vread(&tenant, "Patient", "1", "1").await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => { + assert!(capability.contains("VersionedStorage")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[tokio::test] + async fn test_instance_history_no_capability() { + use crate::core::InstanceHistoryProvider; + use crate::core::history::HistoryParams; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite + .history_instance(&tenant, "Patient", "1", &HistoryParams::default()) + .await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. 
}) => { + assert!(capability.contains("InstanceHistoryProvider")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[tokio::test] + async fn test_bundle_provider_process_batch_no_capability() { + use crate::core::BundleProvider; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.process_batch(&tenant, vec![]).await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => { + assert!(capability.contains("BundleProvider")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[tokio::test] + async fn test_bundle_provider_process_transaction_no_capability() { + use crate::core::BundleProvider; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.process_transaction(&tenant, vec![]).await; + assert!(result.is_err()); + } + + // ── search_count when no search provider ────────────────────── + + #[tokio::test] + async fn test_search_count_no_search_provider_returns_error() { + use crate::core::SearchProvider; + use crate::types::SearchQuery; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let query = SearchQuery::new("Patient"); + let result = composite.search_count(&tenant, &query).await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. 
}) => { + assert!(capability.contains("search_count")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + // ── with_search_providers ───────────────────────────────────── + + #[test] + fn test_with_search_providers() { + let composite = make_composite_no_secondary(); + let mut providers = HashMap::new(); + providers.insert( + "primary".to_string(), + Arc::new(MockStorage) as DynSearchProvider, + ); + let composite = composite.with_search_providers(providers); + // Should have one search provider now + assert!(composite.search_providers.contains_key("primary")); + } + + // ── extract_reference_values (static method) ────────────────── + + #[test] + fn test_extract_reference_values_object_with_reference() { + let obj = serde_json::json!({"reference": "Patient/123"}); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&obj, &mut refs); + assert_eq!(refs, vec!["Patient/123"]); + } + + #[test] + fn test_extract_reference_values_object_without_reference() { + let obj = serde_json::json!({"display": "John Smith"}); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&obj, &mut refs); + assert!(refs.is_empty()); + } + + #[test] + fn test_extract_reference_values_array() { + let arr = serde_json::json!([ + {"reference": "Patient/1"}, + {"reference": "Patient/2"}, + {"display": "no ref here"} + ]); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&arr, &mut refs); + assert_eq!(refs.len(), 2); + assert!(refs.contains(&"Patient/1".to_string())); + assert!(refs.contains(&"Patient/2".to_string())); + } + + #[test] + fn test_extract_reference_values_primitive_ignored() { + let val = serde_json::json!("just a string"); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&val, &mut refs); + assert!(refs.is_empty()); + } + + #[test] + fn test_extract_reference_values_null_ignored() { + let val = serde_json::Value::Null; + let mut refs = Vec::new(); + 
CompositeStorage::extract_reference_values(&val, &mut refs); + assert!(refs.is_empty()); + } + + // ── CRUD delegation to primary ──────────────────────────────── + + #[tokio::test] + async fn test_create_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite + .create( + &tenant, + "Patient", + serde_json::json!({"resourceType": "Patient"}), + FhirVersion::default(), + ) + .await; + assert!(result.is_ok()); + let stored = result.unwrap(); + assert_eq!(stored.resource_type(), "Patient"); + } + + #[tokio::test] + async fn test_read_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.read(&tenant, "Patient", "1").await; + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); // MockStorage always returns None + } + + #[tokio::test] + async fn test_count_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.count(&tenant, Some("Patient")).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 0); + } + + #[tokio::test] + async fn test_delete_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.delete(&tenant, "Patient", "1").await; + assert!(result.is_ok()); + } + + // SearchProvider impl for MockStorage (must live inside test module). 
+ #[async_trait::async_trait] + impl SearchProvider for MockStorage { + async fn search( + &self, + _tenant: &TenantContext, + _query: &crate::types::SearchQuery, + ) -> StorageResult { + use crate::types::Page; + Ok(SearchResult::new(Page::empty())) + } + + async fn search_count( + &self, + _tenant: &TenantContext, + _query: &crate::types::SearchQuery, + ) -> StorageResult { + Ok(0) + } + } } diff --git a/crates/persistence/tests/s3_es_tests.rs b/crates/persistence/tests/s3_es_tests.rs new file mode 100644 index 00000000..d2dd6e6a --- /dev/null +++ b/crates/persistence/tests/s3_es_tests.rs @@ -0,0 +1,773 @@ +//! S3 + Elasticsearch composite backend integration tests. +//! +//! These tests verify that the S3 backend (primary) and Elasticsearch backend (search) +//! work together correctly as a composite storage unit. +//! +//! Tests are opt-in via `RUN_MINIO_S3_ES_TESTS=1` and require Docker to be running. +//! MinIO is used as an S3-compatible store; Elasticsearch is the search backend. +//! +//! Run with: +//! RUN_MINIO_S3_ES_TESTS=1 cargo test -p helios-persistence \ +//! 
--features s3,elasticsearch -- s3_es + +#![cfg(all(feature = "s3", feature = "elasticsearch"))] + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use helios_fhir::FhirVersion; +use parking_lot::RwLock; +use serde_json::json; + +use helios_persistence::backends::elasticsearch::{ElasticsearchBackend, ElasticsearchConfig}; +use helios_persistence::backends::s3::{S3Backend, S3BackendConfig, S3TenancyMode}; +use helios_persistence::composite::{ + CompositeConfig, CompositeStorage, DynSearchProvider, DynStorage, +}; +use helios_persistence::core::search::SearchProvider; +use helios_persistence::core::{Backend, BackendKind, ResourceStorage}; +use helios_persistence::error::{ResourceError, StorageError}; +use helios_persistence::search::{SearchParameterLoader, SearchParameterRegistry}; +use helios_persistence::tenant::{TenantContext, TenantId, TenantPermissions}; +use helios_persistence::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue}; + +use aws_config::{BehaviorVersion, Region}; +use aws_sdk_s3::Client; +use aws_sdk_s3::config::Credentials; + +use testcontainers::GenericImage; +use testcontainers::ImageExt; +use testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers::runners::AsyncRunner; +use testcontainers_modules::elastic_search::ElasticSearch; +use tokio::sync::OnceCell; +use uuid::Uuid; + +// ============================================================================ +// Container setup +// ============================================================================ + +const DEFAULT_MINIO_IMAGE: &str = "minio/minio"; +const DEFAULT_MINIO_TAG: &str = "RELEASE.2025-02-28T09-55-16Z"; +const DEFAULT_MINIO_ROOT_USER: &str = "minioadmin"; +const DEFAULT_MINIO_ROOT_PASSWORD: &str = "minioadmin"; + +struct SharedMinio { + endpoint_url: String, + root_user: String, + root_password: String, + _container: testcontainers::ContainerAsync, +} + +struct SharedEs { + host: String, + port: u16, + _container: 
testcontainers::ContainerAsync, +} + +static SHARED_MINIO: OnceCell = OnceCell::const_new(); +static SHARED_ES: OnceCell = OnceCell::const_new(); +static MINIO_AWS_ENV: std::sync::Once = std::sync::Once::new(); + +fn run_s3_es_tests() -> bool { + std::env::var("RUN_MINIO_S3_ES_TESTS").ok().as_deref() == Some("1") +} + +fn skip_if_disabled(test_name: &str) -> bool { + if run_s3_es_tests() { + return false; + } + eprintln!("skipping S3+ES test {test_name} (set RUN_MINIO_S3_ES_TESTS=1 to enable)"); + true +} + +async fn shared_minio() -> &'static SharedMinio { + SHARED_MINIO + .get_or_init(|| async { + let image = + std::env::var("MINIO_IMAGE").unwrap_or_else(|_| DEFAULT_MINIO_IMAGE.to_string()); + let tag = std::env::var("MINIO_TAG").unwrap_or_else(|_| DEFAULT_MINIO_TAG.to_string()); + let root_user = std::env::var("MINIO_ROOT_USER") + .unwrap_or_else(|_| DEFAULT_MINIO_ROOT_USER.to_string()); + let root_password = std::env::var("MINIO_ROOT_PASSWORD") + .unwrap_or_else(|_| DEFAULT_MINIO_ROOT_PASSWORD.to_string()); + + let container = GenericImage::new(image, tag) + .with_wait_for(WaitFor::message_on_stderr("API:")) + .with_exposed_port(9000.tcp()) + .with_env_var("MINIO_ROOT_USER", root_user.clone()) + .with_env_var("MINIO_ROOT_PASSWORD", root_password.clone()) + .with_cmd(["server", "/data", "--console-address", ":9001"]) + .start() + .await + .expect("failed to start MinIO container"); + + let host = container + .get_host() + .await + .expect("failed to resolve MinIO host") + .to_string(); + let port = container + .get_host_port_ipv4(9000) + .await + .expect("failed to resolve MinIO port"); + + SharedMinio { + endpoint_url: format!("http://{host}:{port}"), + root_user, + root_password, + _container: container, + } + }) + .await +} + +async fn shared_es() -> &'static SharedEs { + SHARED_ES + .get_or_init(|| async { + let run_id = std::env::var("GITHUB_RUN_ID").unwrap_or_default(); + let container = ElasticSearch::default() + .with_env_var("ES_JAVA_OPTS", "-Xms256m 
-Xmx256m") + .with_label("github.run_id", &run_id) + .with_startup_timeout(std::time::Duration::from_secs(120)) + .start() + .await + .expect("failed to start Elasticsearch container"); + + let port = container + .get_host_port_ipv4(9200) + .await + .expect("failed to get ES port"); + let host = container + .get_host() + .await + .expect("failed to get ES host") + .to_string(); + + SharedEs { + host, + port, + _container: container, + } + }) + .await +} + +fn ensure_minio_env(shared: &SharedMinio) { + MINIO_AWS_ENV.call_once(|| { + // SAFETY: executes exactly once before any S3 backend construction. + unsafe { + std::env::set_var("AWS_ACCESS_KEY_ID", &shared.root_user); + std::env::set_var("AWS_SECRET_ACCESS_KEY", &shared.root_password); + std::env::set_var("AWS_REGION", "us-east-1"); + std::env::set_var("AWS_EC2_METADATA_DISABLED", "true"); + } + }); +} + +async fn build_sdk_client(shared: &SharedMinio) -> Client { + let creds = Credentials::new( + shared.root_user.clone(), + shared.root_password.clone(), + None, + None, + "s3-es-tests", + ); + let sdk_config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new("us-east-1")) + .endpoint_url(shared.endpoint_url.clone()) + .credentials_provider(creds) + .load() + .await; + let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + Client::from_conf(s3_config) +} + +async fn ensure_bucket(client: &Client, bucket: &str) { + if client.head_bucket().bucket(bucket).send().await.is_ok() { + return; + } + client + .create_bucket() + .bucket(bucket) + .send() + .await + .expect("failed to create test bucket"); +} + +fn build_search_registry() -> Arc> { + let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(|p| p.parent()) + .map(|p| p.join("data")) + .unwrap_or_else(|| PathBuf::from("data")); + + let loader = SearchParameterLoader::new(FhirVersion::default()); + let mut registry = SearchParameterRegistry::new(); + + if let Ok(params) 
= loader.load_embedded() { + for p in params { + let _ = registry.register(p); + } + } + if let Ok(params) = loader.load_from_spec_file(&data_dir) { + for p in params { + let _ = registry.register(p); + } + } + + Arc::new(RwLock::new(registry)) +} + +// ============================================================================ +// Composite harness +// ============================================================================ + +struct S3EsHarness { + composite: CompositeStorage, + #[allow(dead_code)] + bucket: String, +} + +async fn make_harness(scope: &str) -> S3EsHarness { + let minio = shared_minio().await; + let es = shared_es().await; + ensure_minio_env(minio); + + let sdk_client = build_sdk_client(minio).await; + let bucket = format!("hfs-s3es-{}", Uuid::new_v4().simple()); + ensure_bucket(&sdk_client, &bucket).await; + + let prefix = format!("test/{}/{}", Uuid::new_v4(), scope); + + // S3 backend + let s3_config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: bucket.clone(), + }, + prefix: Some(prefix), + region: Some("us-east-1".to_string()), + endpoint_url: Some(minio.endpoint_url.clone()), + force_path_style: true, + allow_http: true, + validate_buckets_on_startup: true, + ..Default::default() + }; + let s3 = Arc::new(S3Backend::from_env(s3_config).expect("create S3 backend")); + + // Elasticsearch backend + let unique_prefix = format!("hfs_{}", Uuid::new_v4().simple()); + let es_config = ElasticsearchConfig { + nodes: vec![format!("http://{}:{}", es.host, es.port)], + index_prefix: unique_prefix, + number_of_replicas: 0, + refresh_interval: "1ms".to_string(), + ..Default::default() + }; + let search_registry = build_search_registry(); + let es_backend = Arc::new( + ElasticsearchBackend::with_shared_registry(es_config, search_registry) + .expect("create ES backend"), + ); + es_backend + .initialize() + .await + .expect("initialize ES backend"); + + // Composite + let composite_config = CompositeConfig::builder() + 
.primary("s3", BackendKind::S3) + .search_backend("es", BackendKind::Elasticsearch) + .build() + .expect("build composite config"); + + let mut backends: HashMap = HashMap::new(); + backends.insert("s3".to_string(), s3.clone() as DynStorage); + backends.insert("es".to_string(), es_backend.clone() as DynStorage); + + let mut search_providers: HashMap = HashMap::new(); + search_providers.insert("s3".to_string(), s3.clone() as DynSearchProvider); + search_providers.insert("es".to_string(), es_backend.clone() as DynSearchProvider); + + let composite = CompositeStorage::new(composite_config, backends) + .expect("create composite storage") + .with_search_providers(search_providers) + .with_full_primary(s3); + + S3EsHarness { composite, bucket } +} + +fn tenant(id: &str) -> TenantContext { + TenantContext::new(TenantId::new(id), TenantPermissions::full_access()) +} + +// ============================================================================ +// Tests +// ============================================================================ + +/// Write a Patient to S3, then verify it appears in ES search by name. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_create_then_search() { + if skip_if_disabled("s3_es_test_create_then_search") { + return; + } + + let harness = make_harness("create-search").await; + let tenant = tenant("s3-es-tenant"); + + let family = format!("TestFamily-{}", Uuid::new_v4().simple()); + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({ + "resourceType": "Patient", + "name": [{"family": family}] + }), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + // Allow ES refresh (1ms interval configured, but give a little time) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let query = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&family)], + chain: vec![], + components: vec![], + }); + + let results = harness + .composite + .search(&tenant, &query) + .await + .expect("search should succeed"); + + assert!( + !results.resources.items.is_empty(), + "expected at least one result for family={family}" + ); + assert!( + results + .resources + .items + .iter() + .any(|r| r.id() == created.id()) + ); +} + +/// Update a Patient, then verify search returns updated fields. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_update_then_search() { + if skip_if_disabled("s3_es_test_update_then_search") { + return; + } + + let harness = make_harness("update-search").await; + let tenant = tenant("s3-es-tenant"); + + let original_family = format!("OrigFamily-{}", Uuid::new_v4().simple()); + let updated_family = format!("UpdatedFamily-{}", Uuid::new_v4().simple()); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "name": [{"family": original_family}]}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .update( + &tenant, + &created, + json!({"resourceType": "Patient", "id": created.id(), "name": [{"family": updated_family}]}), + ) + .await + .expect("update should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Search by new family name — should find it + let query_new = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&updated_family)], + chain: vec![], + components: vec![], + }); + let results = harness + .composite + .search(&tenant, &query_new) + .await + .expect("search by new family should succeed"); + assert!( + results + .resources + .items + .iter() + .any(|r| r.id() == created.id()), + "updated resource should appear in search" + ); +} + +/// Delete a resource, verify it no longer appears in search results. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_delete_then_search() { + if skip_if_disabled("s3_es_test_delete_then_search") { + return; + } + + let harness = make_harness("delete-search").await; + let tenant = tenant("s3-es-tenant"); + + let family = format!("DeleteMe-{}", Uuid::new_v4().simple()); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "name": [{"family": family}]}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + harness + .composite + .delete(&tenant, "Patient", created.id()) + .await + .expect("delete should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let query = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&family)], + chain: vec![], + components: vec![], + }); + let results = harness + .composite + .search(&tenant, &query) + .await + .expect("search after delete should not error"); + assert!( + results + .resources + .items + .iter() + .all(|r| r.id() != created.id()), + "deleted resource should not appear in search" + ); +} + +/// Call `_history` on a resource after updates — versions must come from S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_history_from_s3() { + if skip_if_disabled("s3_es_test_history_from_s3") { + return; + } + + use helios_persistence::core::history::{HistoryParams, InstanceHistoryProvider}; + + let harness = make_harness("history-s3").await; + let tenant = tenant("s3-es-tenant"); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .update( + &tenant, + &created, + json!({"resourceType": "Patient", "id": created.id(), "active": false}), + ) + .await + .expect("update should succeed"); + + let history = harness + .composite + .history_instance( + &tenant, + "Patient", + created.id(), + &HistoryParams::new().include_deleted(true), + ) + .await + .expect("history should succeed"); + + assert!( + history.items.len() >= 2, + "expected at least 2 history versions, got {}", + history.items.len() + ); + let ids: Vec<&str> = history + .items + .iter() + .map(|e| e.resource.version_id()) + .collect(); + assert!(ids.contains(&"1"), "version 1 should be in history"); + assert!(ids.contains(&"2"), "version 2 should be in history"); +} + +/// Confirm `_vread` reads from S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_vread_from_s3() { + if skip_if_disabled("s3_es_test_vread_from_s3") { + return; + } + + use helios_persistence::core::VersionedStorage; + + let harness = make_harness("vread-s3").await; + let tenant = tenant("s3-es-tenant"); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .update( + &tenant, + &created, + json!({"resourceType": "Patient", "id": created.id(), "active": false}), + ) + .await + .expect("update should succeed"); + + let v1 = harness + .composite + .vread(&tenant, "Patient", created.id(), "1") + .await + .expect("vread v1 should succeed") + .expect("v1 should exist"); + + assert_eq!(v1.version_id(), "1"); + assert_eq!(v1.content()["active"], true); +} + +/// Multi-tenant isolation: resource written to tenant-a must not appear in tenant-b search. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_multi_tenant_isolation() { + if skip_if_disabled("s3_es_test_multi_tenant_isolation") { + return; + } + + let harness = make_harness("tenant-isolation").await; + let tenant_a = tenant("tenant-a"); + let tenant_b = tenant("tenant-b"); + + let family = format!("IsolatedFamily-{}", Uuid::new_v4().simple()); + + harness + .composite + .create( + &tenant_a, + "Patient", + json!({"resourceType": "Patient", "name": [{"family": family}]}), + FhirVersion::default(), + ) + .await + .expect("create in tenant-a should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let query = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&family)], + chain: vec![], + components: vec![], + }); + + // tenant-b should NOT see tenant-a's data + let results_b = harness + .composite + .search(&tenant_b, &query) + .await + .expect("tenant-b search should succeed"); + assert!( + results_b.resources.items.is_empty(), + "tenant-b should not see tenant-a's resource" + ); + + // tenant-a should see its own data + let results_a = harness + .composite + .search(&tenant_a, &query) + .await + .expect("tenant-a search should succeed"); + assert!( + !results_a.resources.items.is_empty(), + "tenant-a should see its own resource" + ); +} + +/// Read returns the resource after it was written to S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_read_from_s3() { + if skip_if_disabled("s3_es_test_read_from_s3") { + return; + } + + let harness = make_harness("read-s3").await; + let tenant = tenant("s3-es-tenant"); + + let id = format!("patient-{}", Uuid::new_v4()); + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "id": id, "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + let read = harness + .composite + .read(&tenant, "Patient", created.id()) + .await + .expect("read should succeed") + .expect("resource should exist"); + + assert_eq!(read.id(), created.id()); + assert_eq!(read.content()["active"], true); +} + +/// Confirm read returns Not Found for missing resources. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_read_missing_returns_none() { + if skip_if_disabled("s3_es_test_read_missing_returns_none") { + return; + } + + let harness = make_harness("read-missing").await; + let tenant = tenant("s3-es-tenant"); + + let result = harness + .composite + .read(&tenant, "Patient", "does-not-exist") + .await + .expect("read of missing should not error"); + assert!(result.is_none()); +} + +/// Verify that S3 SearchProvider returns UnsupportedCapability, confirming +/// the composite routes search to ES and not S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_s3_search_returns_unsupported() { + if skip_if_disabled("s3_es_test_s3_search_returns_unsupported") { + return; + } + + let minio = shared_minio().await; + ensure_minio_env(minio); + + let sdk_client = build_sdk_client(minio).await; + let bucket = format!("hfs-s3-only-{}", Uuid::new_v4().simple()); + ensure_bucket(&sdk_client, &bucket).await; + + let s3_config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: bucket.clone(), + }, + region: Some("us-east-1".to_string()), + endpoint_url: Some(minio.endpoint_url.clone()), + force_path_style: true, + allow_http: true, + validate_buckets_on_startup: true, + ..Default::default() + }; + let s3 = S3Backend::from_env(s3_config).expect("create S3 backend"); + let tenant_ctx = tenant("s3-search-test"); + + let query = SearchQuery::new("Patient"); + let result = s3.search(&tenant_ctx, &query).await; + + // S3 alone must return UnsupportedCapability for search + // S3 search returns StorageError::Backend(BackendError::UnsupportedCapability) + assert!( + result.is_err(), + "S3 SearchProvider stub must return an error, got: {result:?}" + ); +} + +/// After delete, reading the resource returns a Gone error (S3 marks it deleted). 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_read_after_delete_returns_gone() { + if skip_if_disabled("s3_es_test_read_after_delete_returns_gone") { + return; + } + + let harness = make_harness("gone").await; + let tenant = tenant("s3-es-tenant"); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .delete(&tenant, "Patient", created.id()) + .await + .expect("delete should succeed"); + + let result = harness + .composite + .read(&tenant, "Patient", created.id()) + .await; + + assert!( + matches!( + result, + Err(StorageError::Resource(ResourceError::Gone { .. })) + ), + "read after delete should return Gone, got: {result:?}" + ); +} diff --git a/crates/rest/src/config.rs b/crates/rest/src/config.rs index 2bc84d9a..01c153b0 100644 --- a/crates/rest/src/config.rs +++ b/crates/rest/src/config.rs @@ -66,6 +66,9 @@ pub enum StorageBackendMode { /// AWS S3 object storage for CRUD, versioning, history, and bulk operations. /// Requires AWS credentials via the standard provider chain. No search support. S3, + /// AWS S3 for CRUD/history + Elasticsearch for search. + /// Requires AWS credentials and a running Elasticsearch instance. + S3Elasticsearch, } impl fmt::Display for StorageBackendMode { @@ -78,6 +81,7 @@ impl fmt::Display for StorageBackendMode { write!(f, "postgres-elasticsearch") } StorageBackendMode::S3 => write!(f, "s3"), + StorageBackendMode::S3Elasticsearch => write!(f, "s3-elasticsearch"), } } } @@ -94,8 +98,9 @@ impl FromStr for StorageBackendMode { Ok(StorageBackendMode::PostgresElasticsearch) } "s3" | "objectstore" => Ok(StorageBackendMode::S3), + "s3-elasticsearch" | "s3-es" => Ok(StorageBackendMode::S3Elasticsearch), _ => Err(format!( - "Invalid storage backend '{}'. 
Valid values: sqlite, sqlite-elasticsearch, postgres, postgres-elasticsearch, s3", +                "Invalid storage backend '{}'. Valid values: sqlite, sqlite-elasticsearch, postgres, postgres-elasticsearch, s3, s3-elasticsearch", s )), } } @@ -304,7 +309,7 @@ pub struct ServerConfig { #[arg(long, env = "HFS_MAX_PAGE_SIZE", default_value = "1000")] pub max_page_size: usize, - /// Storage backend mode: sqlite (default), sqlite-elasticsearch, postgres, postgres-elasticsearch, or s3. + /// Storage backend mode: sqlite (default), sqlite-elasticsearch, postgres, postgres-elasticsearch, s3, or s3-elasticsearch. #[arg(long, env = "HFS_STORAGE_BACKEND", default_value = "sqlite")] pub storage_backend: String, @@ -641,6 +646,18 @@ mod tests { "S3".parse::<StorageBackendMode>().unwrap(), StorageBackendMode::S3 ); + assert_eq!( + "s3-elasticsearch".parse::<StorageBackendMode>().unwrap(), + StorageBackendMode::S3Elasticsearch + ); + assert_eq!( + "s3-es".parse::<StorageBackendMode>().unwrap(), + StorageBackendMode::S3Elasticsearch + ); + assert_eq!( + "S3-ES".parse::<StorageBackendMode>().unwrap(), + StorageBackendMode::S3Elasticsearch + ); assert!("invalid".parse::<StorageBackendMode>().is_err()); } @@ -657,6 +674,10 @@ mod tests { "postgres-elasticsearch" ); assert_eq!(StorageBackendMode::S3.to_string(), "s3"); + assert_eq!( + StorageBackendMode::S3Elasticsearch.to_string(), + "s3-elasticsearch" + ); } #[test] @@ -675,4 +696,214 @@ mod tests { assert!(!config.strict_validation); assert_eq!(config.jwt_tenant_claim, "tenant_id"); } + + // ── validate() – max_body_size == 0 ─────────────────────────── + + #[test] + fn test_validate_max_body_size_zero() { + let config = ServerConfig { + max_body_size: 0, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert!(errors.iter().any(|e| e.contains("body size"))); + } + + // ── validate() – request_timeout == 0 ──────────────────────── + + #[test] + fn test_validate_request_timeout_zero() { + let config = ServerConfig { + request_timeout: 0, + ..Default::default() + }; + 
let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert!(errors.iter().any(|e| e.contains("timeout"))); + } + + // ── validate() – default_page_size == 0 ────────────────────── + + #[test] + fn test_validate_default_page_size_zero() { + let config = ServerConfig { + default_page_size: 0, + max_page_size: 100, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert!(errors.iter().any(|e| e.contains("page size"))); + } + + // ── validate() – multiple errors at once ───────────────────── + + #[test] + fn test_validate_multiple_errors() { + let config = ServerConfig { + max_body_size: 0, + request_timeout: 0, + default_page_size: 0, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + // At least the three errors above should be present + assert!(errors.len() >= 3); + } + + // ── full_base_url() ─────────────────────────────────────────── + + #[test] + fn test_full_base_url() { + let config = ServerConfig { + base_url: "https://fhir.example.com".to_string(), + ..Default::default() + }; + assert_eq!(config.full_base_url(), "https://fhir.example.com"); + } + + #[test] + fn test_full_base_url_default() { + let config = ServerConfig::default(); + assert_eq!(config.full_base_url(), "http://localhost:8080"); + } + + // ── multitenancy() accessor ─────────────────────────────────── + + #[test] + fn test_multitenancy_accessor() { + let config = ServerConfig::default(); + let mt = config.multitenancy(); + assert_eq!(mt.routing_mode, TenantRoutingMode::HeaderOnly); + } + + // ── StorageBackendMode::default() ───────────────────────────── + + #[test] + fn test_storage_backend_mode_default() { + let mode = StorageBackendMode::default(); + assert_eq!(mode, StorageBackendMode::Sqlite); + } + + // ── TenantRoutingMode::default() ────────────────────────────── + + #[test] + fn 
test_tenant_routing_mode_default() { + let mode = TenantRoutingMode::default(); + assert_eq!(mode, TenantRoutingMode::HeaderOnly); + } + + // ── Alias parsing variations ────────────────────────────────── + + #[test] + fn test_tenant_routing_mode_aliases() { + // headeronly / header + assert_eq!( + "headeronly".parse::<TenantRoutingMode>().unwrap(), + TenantRoutingMode::HeaderOnly + ); + assert_eq!( + "header".parse::<TenantRoutingMode>().unwrap(), + TenantRoutingMode::HeaderOnly + ); + // urlpath / url / path + assert_eq!( + "urlpath".parse::<TenantRoutingMode>().unwrap(), + TenantRoutingMode::UrlPath + ); + assert_eq!( + "url".parse::<TenantRoutingMode>().unwrap(), + TenantRoutingMode::UrlPath + ); + assert_eq!( + "path".parse::<TenantRoutingMode>().unwrap(), + TenantRoutingMode::UrlPath + ); + // combined + assert_eq!( + "combined".parse::<TenantRoutingMode>().unwrap(), + TenantRoutingMode::Both + ); + } + + #[test] + fn test_tenant_routing_mode_invalid() { + assert!("unknown_mode".parse::<TenantRoutingMode>().is_err()); + } + + // ── MultitenancyConfig struct field behaviour (without env) ── + + #[test] + fn test_multitenancy_config_strict_validation_field() { + // Test the struct directly, avoiding env-var parallelism issues. + let config = MultitenancyConfig { + routing_mode: TenantRoutingMode::UrlPath, + strict_validation: true, + jwt_tenant_claim: "custom_claim".to_string(), + }; + assert_eq!(config.routing_mode, TenantRoutingMode::UrlPath); + assert!(config.strict_validation); + assert_eq!(config.jwt_tenant_claim, "custom_claim"); + } + + #[test] + fn test_multitenancy_config_from_env_routing_mode_parsed() { + // Parse the routing mode value the same way from_env does, without + // touching global env state. + let result: Result<TenantRoutingMode, _> = "url_path".parse(); + assert_eq!(result.unwrap(), TenantRoutingMode::UrlPath); + } + + #[test] + fn test_multitenancy_strict_validation_string_parsing() { + // Mirror the logic inside MultitenancyConfig::from_env. 
+        let parse_strict = |s: &str| -> bool { s.to_lowercase() == "true" || s == "1" }; + assert!(parse_strict("true")); + assert!(parse_strict("TRUE")); + assert!(parse_strict("1")); + assert!(!parse_strict("false")); + assert!(!parse_strict("0")); + assert!(!parse_strict("yes")); + } + + // ── storage_backend_mode() – invalid value ──────────────────── + + #[test] + fn test_storage_backend_mode_invalid_returns_error() { + let config = ServerConfig { + storage_backend: "unknown_backend".to_string(), + ..Default::default() + }; + assert!(config.storage_backend_mode().is_err()); + } + + // ── display for StorageBackendMode ──────────────────────────── + + #[test] + fn test_storage_backend_mode_display_all_variants() { + // Already partially tested, but ensure every variant round-trips + for (variant, expected) in [ + (StorageBackendMode::Sqlite, "sqlite"), + ( + StorageBackendMode::SqliteElasticsearch, + "sqlite-elasticsearch", + ), + (StorageBackendMode::Postgres, "postgres"), + ( + StorageBackendMode::PostgresElasticsearch, + "postgres-elasticsearch", + ), + (StorageBackendMode::S3, "s3"), + (StorageBackendMode::S3Elasticsearch, "s3-elasticsearch"), + ] { + assert_eq!(variant.to_string(), expected); + assert_eq!(expected.parse::<StorageBackendMode>().unwrap(), variant); + } + } }