From 7046d1511388223a9d8a0281b155f21feafde92f Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 20:51:28 -0500 Subject: [PATCH 01/17] feat(hfs): add S3 + Elasticsearch composite backend (s3-elasticsearch) Add `s3-elasticsearch` as a new composite storage mode that routes all CRUD, versioning, history, and bulk operations to S3 while offloading all search queries to Elasticsearch. Key differences from SQLite/PostgreSQL+ES composites: - Uses `ElasticsearchBackend::new()` (standalone registry) instead of `with_shared_registry()`, since S3 has no search parameter registry - No `set_search_offloaded()` call needed; S3's stub SearchProvider already returns UnsupportedCapability for all search operations Changes: - rest/config.rs: add `S3Elasticsearch` variant with aliases `s3-elasticsearch` and `s3-es`; update error message and arg doc; add parse/display tests - hfs/main.rs: add match arm and `start_s3_elasticsearch()` with cfg feature guards for `s3` + `elasticsearch`; update module docs - CLAUDE.md: add s3-elasticsearch row to storage backends table - README.md: add S3+ES to configurations table, running example, env var - persistence/README.md: mark S3+Elasticsearch as implemented in both role matrix tables; update search offloading paragraph --- CLAUDE.md | 2 + README.md | 11 ++- crates/hfs/src/main.rs | 134 ++++++++++++++++++++++++++++++++++- crates/persistence/README.md | 6 +- crates/rest/src/config.rs | 25 ++++++- 5 files changed, 171 insertions(+), 7 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 143d0a7c..bff698d2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -258,6 +258,8 @@ HFS_SERVER_PORT=3000 HFS_LOG_LEVEL=debug cargo run --bin hfs | SQLite + Elasticsearch | `sqlite-elasticsearch` or `sqlite-es` | SQLite for CRUD, ES for search | | PostgreSQL | `postgres` or `pg` or `postgresql` | PostgreSQL only | | PostgreSQL + Elasticsearch | `postgres-elasticsearch` or `pg-es` | PG for CRUD, ES for search | +| S3 | `s3` | AWS S3 object storage for 
CRUD, versioning, history, bulk ops (no search) | +| S3 + Elasticsearch | `s3-elasticsearch` or `s3-es` | S3 for CRUD, ES for search | ### Multi-tenancy ```bash diff --git a/README.md b/README.md index 7a337a79..5744f3b3 100644 --- a/README.md +++ b/README.md @@ -172,6 +172,7 @@ The Helios FHIR Server supports multiple storage backend configurations. Choose | **PostgreSQL** | Built-in full-text search (tsvector/tsquery) | Production OLTP deployments | | **PostgreSQL + Elasticsearch** | Elasticsearch-powered search with PostgreSQL CRUD | Production deployments needing RDBMS + robust search | | **S3** | Object storage for CRUD, versioning, history, and bulk operations (no search) | Archival, bulk analytics, cost-effective storage | +| **S3 + Elasticsearch** | Elasticsearch-powered search with S3 CRUD | Large-scale storage with full FHIR search | ### Running the Server @@ -200,6 +201,14 @@ HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ HFS_STORAGE_BACKEND=s3 \ HFS_S3_BUCKET=my-fhir-bucket \ AWS_PROFILE=your-aws-profile \ +AWS_REGION=us-east-1 \ + ./hfs + +# S3 + Elasticsearch +HFS_STORAGE_BACKEND=s3-elasticsearch \ +HFS_S3_BUCKET=my-fhir-bucket \ +HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ +AWS_PROFILE=your-aws-profile \ AWS_REGION=us-east-1 \ ./hfs ``` @@ -208,7 +217,7 @@ AWS_REGION=us-east-1 \ | Variable | Default | Description | |---|---|---| -| `HFS_STORAGE_BACKEND` | `sqlite` | Backend mode: `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, or `s3` | +| `HFS_STORAGE_BACKEND` | `sqlite` | Backend mode: `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, `s3`, or `s3-elasticsearch` | | `HFS_SERVER_PORT` | `8080` | Server port | | `HFS_SERVER_HOST` | `127.0.0.1` | Host to bind | | `HFS_DATABASE_URL` | `fhir.db` | Database URL (SQLite path or PostgreSQL connection string) | diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index 3ad07723..f9acbee4 100644 --- a/crates/hfs/src/main.rs +++ 
b/crates/hfs/src/main.rs @@ -11,8 +11,9 @@ //! | PostgreSQL | `postgres` | Full-featured RDBMS with JSONB storage and tsvector search | //! | PostgreSQL + Elasticsearch | `postgres,elasticsearch` | PostgreSQL for CRUD, Elasticsearch for search | //! | S3 | `s3` | AWS S3 object storage for CRUD, versioning, history, and bulk ops (no search) | +//! | S3 + Elasticsearch | `s3,elasticsearch` | S3 for CRUD, Elasticsearch for search | //! -//! Set `HFS_STORAGE_BACKEND` to `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, or `s3`. +//! Set `HFS_STORAGE_BACKEND` to `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, `s3`, or `s3-elasticsearch`. use clap::Parser; use helios_rest::{ServerConfig, StorageBackendMode, create_app_with_config, init_logging}; @@ -92,6 +93,9 @@ async fn main() -> anyhow::Result<()> { StorageBackendMode::S3 => { start_s3(config).await?; } + StorageBackendMode::S3Elasticsearch => { + start_s3_elasticsearch(config).await?; + } } Ok(()) @@ -423,6 +427,134 @@ async fn start_s3(_config: ServerConfig) -> anyhow::Result<()> { ) } +/// Starts the server with S3 + Elasticsearch composite backend. 
+#[cfg(all(feature = "s3", feature = "elasticsearch"))] +async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { + use std::collections::HashMap; + use std::sync::Arc; + + use helios_persistence::backends::elasticsearch::{ + ElasticsearchAuth, ElasticsearchBackend, ElasticsearchConfig, + }; + use helios_persistence::backends::s3::{S3Backend, S3BackendConfig, S3TenancyMode}; + use helios_persistence::composite::{CompositeConfig, CompositeStorage}; + use helios_persistence::core::BackendKind; + + let bucket = std::env::var("HFS_S3_BUCKET").unwrap_or_else(|_| "hfs".to_string()); + let region = std::env::var("HFS_S3_REGION").ok(); + let validate_buckets = std::env::var("HFS_S3_VALIDATE_BUCKETS") + .map(|s| s.to_lowercase() != "false" && s != "0") + .unwrap_or(true); + + info!( + bucket = %bucket, + region = ?region, + validate_buckets = validate_buckets, + "Initializing S3 backend" + ); + + let s3_config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: bucket.clone(), + }, + region, + validate_buckets_on_startup: validate_buckets, + ..Default::default() + }; + + let s3 = Arc::new(S3Backend::new(s3_config).map_err(|e| { + anyhow::anyhow!( + "Failed to initialize S3 backend (bucket={}, region={:?}): {}", + bucket, + std::env::var("AWS_REGION").ok(), + e + ) + })?); + + // Build Elasticsearch configuration from server config + let es_nodes: Vec = config + .elasticsearch_nodes + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + + let es_auth = match ( + &config.elasticsearch_username, + &config.elasticsearch_password, + ) { + (Some(username), Some(password)) => Some(ElasticsearchAuth::Basic { + username: username.clone(), + password: password.clone(), + }), + _ => None, + }; + + let es_config = ElasticsearchConfig { + nodes: es_nodes.clone(), + index_prefix: config.elasticsearch_index_prefix.clone(), + auth: es_auth, + fhir_version: config.default_fhir_version, + 
..Default::default() + }; + + info!( + nodes = ?es_nodes, + index_prefix = %config.elasticsearch_index_prefix, + "Initializing Elasticsearch backend" + ); + + // S3 has no search parameter registry, so ES creates its own standalone registry. + let es = Arc::new(ElasticsearchBackend::new(es_config)?); + + // Build composite configuration + let composite_config = CompositeConfig::builder() + .primary("s3", BackendKind::S3) + .search_backend("es", BackendKind::Elasticsearch) + .build()?; + + // Build backends map for CompositeStorage + let mut backends = HashMap::new(); + backends.insert( + "s3".to_string(), + s3.clone() as helios_persistence::composite::DynStorage, + ); + backends.insert( + "es".to_string(), + es.clone() as helios_persistence::composite::DynStorage, + ); + + // Build search providers map + let mut search_providers = HashMap::new(); + search_providers.insert( + "s3".to_string(), + s3.clone() as helios_persistence::composite::DynSearchProvider, + ); + search_providers.insert( + "es".to_string(), + es.clone() as helios_persistence::composite::DynSearchProvider, + ); + + // Create composite storage with full primary capabilities + let composite = CompositeStorage::new(composite_config, backends)? + .with_search_providers(search_providers) + .with_full_primary(s3); + + info!("Composite storage initialized: S3 (primary) + Elasticsearch (search)"); + + let app = create_app_with_config(composite, config.clone()); + serve(app, &config).await +} + +/// Fallback when s3+elasticsearch features are not both enabled. +#[cfg(not(all(feature = "s3", feature = "elasticsearch")))] +async fn start_s3_elasticsearch(_config: ServerConfig) -> anyhow::Result<()> { + anyhow::bail!( + "The s3-elasticsearch backend requires both 's3' and 'elasticsearch' features. 
\ + Build with: cargo build -p helios-hfs --features s3,elasticsearch" + ) +} + #[cfg(not(any( feature = "sqlite", feature = "postgres", diff --git a/crates/persistence/README.md b/crates/persistence/README.md index 56cb49ef..d0672bce 100644 --- a/crates/persistence/README.md +++ b/crates/persistence/README.md @@ -382,7 +382,7 @@ Backends can serve as primary (CRUD, versioning, transactions) or secondary (opt | Cassandra + Elasticsearch | Cassandra | Elasticsearch (search) | Planned | Write-heavy + search | | MongoDB alone | MongoDB | — | Planned | Document-centric | | S3 alone | S3 | — | ✓ Implemented (storage-focused) | Archival/bulk/history storage | -| S3 + Elasticsearch | S3 | Elasticsearch (search) | Planned | Large-scale + search | +| S3 + Elasticsearch | S3 | Elasticsearch (search) | ✓ Implemented | Large-scale + search | ### Backend Selection Guide @@ -521,7 +521,7 @@ HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ ### How Search Offloading Works -When `HFS_STORAGE_BACKEND` is set to `sqlite-elasticsearch` or `postgres-elasticsearch`, the server: +When `HFS_STORAGE_BACKEND` is set to `sqlite-elasticsearch`, `postgres-elasticsearch`, or `s3-elasticsearch`, the server: 1. Creates the primary backend (SQLite or PostgreSQL) with search indexing **disabled** 2. 
Creates an Elasticsearch backend sharing the primary backend's search parameter registry @@ -1017,7 +1017,7 @@ The composite storage layer enables polyglot persistence by coordinating multipl | PostgreSQL-only | PostgreSQL | None | ✓ Implemented | Production OLTP | | PostgreSQL + ES | PostgreSQL | Elasticsearch | ✓ Implemented | OLTP + advanced search | | PostgreSQL + Neo4j | PostgreSQL | Neo4j | Planned | Graph-heavy queries | -| S3 + ES | S3 | Elasticsearch | Planned | Large-scale, cheap storage | +| S3 + ES | S3 | Elasticsearch | ✓ Implemented | Large-scale, cheap storage | ### Quick Start diff --git a/crates/rest/src/config.rs b/crates/rest/src/config.rs index 2bc84d9a..72e5564e 100644 --- a/crates/rest/src/config.rs +++ b/crates/rest/src/config.rs @@ -66,6 +66,9 @@ pub enum StorageBackendMode { /// AWS S3 object storage for CRUD, versioning, history, and bulk operations. /// Requires AWS credentials via the standard provider chain. No search support. S3, + /// AWS S3 for CRUD + Elasticsearch for search. + /// Requires AWS credentials and a running Elasticsearch instance. + S3Elasticsearch, } impl fmt::Display for StorageBackendMode { @@ -78,6 +81,7 @@ impl fmt::Display for StorageBackendMode { write!(f, "postgres-elasticsearch") } StorageBackendMode::S3 => write!(f, "s3"), + StorageBackendMode::S3Elasticsearch => write!(f, "s3-elasticsearch"), } } } @@ -94,8 +98,9 @@ impl FromStr for StorageBackendMode { Ok(StorageBackendMode::PostgresElasticsearch) } "s3" | "objectstore" => Ok(StorageBackendMode::S3), + "s3-elasticsearch" | "s3-es" => Ok(StorageBackendMode::S3Elasticsearch), _ => Err(format!( - "Invalid storage backend '{}'. Valid values: sqlite, sqlite-elasticsearch, postgres, postgres-elasticsearch, s3", + "Invalid storage backend '{}'. 
Valid values: sqlite, sqlite-elasticsearch, postgres, postgres-elasticsearch, s3, s3-elasticsearch", s )), } @@ -304,7 +309,7 @@ pub struct ServerConfig { #[arg(long, env = "HFS_MAX_PAGE_SIZE", default_value = "1000")] pub max_page_size: usize, - /// Storage backend mode: sqlite (default), sqlite-elasticsearch, postgres, postgres-elasticsearch, or s3. + /// Storage backend mode: sqlite (default), sqlite-elasticsearch, postgres, postgres-elasticsearch, s3, or s3-elasticsearch. #[arg(long, env = "HFS_STORAGE_BACKEND", default_value = "sqlite")] pub storage_backend: String, @@ -641,6 +646,18 @@ mod tests { "S3".parse::().unwrap(), StorageBackendMode::S3 ); + assert_eq!( + "s3-elasticsearch".parse::().unwrap(), + StorageBackendMode::S3Elasticsearch + ); + assert_eq!( + "s3-es".parse::().unwrap(), + StorageBackendMode::S3Elasticsearch + ); + assert_eq!( + "S3-ES".parse::().unwrap(), + StorageBackendMode::S3Elasticsearch + ); assert!("invalid".parse::().is_err()); } @@ -657,6 +674,10 @@ mod tests { "postgres-elasticsearch" ); assert_eq!(StorageBackendMode::S3.to_string(), "s3"); + assert_eq!( + StorageBackendMode::S3Elasticsearch.to_string(), + "s3-elasticsearch" + ); } #[test] From 243a674138a64c0043053d297c6a85ea5ab78a12 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 20:57:58 -0500 Subject: [PATCH 02/17] ci(inferno): replace s3 backend with s3-elasticsearch in test matrix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit S3 alone lacks search capability and cannot pass the full Inferno US Core test suite. Replace it with s3-elasticsearch so CRUD goes to S3 while all FHIR search queries are handled by Elasticsearch. 
- Matrix: s3 → s3-elasticsearch - Start Elasticsearch: condition now covers both sqlite-elasticsearch and s3-elasticsearch - Start HFS: replace s3 branch with s3-elasticsearch (adds HFS_ELASTICSEARCH_NODES) - Skip condition: matrix.backend != 's3-elasticsearch' when HFS_S3_BUCKET unset --- .github/workflows/inferno.yml | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index c6603535..dcbdc724 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -12,7 +12,7 @@ env: # Remote Docker host (set via GitHub repository secrets or variables; leave unset for local Docker) DOCKER_HOST: ${{ secrets.DOCKER_HOST }} DOCKER_HOST_IP: ${{ secrets.DOCKER_HOST_IP }} - # S3 backend configuration (optional; S3 tests are skipped when not set) + # S3+Elasticsearch backend configuration (optional; s3-elasticsearch tests are skipped when not set) HFS_S3_BUCKET: ${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }} jobs: @@ -63,7 +63,7 @@ jobs: us_core_v700, us_core_v800, ] - backend: [sqlite, sqlite-elasticsearch, postgres, s3] + backend: [sqlite, sqlite-elasticsearch, postgres, s3-elasticsearch] include: - { suite_id: us_core_v311, version_label: "v3.1.1" } - { suite_id: us_core_v400, version_label: "v4.0.0" } @@ -140,7 +140,7 @@ jobs: echo "OMITTED_TESTS=[${OMITTED}]" >> $GITHUB_ENV - name: Start Elasticsearch - if: matrix.backend == 'sqlite-elasticsearch' + if: matrix.backend == 'sqlite-elasticsearch' || matrix.backend == 's3-elasticsearch' run: | ES_CONTAINER="es-${{ matrix.suite_id }}-${{ matrix.backend }}" docker rm -f $ES_CONTAINER 2>/dev/null || true @@ -198,7 +198,7 @@ jobs: exit 1 - name: Start HFS server - if: matrix.backend != 's3' || env.HFS_S3_BUCKET != '' + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | HFS_LOG="/tmp/hfs-${{ matrix.suite_id }}-${{ matrix.backend }}.log" echo "HFS_LOG=$HFS_LOG" >> $GITHUB_ENV @@ -215,10 +215,11 @@ 
jobs: HFS_PG_USER=helios \ HFS_PG_PASSWORD=helios \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & - elif [ "${{ matrix.backend }}" = "s3" ]; then - HFS_STORAGE_BACKEND=s3 \ + elif [ "${{ matrix.backend }}" = "s3-elasticsearch" ]; then + HFS_STORAGE_BACKEND=s3-elasticsearch \ HFS_S3_BUCKET=${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }} \ HFS_S3_VALIDATE_BUCKETS=false \ + HFS_ELASTICSEARCH_NODES=http://$DOCKER_HOST_IP:$ES_PORT \ AWS_REGION=${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & else From ff1ceeefcc0f360ee3bae70b20221e923ffb15fe Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 21:38:50 -0500 Subject: [PATCH 03/17] feat(s3): expose HFS_S3_PREFIX env var; isolate inferno CI runs by suite prefix - Add HFS_S3_PREFIX env var support to start_s3 and start_s3_elasticsearch so callers can scope all S3 keys under an optional global prefix - Update inferno.yml: pass HFS_S3_PREFIX=ci// per matrix job so parallel s3-elasticsearch jobs are fully isolated within the shared bucket - Empty only the job-scoped S3 prefix (not the whole bucket) before each run - Add AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to the s3-elasticsearch HFS start command - Guard all post-HFS steps with the s3-elasticsearch/HFS_S3_BUCKET condition so jobs are skipped cleanly when the bucket secret is not configured - Document HFS_S3_PREFIX in README.md --- .github/workflows/inferno.yml | 22 ++++++++++++++++++++++ README.md | 1 + crates/hfs/src/main.rs | 6 ++++++ 3 files changed, 29 insertions(+) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index dcbdc724..dfef4fc4 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -197,6 +197,19 @@ jobs: docker logs $PG_CONTAINER exit 1 + - name: Empty S3 prefix + if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' + env: + 
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + AWS_DEFAULT_REGION: ${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} + run: | + BUCKET="${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }}" + PREFIX="ci/${{ matrix.suite_id }}/" + echo "Emptying s3://$BUCKET/$PREFIX" + aws s3 rm "s3://$BUCKET/$PREFIX" --recursive + echo "Prefix emptied" + - name: Start HFS server if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | @@ -218,8 +231,11 @@ jobs: elif [ "${{ matrix.backend }}" = "s3-elasticsearch" ]; then HFS_STORAGE_BACKEND=s3-elasticsearch \ HFS_S3_BUCKET=${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }} \ + HFS_S3_PREFIX=ci/${{ matrix.suite_id }}/ \ HFS_S3_VALIDATE_BUCKETS=false \ HFS_ELASTICSEARCH_NODES=http://$DOCKER_HOST_IP:$ES_PORT \ + AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ + AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ AWS_REGION=${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & else @@ -229,6 +245,7 @@ jobs: echo "HFS_PID=$(cat /tmp/hfs.pid)" >> $GITHUB_ENV - name: Wait for HFS to be ready + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | echo "Waiting for HFS to start..." for i in {1..30}; do @@ -254,10 +271,12 @@ jobs: exit 1 - name: Load Inferno test data into HFS + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | ./crates/hfs/tests/inferno/install.sh - name: Wait for Inferno to be ready + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | echo "Waiting for persistent Inferno container to respond..." 
for i in {1..30}; do @@ -272,9 +291,11 @@ jobs: exit 1 - name: Create results directory + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: mkdir -p inferno-results - name: Run Inferno tests + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | # Create a test session for US Core ${{ matrix.version_label }} # Retry to handle Inferno's SQLite "database is locked" errors under concurrent load @@ -388,6 +409,7 @@ jobs: done - name: Check test results + if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | if [ ! -f inferno-results/results.json ]; then echo "No results file found" diff --git a/README.md b/README.md index 5744f3b3..5b5709e8 100644 --- a/README.md +++ b/README.md @@ -229,6 +229,7 @@ AWS_REGION=us-east-1 \ | `HFS_ELASTICSEARCH_PASSWORD` | *(none)* | ES basic auth password | | `HFS_S3_BUCKET` | `hfs` | S3 bucket name (prefix-per-tenant mode) | | `HFS_S3_REGION` | *(AWS provider chain)* | AWS region override | +| `HFS_S3_PREFIX` | *(none)* | Optional key prefix prepended to all S3 object keys | | `HFS_S3_VALIDATE_BUCKETS` | `true` | Validate bucket access on startup | For detailed backend setup instructions (building from source, Docker commands, and search offloading architecture), see the [persistence crate documentation](crates/persistence/README.md#building--running-storage-backends). 
diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index f9acbee4..2076f3b3 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -385,6 +385,7 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { let bucket = std::env::var("HFS_S3_BUCKET").unwrap_or_else(|_| "hfs".to_string()); let region = std::env::var("HFS_S3_REGION").ok(); + let prefix = std::env::var("HFS_S3_PREFIX").ok(); let validate_buckets = std::env::var("HFS_S3_VALIDATE_BUCKETS") .map(|s| s.to_lowercase() != "false" && s != "0") .unwrap_or(true); @@ -392,6 +393,7 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { info!( bucket = %bucket, region = ?region, + prefix = ?prefix, validate_buckets = validate_buckets, "Initializing S3 backend" ); @@ -401,6 +403,7 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { bucket: bucket.clone(), }, region, + prefix, validate_buckets_on_startup: validate_buckets, ..Default::default() }; @@ -442,6 +445,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { let bucket = std::env::var("HFS_S3_BUCKET").unwrap_or_else(|_| "hfs".to_string()); let region = std::env::var("HFS_S3_REGION").ok(); + let prefix = std::env::var("HFS_S3_PREFIX").ok(); let validate_buckets = std::env::var("HFS_S3_VALIDATE_BUCKETS") .map(|s| s.to_lowercase() != "false" && s != "0") .unwrap_or(true); @@ -449,6 +453,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { info!( bucket = %bucket, region = ?region, + prefix = ?prefix, validate_buckets = validate_buckets, "Initializing S3 backend" ); @@ -458,6 +463,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { bucket: bucket.clone(), }, region, + prefix, validate_buckets_on_startup: validate_buckets, ..Default::default() }; From 7e3d7968d39570c84c6e5fb3765a0b3d6e0f4e26 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 21:47:59 -0500 Subject: [PATCH 04/17] ci(inferno): dump HFS log on 
install.sh failure for easier debugging [skip ci] --- .github/workflows/inferno.yml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index dfef4fc4..eb2d14f7 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -273,7 +273,12 @@ jobs: - name: Load Inferno test data into HFS if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' run: | - ./crates/hfs/tests/inferno/install.sh + ./crates/hfs/tests/inferno/install.sh || { + echo "=== HFS log output ===" + cat "$HFS_LOG" + echo "=== end of log ===" + exit 1 + } - name: Wait for Inferno to be ready if: matrix.backend != 's3-elasticsearch' || env.HFS_S3_BUCKET != '' From 5b7375ce896c1fd280352f7c85aa8c2066334c39 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 22:00:57 -0500 Subject: [PATCH 05/17] ci(inferno): install AWS CLI on runner if not present [skip ci] --- .github/workflows/inferno.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index eb2d14f7..9e0f7891 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -197,6 +197,17 @@ jobs: docker logs $PG_CONTAINER exit 1 + - name: Install AWS CLI + if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' + run: | + if ! 
command -v aws &>/dev/null; then + curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip + unzip -q /tmp/awscliv2.zip -d /tmp + sudo /tmp/aws/install + rm -rf /tmp/awscliv2.zip /tmp/aws + fi + aws --version + - name: Empty S3 prefix if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' env: From 82a148f1660c510562c5ffa99f658a72a42490f2 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 22:09:50 -0500 Subject: [PATCH 06/17] ci(inferno): install AWS CLI without sudo using user-local dir [skip ci] --- .github/workflows/inferno.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index 9e0f7891..0713333b 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -203,10 +203,11 @@ jobs: if ! command -v aws &>/dev/null; then curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip unzip -q /tmp/awscliv2.zip -d /tmp - sudo /tmp/aws/install + /tmp/aws/install --install-dir /tmp/aws-cli --bin-dir /tmp/aws-bin + echo "/tmp/aws-bin" >> $GITHUB_PATH rm -rf /tmp/awscliv2.zip /tmp/aws fi - aws --version + aws --version || /tmp/aws-bin/aws --version - name: Empty S3 prefix if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' From 6588bfee652414828ef7b56b8a604ffa6d938e47 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 22:22:06 -0500 Subject: [PATCH 07/17] ci(inferno): fix AWS CLI install on persistent runners (avoid unzip overwrite prompt) [skip ci] --- .github/workflows/inferno.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index 0713333b..7c7b698a 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -200,14 +200,14 @@ jobs: - name: Install AWS CLI if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' run: | - 
if ! command -v aws &>/dev/null; then + if ! command -v aws &>/dev/null && ! /tmp/aws-bin/aws --version &>/dev/null; then curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip - unzip -q /tmp/awscliv2.zip -d /tmp + unzip -qo /tmp/awscliv2.zip -d /tmp /tmp/aws/install --install-dir /tmp/aws-cli --bin-dir /tmp/aws-bin - echo "/tmp/aws-bin" >> $GITHUB_PATH rm -rf /tmp/awscliv2.zip /tmp/aws fi - aws --version || /tmp/aws-bin/aws --version + echo "/tmp/aws-bin" >> $GITHUB_PATH + /tmp/aws-bin/aws --version - name: Empty S3 prefix if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' From 381d3cc96dbcbb92080e1973b0af821de0b931e4 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Thu, 5 Mar 2026 23:20:16 -0500 Subject: [PATCH 08/17] ci(inferno): fix AWS CLI install check when aws already on PATH [skip ci] --- .github/workflows/inferno.yml | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index 7c7b698a..45cd6dca 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -200,14 +200,16 @@ jobs: - name: Install AWS CLI if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' run: | - if ! command -v aws &>/dev/null && ! /tmp/aws-bin/aws --version &>/dev/null; then - curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip - unzip -qo /tmp/awscliv2.zip -d /tmp - /tmp/aws/install --install-dir /tmp/aws-cli --bin-dir /tmp/aws-bin - rm -rf /tmp/awscliv2.zip /tmp/aws + if ! command -v aws &>/dev/null; then + if ! 
/tmp/aws-bin/aws --version &>/dev/null; then + curl -fsSL "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o /tmp/awscliv2.zip + unzip -qo /tmp/awscliv2.zip -d /tmp + /tmp/aws/install --install-dir /tmp/aws-cli --bin-dir /tmp/aws-bin + rm -rf /tmp/awscliv2.zip /tmp/aws + fi + echo "/tmp/aws-bin" >> $GITHUB_PATH fi - echo "/tmp/aws-bin" >> $GITHUB_PATH - /tmp/aws-bin/aws --version + aws --version 2>/dev/null || /tmp/aws-bin/aws --version - name: Empty S3 prefix if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' From 8f38a493c22fd1b8b53fe864227226249c16a4f2 Mon Sep 17 00:00:00 2001 From: smunini Date: Fri, 6 Mar 2026 15:35:48 -0500 Subject: [PATCH 09/17] docs(persistence): add S3 + Elasticsearch composite section to README [skip ci] Document the S3+ES composite storage configuration including env vars, build/run commands, S3-compatible endpoint setup, key differences from SQLite/PG+ES, and programmatic composite assembly example. Also update the search offloading section to accurately reflect S3 behavior. --- crates/persistence/README.md | 108 ++++++++++++++++++++++++++++++++++- 1 file changed, 106 insertions(+), 2 deletions(-) diff --git a/crates/persistence/README.md b/crates/persistence/README.md index d0672bce..2b5fa022 100644 --- a/crates/persistence/README.md +++ b/crates/persistence/README.md @@ -519,12 +519,116 @@ HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ ./target/release/hfs ``` +### S3 + Elasticsearch + +S3 handles CRUD, versioning, history, and bulk operations. Elasticsearch handles all search operations. Combines S3's cost-effective, durable object storage with Elasticsearch's search capabilities for large-scale deployments. 
+ +- CRUD persistence via S3 objects (current pointer + immutable history versions) +- Versioning (`vread`, optimistic locking via version checks) +- Instance, type, and system history via immutable history objects +- Batch bundles and best-effort transaction bundles +- Bulk export (NDJSON parts + manifest in S3) +- Bulk submit with rollback change log +- Full-text search with relevance scoring (`_text`, `_content`) via Elasticsearch +- All FHIR search parameter types (string, token, date, number, quantity, reference, URI, composite) +- Advanced text search with stemming, boolean operators, and proximity matching (`:text-advanced`) +- Tenant isolation (`PrefixPerTenant` or `BucketPerTenant`) + +**Prerequisites:** An AWS S3 bucket (or S3-compatible service) and a running Elasticsearch 8.x instance. + +```bash +# Build with S3 and Elasticsearch support +cargo build --bin hfs --features s3,elasticsearch --release + +# Start Elasticsearch (example using Docker) +docker run -d --name es -p 9200:9200 \ + -e "discovery.type=single-node" \ + -e "xpack.security.enabled=false" \ + elasticsearch:8.15.0 + +# Start the server (AWS S3) +HFS_STORAGE_BACKEND=s3-elasticsearch \ +HFS_S3_BUCKET=my-fhir-bucket \ +HFS_ELASTICSEARCH_NODES=http://localhost:9200 \ + ./target/release/hfs +``` + +#### S3 Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `HFS_S3_BUCKET` | `hfs` | S3 bucket name | +| `HFS_S3_REGION` | (provider chain) | AWS region override | +| `HFS_S3_PREFIX` | (none) | Optional global key prefix | +| `HFS_S3_VALIDATE_BUCKETS` | `true` | Validate buckets on startup via `HeadBucket` | + +AWS credentials are resolved via the standard AWS provider chain (`AWS_ACCESS_KEY_ID`/`AWS_SECRET_ACCESS_KEY`, EC2 instance metadata, shared credentials file, SSO, etc.). + +#### S3-Compatible Endpoints (MinIO, etc.) 
+ +For S3-compatible services, configure the endpoint programmatically via `S3BackendConfig`: + +```rust +use helios_persistence::backends::s3::{S3BackendConfig, S3TenancyMode}; + +let config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: "minio-bucket".to_string(), + }, + endpoint_url: Some("http://127.0.0.1:9000".to_string()), + allow_http: true, + force_path_style: true, + ..Default::default() +}; +``` + +When `endpoint_url` is set, the backend automatically defaults `force_path_style` to `true` and `region` to `us-east-1` if not otherwise specified. + +#### Key Differences from SQLite/PG + ES + +Unlike SQLite and PostgreSQL, the S3 backend has no built-in search parameter registry. When composing S3 + Elasticsearch, the ES backend creates its own standalone registry with minimal embedded search parameters (`_id`, `_lastUpdated`, `_tag`, `_profile`, `_security`). For full search capability, use `with_shared_registry()` with parameters loaded from spec files. 
+ +```rust +use std::collections::HashMap; +use std::sync::Arc; +use helios_persistence::backends::elasticsearch::{ElasticsearchBackend, ElasticsearchConfig}; +use helios_persistence::backends::s3::{S3Backend, S3BackendConfig}; +use helios_persistence::composite::{CompositeConfig, CompositeStorage, DynStorage, DynSearchProvider}; +use helios_persistence::core::BackendKind; + +// Create S3 backend +let s3_config = S3BackendConfig::default(); +let s3 = Arc::new(S3Backend::from_env_async(s3_config).await?); + +// Create ES backend (standalone registry — S3 has no registry to share) +let es_config = ElasticsearchConfig::default(); +let es = Arc::new(ElasticsearchBackend::new(es_config)?); + +// Build composite +let composite_config = CompositeConfig::builder() + .primary("s3", BackendKind::S3) + .search_backend("es", BackendKind::Elasticsearch) + .build()?; + +let mut backends = HashMap::new(); +backends.insert("s3".to_string(), s3.clone() as DynStorage); +backends.insert("es".to_string(), es.clone() as DynStorage); + +let mut search_providers = HashMap::new(); +search_providers.insert("s3".to_string(), s3.clone() as DynSearchProvider); +search_providers.insert("es".to_string(), es.clone() as DynSearchProvider); + +let composite = CompositeStorage::new(composite_config, backends)? + .with_search_providers(search_providers) + .with_full_primary(s3); +``` + ### How Search Offloading Works When `HFS_STORAGE_BACKEND` is set to `sqlite-elasticsearch`, `postgres-elasticsearch`, or `s3-elasticsearch`, the server: -1. Creates the primary backend (SQLite or PostgreSQL) with search indexing **disabled** -2. Creates an Elasticsearch backend sharing the primary backend's search parameter registry +1. Creates the primary backend (SQLite, PostgreSQL, or S3). For SQLite/PG, search indexing is **disabled**; S3 has no search indexing to disable. +2. Creates an Elasticsearch backend. 
For SQLite/PG, it shares the primary backend's search parameter registry; for S3, it creates its own standalone registry. 3. Wraps both in a `CompositeStorage` that routes: - All **writes** (create, update, delete, conditional ops, transactions) → primary backend, then syncs to ES - All **reads** (read, vread, history) → primary backend From e9931f4fd3cd6c7428b8c4e459c56c1612da7e85 Mon Sep 17 00:00:00 2001 From: smunini Date: Fri, 6 Mar 2026 15:35:58 -0500 Subject: [PATCH 10/17] fix(s3): use async initialization and improve error messages Switch S3Backend::new() to S3Backend::from_env_async() for proper async SDK config loading, and include full error details in S3 client error messages instead of dropping them when the message field is empty. --- crates/hfs/src/main.rs | 4 ++-- crates/persistence/src/backends/s3/client.rs | 14 ++++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index 2076f3b3..eb921798 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -408,7 +408,7 @@ async fn start_s3(config: ServerConfig) -> anyhow::Result<()> { ..Default::default() }; - let backend = S3Backend::new(s3_config).map_err(|e| { + let backend = S3Backend::from_env_async(s3_config).await.map_err(|e| { anyhow::anyhow!( "Failed to initialize S3 backend (bucket={}, region={:?}): {}", bucket, @@ -468,7 +468,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { ..Default::default() }; - let s3 = Arc::new(S3Backend::new(s3_config).map_err(|e| { + let s3 = Arc::new(S3Backend::from_env_async(s3_config).await.map_err(|e| { anyhow::anyhow!( "Failed to initialize S3 backend (bucket={}, region={:?}): {}", bucket, diff --git a/crates/persistence/src/backends/s3/client.rs b/crates/persistence/src/backends/s3/client.rs index 04188702..3a9f5244 100644 --- a/crates/persistence/src/backends/s3/client.rs +++ b/crates/persistence/src/backends/s3/client.rs @@ -396,10 +396,16 @@ where 
.to_string(), ), 404 => S3ClientError::NotFound, - _ if message.is_empty() => S3ClientError::Internal(format!( - "S3 error (HTTP {status}, code={code})" - )), - _ => S3ClientError::Internal(message), + _ => { + let detail = if message.is_empty() { + format!("{:?}", service_err.err()) + } else { + message + }; + S3ClientError::Internal(format!( + "S3 error (HTTP {status}, code={code}): {detail}" + )) + } } } } From e76ab100d38ac04c0bb785a54747064926df9ddd Mon Sep 17 00:00:00 2001 From: aacruzgon Date: Fri, 6 Mar 2026 19:37:08 -0500 Subject: [PATCH 11/17] feat(persistence): add S3 + Elasticsearch composite storage backend --- CLAUDE.md | 18 + Cargo.lock | 1 + crates/hfs/Cargo.toml | 3 + crates/hfs/src/main.rs | 51 +- crates/persistence/Cargo.toml | 2 +- crates/persistence/src/composite/storage.rs | 31 +- crates/persistence/tests/s3_es_tests.rs | 760 ++++++++++++++++++++ crates/rest/src/config.rs | 2 +- 8 files changed, 853 insertions(+), 15 deletions(-) create mode 100644 crates/persistence/tests/s3_es_tests.rs diff --git a/CLAUDE.md b/CLAUDE.md index b59f5e88..e961f75c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -194,6 +194,9 @@ HFS_STORAGE_BACKEND=postgres HFS_DATABASE_URL="postgresql://user:pass@localhost/ # With SQLite + Elasticsearch HFS_STORAGE_BACKEND=sqlite-es HFS_ELASTICSEARCH_NODES="http://localhost:9200" cargo run --bin hfs +# With S3 (requires --features s3) +HFS_STORAGE_BACKEND=s3 HFS_S3_BUCKET=my-bucket cargo run --bin hfs --features s3 + # With environment overrides HFS_SERVER_PORT=3000 HFS_LOG_LEVEL=debug cargo run --bin hfs ``` @@ -263,6 +266,21 @@ HFS_SERVER_PORT=3000 HFS_LOG_LEVEL=debug cargo run --bin hfs | S3 | `s3` | AWS S3 object storage for CRUD, versioning, history, bulk ops (no search) | | S3 + Elasticsearch | `s3-elasticsearch` or `s3-es` | S3 for CRUD, ES for search | +#### S3 Backend +Requires building with the `s3` feature: +```bash +cargo build -p helios-hfs --features s3 +HFS_STORAGE_BACKEND=s3 HFS_S3_BUCKET=my-bucket 
HFS_S3_REGION=us-east-1 cargo run --bin hfs --features s3 +``` + +| Variable | Default | Description | +|----------|---------|-------------| +| `HFS_S3_BUCKET` | `hfs` | S3 bucket name (prefix-per-tenant mode) | +| `HFS_S3_REGION` | (AWS chain) | AWS region override | +| `HFS_S3_VALIDATE_BUCKETS` | `true` | Validate bucket existence on startup | + +Standard AWS credential chain applies (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, instance profiles, etc.). For S3-compatible endpoints (e.g., MinIO), configure `S3BackendConfig` directly with `endpoint_url` and `force_path_style`. + ### Multi-tenancy ```bash # Via header (default) diff --git a/Cargo.lock b/Cargo.lock index c9e6c27a..686e02b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2798,6 +2798,7 @@ dependencies = [ "helios-persistence", "helios-rest", "openssl", + "parking_lot", "reqwest", "tokio", "tracing", diff --git a/crates/hfs/Cargo.toml b/crates/hfs/Cargo.toml index af4fa221..e970ae50 100644 --- a/crates/hfs/Cargo.toml +++ b/crates/hfs/Cargo.toml @@ -53,6 +53,9 @@ clap = { version = "4.0", features = ["derive", "env"] } # Logging tracing = "0.1" +# Sync primitives (used by build_search_registry for the elasticsearch composite backend) +parking_lot = "0.12" + # Error handling anyhow = "1.0" diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index eb921798..b3737d21 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -11,7 +11,7 @@ //! | PostgreSQL | `postgres` | Full-featured RDBMS with JSONB storage and tsvector search | //! | PostgreSQL + Elasticsearch | `postgres,elasticsearch` | PostgreSQL for CRUD, Elasticsearch for search | //! | S3 | `s3` | AWS S3 object storage for CRUD, versioning, history, and bulk ops (no search) | -//! | S3 + Elasticsearch | `s3,elasticsearch` | S3 for CRUD, Elasticsearch for search | +//! | S3 + Elasticsearch | `s3,elasticsearch` | S3 for CRUD/history, Elasticsearch for search | //! //! 
Set `HFS_STORAGE_BACKEND` to `sqlite`, `sqlite-elasticsearch`, `postgres`, `postgres-elasticsearch`, `s3`, or `s3-elasticsearch`.
@@ -430,6 +430,34 @@ async fn start_s3(_config: ServerConfig) -> anyhow::Result<()> {
     )
 }
 
+/// Builds a search parameter registry independently (for backends that don't own one).
+#[cfg(feature = "elasticsearch")]
+fn build_search_registry(
+    fhir_version: helios_fhir::FhirVersion,
+    data_dir: Option<&std::path::Path>,
+) -> std::sync::Arc<parking_lot::RwLock<helios_persistence::search::SearchParameterRegistry>> {
+    use helios_persistence::search::{SearchParameterLoader, SearchParameterRegistry};
+
+    let registry = std::sync::Arc::new(parking_lot::RwLock::new(SearchParameterRegistry::new()));
+    let loader = SearchParameterLoader::new(fhir_version);
+    {
+        let mut reg = registry.write();
+        if let Ok(params) = loader.load_embedded() {
+            for p in params {
+                let _ = reg.register(p);
+            }
+        }
+        let dir = data_dir.unwrap_or_else(|| std::path::Path::new("./data"));
+        if let Ok(params) = loader.load_from_spec_file(dir) {
+            for p in params {
+                let _ = reg.register(p);
+            }
+        }
+    }
+    registry
+}
+
+
 /// Starts the server with S3 + Elasticsearch composite backend.
 #[cfg(all(feature = "s3", feature = "elasticsearch"))]
 async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> {
@@ -443,6 +471,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> {
     use helios_persistence::composite::{CompositeConfig, CompositeStorage};
     use helios_persistence::core::BackendKind;
 
+    // --- S3 backend (primary) ---
     let bucket = std::env::var("HFS_S3_BUCKET").unwrap_or_else(|_| "hfs".to_string());
     let region = std::env::var("HFS_S3_REGION").ok();
     let prefix = std::env::var("HFS_S3_PREFIX").ok();
@@ -455,7 +484,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> {
         region = ?region,
         prefix = ?prefix,
         validate_buckets = validate_buckets,
-        "Initializing S3 backend"
+        "Initializing S3 backend (primary)"
     );
 
     let s3_config = S3BackendConfig {
@@ -477,7 +506,7 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> {
         )
     })?);
 
-    // Build Elasticsearch configuration from server config
+    // --- Elasticsearch backend (search) ---
     let es_nodes: Vec<String> = config
         .elasticsearch_nodes
         .split(',')
@@ -507,19 +536,23 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> {
     info!(
         nodes = ?es_nodes,
         index_prefix = %config.elasticsearch_index_prefix,
-        "Initializing Elasticsearch backend"
+        "Initializing Elasticsearch backend (search)"
     );
 
-    // S3 has no search parameter registry, so ES creates its own standalone registry.
- let es = Arc::new(ElasticsearchBackend::new(es_config)?); + // Build search registry independently — S3 has no internal registry + let search_registry = + build_search_registry(config.default_fhir_version, config.data_dir.as_deref()); + let es = Arc::new(ElasticsearchBackend::with_shared_registry( + es_config, + search_registry, + )?); - // Build composite configuration + // --- Composite wiring --- let composite_config = CompositeConfig::builder() .primary("s3", BackendKind::S3) .search_backend("es", BackendKind::Elasticsearch) .build()?; - // Build backends map for CompositeStorage let mut backends = HashMap::new(); backends.insert( "s3".to_string(), @@ -530,7 +563,6 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { es.clone() as helios_persistence::composite::DynStorage, ); - // Build search providers map let mut search_providers = HashMap::new(); search_providers.insert( "s3".to_string(), @@ -541,7 +573,6 @@ async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { es.clone() as helios_persistence::composite::DynSearchProvider, ); - // Create composite storage with full primary capabilities let composite = CompositeStorage::new(composite_config, backends)? 
.with_search_providers(search_providers) .with_full_primary(s3); diff --git a/crates/persistence/Cargo.toml b/crates/persistence/Cargo.toml index d40c7144..3ea09dd3 100644 --- a/crates/persistence/Cargo.toml +++ b/crates/persistence/Cargo.toml @@ -95,7 +95,7 @@ tokio-test = "0.4" tempfile = "3" criterion = { version = "0.5", features = ["async_tokio"] } testcontainers = "0.26" -testcontainers-modules = { version = "0.14", features = ["postgres", "elastic_search"] } +testcontainers-modules = { version = "0.14", features = ["postgres", "elastic_search", "minio"] } paste = "1.0" # Configuration advisor binary diff --git a/crates/persistence/src/composite/storage.rs b/crates/persistence/src/composite/storage.rs index 778f3753..d208e274 100644 --- a/crates/persistence/src/composite/storage.rs +++ b/crates/persistence/src/composite/storage.rs @@ -478,7 +478,8 @@ impl CompositeStorage { } // Collect results - let mut primary_result = None; + let mut primary_result: Option = None; + let mut primary_unsupported = false; let mut auxiliary_results = Vec::new(); while let Some(result) = tasks.join_next().await { @@ -491,11 +492,22 @@ impl CompositeStorage { ); if id == primary_id { - primary_result = Some(search_result?); + match search_result { + Ok(r) => primary_result = Some(r), + Err(StorageError::Backend(BackendError::UnsupportedCapability { + .. + })) => { + // Primary doesn't support this search feature (e.g. S3 has no + // full-text search). An auxiliary backend (e.g. Elasticsearch) + // will handle it — promote its result to primary below. 
+ primary_unsupported = true; + } + Err(e) => return Err(e), + } } else if let Ok(res) = search_result { auxiliary_results.push((id, res)); } - // Ignore auxiliary failures - graceful degradation + // Ignore other auxiliary failures - graceful degradation } Err(e) => { warn!(error = %e, "Task join error during parallel search"); @@ -503,6 +515,19 @@ impl CompositeStorage { } } + // When primary lacks search capability and auxiliary has results, promote the + // first auxiliary result to primary so the merger can return it directly. + if primary_unsupported && primary_result.is_none() { + if !auxiliary_results.is_empty() { + let (_, promoted) = auxiliary_results.remove(0); + return Ok((promoted, auxiliary_results)); + } + return Err(StorageError::Backend(BackendError::UnsupportedCapability { + backend_name: primary_id, + capability: "search".to_string(), + })); + } + let primary = primary_result.ok_or_else(|| { StorageError::Backend(BackendError::ConnectionFailed { backend_name: primary_id, diff --git a/crates/persistence/tests/s3_es_tests.rs b/crates/persistence/tests/s3_es_tests.rs new file mode 100644 index 00000000..a8934f92 --- /dev/null +++ b/crates/persistence/tests/s3_es_tests.rs @@ -0,0 +1,760 @@ +//! S3 + Elasticsearch composite backend integration tests. +//! +//! These tests verify that the S3 backend (primary) and Elasticsearch backend (search) +//! work together correctly as a composite storage unit. +//! +//! Tests are opt-in via `RUN_MINIO_S3_ES_TESTS=1` and require Docker to be running. +//! MinIO is used as an S3-compatible store; Elasticsearch is the search backend. +//! +//! Run with: +//! RUN_MINIO_S3_ES_TESTS=1 cargo test -p helios-persistence \ +//! 
--features s3,elasticsearch -- s3_es + +#![cfg(all(feature = "s3", feature = "elasticsearch"))] + +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; + +use helios_fhir::FhirVersion; +use parking_lot::RwLock; +use serde_json::json; + +use helios_persistence::backends::elasticsearch::{ElasticsearchBackend, ElasticsearchConfig}; +use helios_persistence::backends::s3::{S3Backend, S3BackendConfig, S3TenancyMode}; +use helios_persistence::composite::{ + CompositeConfig, CompositeStorage, DynSearchProvider, DynStorage, +}; +use helios_persistence::core::search::SearchProvider; +use helios_persistence::core::{Backend, BackendKind, ResourceStorage}; +use helios_persistence::error::{ResourceError, StorageError}; +use helios_persistence::search::{SearchParameterLoader, SearchParameterRegistry}; +use helios_persistence::tenant::{TenantContext, TenantId, TenantPermissions}; +use helios_persistence::types::{SearchParamType, SearchParameter, SearchQuery, SearchValue}; + +use aws_config::{BehaviorVersion, Region}; +use aws_sdk_s3::Client; +use aws_sdk_s3::config::Credentials; + +use testcontainers::GenericImage; +use testcontainers::ImageExt; +use testcontainers::core::{IntoContainerPort, WaitFor}; +use testcontainers::runners::AsyncRunner; +use testcontainers_modules::elastic_search::ElasticSearch; +use tokio::sync::OnceCell; +use uuid::Uuid; + +// ============================================================================ +// Container setup +// ============================================================================ + +const DEFAULT_MINIO_IMAGE: &str = "minio/minio"; +const DEFAULT_MINIO_TAG: &str = "RELEASE.2025-02-28T09-55-16Z"; +const DEFAULT_MINIO_ROOT_USER: &str = "minioadmin"; +const DEFAULT_MINIO_ROOT_PASSWORD: &str = "minioadmin"; + +struct SharedMinio { + endpoint_url: String, + root_user: String, + root_password: String, + _container: testcontainers::ContainerAsync, +} + +struct SharedEs { + host: String, + port: u16, + _container: 
testcontainers::ContainerAsync, +} + +static SHARED_MINIO: OnceCell = OnceCell::const_new(); +static SHARED_ES: OnceCell = OnceCell::const_new(); +static MINIO_AWS_ENV: std::sync::Once = std::sync::Once::new(); + +fn run_s3_es_tests() -> bool { + std::env::var("RUN_MINIO_S3_ES_TESTS").ok().as_deref() == Some("1") +} + +fn skip_if_disabled(test_name: &str) -> bool { + if run_s3_es_tests() { + return false; + } + eprintln!("skipping S3+ES test {test_name} (set RUN_MINIO_S3_ES_TESTS=1 to enable)"); + true +} + +async fn shared_minio() -> &'static SharedMinio { + SHARED_MINIO + .get_or_init(|| async { + let image = + std::env::var("MINIO_IMAGE").unwrap_or_else(|_| DEFAULT_MINIO_IMAGE.to_string()); + let tag = std::env::var("MINIO_TAG").unwrap_or_else(|_| DEFAULT_MINIO_TAG.to_string()); + let root_user = std::env::var("MINIO_ROOT_USER") + .unwrap_or_else(|_| DEFAULT_MINIO_ROOT_USER.to_string()); + let root_password = std::env::var("MINIO_ROOT_PASSWORD") + .unwrap_or_else(|_| DEFAULT_MINIO_ROOT_PASSWORD.to_string()); + + let container = GenericImage::new(image, tag) + .with_wait_for(WaitFor::message_on_stderr("API:")) + .with_exposed_port(9000.tcp()) + .with_env_var("MINIO_ROOT_USER", root_user.clone()) + .with_env_var("MINIO_ROOT_PASSWORD", root_password.clone()) + .with_cmd(["server", "/data", "--console-address", ":9001"]) + .start() + .await + .expect("failed to start MinIO container"); + + let host = container + .get_host() + .await + .expect("failed to resolve MinIO host") + .to_string(); + let port = container + .get_host_port_ipv4(9000) + .await + .expect("failed to resolve MinIO port"); + + SharedMinio { + endpoint_url: format!("http://{host}:{port}"), + root_user, + root_password, + _container: container, + } + }) + .await +} + +async fn shared_es() -> &'static SharedEs { + SHARED_ES + .get_or_init(|| async { + let run_id = std::env::var("GITHUB_RUN_ID").unwrap_or_default(); + let container = ElasticSearch::default() + .with_env_var("ES_JAVA_OPTS", "-Xms256m 
-Xmx256m") + .with_label("github.run_id", &run_id) + .with_startup_timeout(std::time::Duration::from_secs(120)) + .start() + .await + .expect("failed to start Elasticsearch container"); + + let port = container + .get_host_port_ipv4(9200) + .await + .expect("failed to get ES port"); + let host = container + .get_host() + .await + .expect("failed to get ES host") + .to_string(); + + SharedEs { + host, + port, + _container: container, + } + }) + .await +} + +fn ensure_minio_env(shared: &SharedMinio) { + MINIO_AWS_ENV.call_once(|| { + // SAFETY: executes exactly once before any S3 backend construction. + unsafe { + std::env::set_var("AWS_ACCESS_KEY_ID", &shared.root_user); + std::env::set_var("AWS_SECRET_ACCESS_KEY", &shared.root_password); + std::env::set_var("AWS_REGION", "us-east-1"); + std::env::set_var("AWS_EC2_METADATA_DISABLED", "true"); + } + }); +} + +async fn build_sdk_client(shared: &SharedMinio) -> Client { + let creds = Credentials::new( + shared.root_user.clone(), + shared.root_password.clone(), + None, + None, + "s3-es-tests", + ); + let sdk_config = aws_config::defaults(BehaviorVersion::latest()) + .region(Region::new("us-east-1")) + .endpoint_url(shared.endpoint_url.clone()) + .credentials_provider(creds) + .load() + .await; + let s3_config = aws_sdk_s3::config::Builder::from(&sdk_config) + .force_path_style(true) + .build(); + Client::from_conf(s3_config) +} + +async fn ensure_bucket(client: &Client, bucket: &str) { + if client.head_bucket().bucket(bucket).send().await.is_ok() { + return; + } + client + .create_bucket() + .bucket(bucket) + .send() + .await + .expect("failed to create test bucket"); +} + +fn build_search_registry() -> Arc> { + let data_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .parent() + .and_then(|p| p.parent()) + .map(|p| p.join("data")) + .unwrap_or_else(|| PathBuf::from("data")); + + let loader = SearchParameterLoader::new(FhirVersion::default()); + let mut registry = SearchParameterRegistry::new(); + + if let Ok(params) 
= loader.load_embedded() { + for p in params { + let _ = registry.register(p); + } + } + if let Ok(params) = loader.load_from_spec_file(&data_dir) { + for p in params { + let _ = registry.register(p); + } + } + + Arc::new(RwLock::new(registry)) +} + +// ============================================================================ +// Composite harness +// ============================================================================ + +struct S3EsHarness { + composite: CompositeStorage, + #[allow(dead_code)] + bucket: String, +} + +async fn make_harness(scope: &str) -> S3EsHarness { + let minio = shared_minio().await; + let es = shared_es().await; + ensure_minio_env(minio); + + let sdk_client = build_sdk_client(minio).await; + let bucket = format!("hfs-s3es-{}", Uuid::new_v4().simple()); + ensure_bucket(&sdk_client, &bucket).await; + + let prefix = format!("test/{}/{}", Uuid::new_v4(), scope); + + // S3 backend + let s3_config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: bucket.clone(), + }, + prefix: Some(prefix), + region: Some("us-east-1".to_string()), + endpoint_url: Some(minio.endpoint_url.clone()), + force_path_style: true, + allow_http: true, + validate_buckets_on_startup: true, + ..Default::default() + }; + let s3 = Arc::new(S3Backend::from_env(s3_config).expect("create S3 backend")); + + // Elasticsearch backend + let unique_prefix = format!("hfs_{}", Uuid::new_v4().simple()); + let es_config = ElasticsearchConfig { + nodes: vec![format!("http://{}:{}", es.host, es.port)], + index_prefix: unique_prefix, + number_of_replicas: 0, + refresh_interval: "1ms".to_string(), + ..Default::default() + }; + let search_registry = build_search_registry(); + let es_backend = Arc::new( + ElasticsearchBackend::with_shared_registry(es_config, search_registry) + .expect("create ES backend"), + ); + es_backend + .initialize() + .await + .expect("initialize ES backend"); + + // Composite + let composite_config = CompositeConfig::builder() + 
.primary("s3", BackendKind::S3)
+        .search_backend("es", BackendKind::Elasticsearch)
+        .build()
+        .expect("build composite config");
+
+    let mut backends: HashMap<String, DynStorage> = HashMap::new();
+    backends.insert("s3".to_string(), s3.clone() as DynStorage);
+    backends.insert("es".to_string(), es_backend.clone() as DynStorage);
+
+    let mut search_providers: HashMap<String, DynSearchProvider> = HashMap::new();
+    search_providers.insert("s3".to_string(), s3.clone() as DynSearchProvider);
+    search_providers.insert("es".to_string(), es_backend.clone() as DynSearchProvider);
+
+    let composite = CompositeStorage::new(composite_config, backends)
+        .expect("create composite storage")
+        .with_search_providers(search_providers)
+        .with_full_primary(s3);
+
+    S3EsHarness { composite, bucket }
+}
+
+fn tenant(id: &str) -> TenantContext {
+    TenantContext::new(TenantId::new(id), TenantPermissions::full_access())
+}
+
+// ============================================================================
+// Tests
+// ============================================================================
+
+/// Write a Patient to S3, then verify it appears in ES search by name.
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_create_then_search() { + if skip_if_disabled("s3_es_test_create_then_search") { + return; + } + + let harness = make_harness("create-search").await; + let tenant = tenant("s3-es-tenant"); + + let family = format!("TestFamily-{}", Uuid::new_v4().simple()); + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({ + "resourceType": "Patient", + "name": [{"family": family}] + }), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + // Allow ES refresh (1ms interval configured, but give a little time) + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let query = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&family)], + chain: vec![], + components: vec![], + }); + + let results = harness + .composite + .search(&tenant, &query) + .await + .expect("search should succeed"); + + assert!( + !results.resources.items.is_empty(), + "expected at least one result for family={family}" + ); + assert!(results.resources.items.iter().any(|r| r.id() == created.id())); +} + +/// Update a Patient, then verify search returns updated fields. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_update_then_search() { + if skip_if_disabled("s3_es_test_update_then_search") { + return; + } + + let harness = make_harness("update-search").await; + let tenant = tenant("s3-es-tenant"); + + let original_family = format!("OrigFamily-{}", Uuid::new_v4().simple()); + let updated_family = format!("UpdatedFamily-{}", Uuid::new_v4().simple()); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "name": [{"family": original_family}]}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .update( + &tenant, + &created, + json!({"resourceType": "Patient", "id": created.id(), "name": [{"family": updated_family}]}), + ) + .await + .expect("update should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + // Search by new family name — should find it + let query_new = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&updated_family)], + chain: vec![], + components: vec![], + }); + let results = harness + .composite + .search(&tenant, &query_new) + .await + .expect("search by new family should succeed"); + assert!( + results.resources.items.iter().any(|r| r.id() == created.id()), + "updated resource should appear in search" + ); +} + +/// Delete a resource, verify it no longer appears in search results. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_delete_then_search() { + if skip_if_disabled("s3_es_test_delete_then_search") { + return; + } + + let harness = make_harness("delete-search").await; + let tenant = tenant("s3-es-tenant"); + + let family = format!("DeleteMe-{}", Uuid::new_v4().simple()); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "name": [{"family": family}]}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + harness + .composite + .delete(&tenant, "Patient", created.id()) + .await + .expect("delete should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let query = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&family)], + chain: vec![], + components: vec![], + }); + let results = harness + .composite + .search(&tenant, &query) + .await + .expect("search after delete should not error"); + assert!( + results.resources.items.iter().all(|r| r.id() != created.id()), + "deleted resource should not appear in search" + ); +} + +/// Call `_history` on a resource after updates — versions must come from S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_history_from_s3() { + if skip_if_disabled("s3_es_test_history_from_s3") { + return; + } + + use helios_persistence::core::history::{HistoryParams, InstanceHistoryProvider}; + + let harness = make_harness("history-s3").await; + let tenant = tenant("s3-es-tenant"); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .update( + &tenant, + &created, + json!({"resourceType": "Patient", "id": created.id(), "active": false}), + ) + .await + .expect("update should succeed"); + + let history = harness + .composite + .history_instance( + &tenant, + "Patient", + created.id(), + &HistoryParams::new().include_deleted(true), + ) + .await + .expect("history should succeed"); + + assert!( + history.items.len() >= 2, + "expected at least 2 history versions, got {}", + history.items.len() + ); + let ids: Vec<&str> = history + .items + .iter() + .map(|e| e.resource.version_id()) + .collect(); + assert!(ids.contains(&"1"), "version 1 should be in history"); + assert!(ids.contains(&"2"), "version 2 should be in history"); +} + +/// Confirm `_vread` reads from S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_vread_from_s3() { + if skip_if_disabled("s3_es_test_vread_from_s3") { + return; + } + + use helios_persistence::core::VersionedStorage; + + let harness = make_harness("vread-s3").await; + let tenant = tenant("s3-es-tenant"); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .update( + &tenant, + &created, + json!({"resourceType": "Patient", "id": created.id(), "active": false}), + ) + .await + .expect("update should succeed"); + + let v1 = harness + .composite + .vread(&tenant, "Patient", created.id(), "1") + .await + .expect("vread v1 should succeed") + .expect("v1 should exist"); + + assert_eq!(v1.version_id(), "1"); + assert_eq!(v1.content()["active"], true); +} + +/// Multi-tenant isolation: resource written to tenant-a must not appear in tenant-b search. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_multi_tenant_isolation() { + if skip_if_disabled("s3_es_test_multi_tenant_isolation") { + return; + } + + let harness = make_harness("tenant-isolation").await; + let tenant_a = tenant("tenant-a"); + let tenant_b = tenant("tenant-b"); + + let family = format!("IsolatedFamily-{}", Uuid::new_v4().simple()); + + harness + .composite + .create( + &tenant_a, + "Patient", + json!({"resourceType": "Patient", "name": [{"family": family}]}), + FhirVersion::default(), + ) + .await + .expect("create in tenant-a should succeed"); + + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + + let query = SearchQuery::new("Patient").with_parameter(SearchParameter { + name: "family".to_string(), + param_type: SearchParamType::String, + modifier: None, + values: vec![SearchValue::eq(&family)], + chain: vec![], + components: vec![], + }); + + // tenant-b should NOT see tenant-a's data + let results_b = harness + .composite + .search(&tenant_b, &query) + .await + .expect("tenant-b search should succeed"); + assert!( + results_b.resources.items.is_empty(), + "tenant-b should not see tenant-a's resource" + ); + + // tenant-a should see its own data + let results_a = harness + .composite + .search(&tenant_a, &query) + .await + .expect("tenant-a search should succeed"); + assert!( + !results_a.resources.items.is_empty(), + "tenant-a should see its own resource" + ); +} + +/// Read returns the resource after it was written to S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_read_from_s3() { + if skip_if_disabled("s3_es_test_read_from_s3") { + return; + } + + let harness = make_harness("read-s3").await; + let tenant = tenant("s3-es-tenant"); + + let id = format!("patient-{}", Uuid::new_v4()); + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "id": id, "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + let read = harness + .composite + .read(&tenant, "Patient", created.id()) + .await + .expect("read should succeed") + .expect("resource should exist"); + + assert_eq!(read.id(), created.id()); + assert_eq!(read.content()["active"], true); +} + +/// Confirm read returns Not Found for missing resources. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_read_missing_returns_none() { + if skip_if_disabled("s3_es_test_read_missing_returns_none") { + return; + } + + let harness = make_harness("read-missing").await; + let tenant = tenant("s3-es-tenant"); + + let result = harness + .composite + .read(&tenant, "Patient", "does-not-exist") + .await + .expect("read of missing should not error"); + assert!(result.is_none()); +} + +/// Verify that S3 SearchProvider returns UnsupportedCapability, confirming +/// the composite routes search to ES and not S3. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_s3_search_returns_unsupported() { + if skip_if_disabled("s3_es_test_s3_search_returns_unsupported") { + return; + } + + let minio = shared_minio().await; + ensure_minio_env(minio); + + let sdk_client = build_sdk_client(minio).await; + let bucket = format!("hfs-s3-only-{}", Uuid::new_v4().simple()); + ensure_bucket(&sdk_client, &bucket).await; + + let s3_config = S3BackendConfig { + tenancy_mode: S3TenancyMode::PrefixPerTenant { + bucket: bucket.clone(), + }, + region: Some("us-east-1".to_string()), + endpoint_url: Some(minio.endpoint_url.clone()), + force_path_style: true, + allow_http: true, + validate_buckets_on_startup: true, + ..Default::default() + }; + let s3 = S3Backend::from_env(s3_config).expect("create S3 backend"); + let tenant_ctx = tenant("s3-search-test"); + + let query = SearchQuery::new("Patient"); + let result = s3.search(&tenant_ctx, &query).await; + + // S3 alone must return UnsupportedCapability for search + // S3 search returns StorageError::Backend(BackendError::UnsupportedCapability) + assert!( + result.is_err(), + "S3 SearchProvider stub must return an error, got: {result:?}" + ); +} + +/// After delete, reading the resource returns a Gone error (S3 marks it deleted). 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn s3_es_test_read_after_delete_returns_gone() { + if skip_if_disabled("s3_es_test_read_after_delete_returns_gone") { + return; + } + + let harness = make_harness("gone").await; + let tenant = tenant("s3-es-tenant"); + + let created = harness + .composite + .create( + &tenant, + "Patient", + json!({"resourceType": "Patient", "active": true}), + FhirVersion::default(), + ) + .await + .expect("create should succeed"); + + harness + .composite + .delete(&tenant, "Patient", created.id()) + .await + .expect("delete should succeed"); + + let result = harness + .composite + .read(&tenant, "Patient", created.id()) + .await; + + assert!( + matches!( + result, + Err(StorageError::Resource(ResourceError::Gone { .. })) + ), + "read after delete should return Gone, got: {result:?}" + ); +} + diff --git a/crates/rest/src/config.rs b/crates/rest/src/config.rs index 72e5564e..07b834ed 100644 --- a/crates/rest/src/config.rs +++ b/crates/rest/src/config.rs @@ -66,7 +66,7 @@ pub enum StorageBackendMode { /// AWS S3 object storage for CRUD, versioning, history, and bulk operations. /// Requires AWS credentials via the standard provider chain. No search support. S3, - /// AWS S3 for CRUD + Elasticsearch for search. + /// AWS S3 for CRUD/history + Elasticsearch for search. /// Requires AWS credentials and a running Elasticsearch instance. 
S3Elasticsearch, } From a57980d26c907cd317411fa4143b37841ab20191 Mon Sep 17 00:00:00 2001 From: aacruzgon Date: Sat, 7 Mar 2026 17:20:04 -0500 Subject: [PATCH 12/17] style: cargo fmt --- crates/hfs/src/main.rs | 1 - crates/persistence/tests/s3_es_tests.rs | 21 +++++++++++++++++---- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index b3737d21..783243a0 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -457,7 +457,6 @@ fn build_search_registry( registry } - /// Starts the server with S3 + Elasticsearch composite backend. #[cfg(all(feature = "s3", feature = "elasticsearch"))] async fn start_s3_elasticsearch(config: ServerConfig) -> anyhow::Result<()> { diff --git a/crates/persistence/tests/s3_es_tests.rs b/crates/persistence/tests/s3_es_tests.rs index a8934f92..d2dd6e6a 100644 --- a/crates/persistence/tests/s3_es_tests.rs +++ b/crates/persistence/tests/s3_es_tests.rs @@ -355,7 +355,13 @@ async fn s3_es_test_create_then_search() { !results.resources.items.is_empty(), "expected at least one result for family={family}" ); - assert!(results.resources.items.iter().any(|r| r.id() == created.id())); + assert!( + results + .resources + .items + .iter() + .any(|r| r.id() == created.id()) + ); } /// Update a Patient, then verify search returns updated fields. 
@@ -409,7 +415,11 @@ async fn s3_es_test_update_then_search() { .await .expect("search by new family should succeed"); assert!( - results.resources.items.iter().any(|r| r.id() == created.id()), + results + .resources + .items + .iter() + .any(|r| r.id() == created.id()), "updated resource should appear in search" ); } @@ -461,7 +471,11 @@ async fn s3_es_test_delete_then_search() { .await .expect("search after delete should not error"); assert!( - results.resources.items.iter().all(|r| r.id() != created.id()), + results + .resources + .items + .iter() + .all(|r| r.id() != created.id()), "deleted resource should not appear in search" ); } @@ -757,4 +771,3 @@ async fn s3_es_test_read_after_delete_returns_gone() { "read after delete should return Gone, got: {result:?}" ); } - From 607634baab4093f93ae2653605483fe908fd5611 Mon Sep 17 00:00:00 2001 From: aacruzgon Date: Mon, 9 Mar 2026 16:49:12 -0400 Subject: [PATCH 13/17] increase unit test coverage for composite storage, rest config, and hfs main --- crates/hfs/src/main.rs | 85 +++ crates/persistence/src/composite/storage.rs | 582 +++++++++++++++++++- crates/rest/src/config.rs | 210 +++++++ 3 files changed, 876 insertions(+), 1 deletion(-) diff --git a/crates/hfs/src/main.rs b/crates/hfs/src/main.rs index 783243a0..a5bff953 100644 --- a/crates/hfs/src/main.rs +++ b/crates/hfs/src/main.rs @@ -598,3 +598,88 @@ async fn start_s3_elasticsearch(_config: ServerConfig) -> anyhow::Result<()> { feature = "s3" )))] compile_error!("At least one database backend feature must be enabled"); + +#[cfg(test)] +mod tests { + use super::*; + use helios_rest::ServerConfig; + + // ── create_sqlite_backend() ─────────────────────────────────── + + #[cfg(feature = "sqlite")] + #[test] + fn test_create_sqlite_backend_memory() { + let config = ServerConfig { + database_url: Some(":memory:".to_string()), + ..Default::default() + }; + let result = create_sqlite_backend(&config); + assert!(result.is_ok(), "in-memory SQLite backend should 
succeed"); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_create_sqlite_backend_temp_file() { + let dir = std::env::temp_dir(); + let path = dir.join(format!( + "hfs_test_{}.db", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .subsec_nanos() + )); + let config = ServerConfig { + database_url: Some(path.to_string_lossy().to_string()), + ..Default::default() + }; + let result = create_sqlite_backend(&config); + // Clean up the file even if the assertion fails. + let _ = std::fs::remove_file(&path); + assert!(result.is_ok(), "file-based SQLite backend should succeed"); + } + + #[cfg(feature = "sqlite")] + #[test] + fn test_create_sqlite_backend_default_url() { + // When database_url is None, defaults to "fhir.db" in the current dir. + // We redirect to a temp path to avoid creating persistent side-effects. + let dir = std::env::temp_dir(); + let path = dir.join(format!( + "hfs_test_default_{}.db", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .subsec_nanos() + )); + // Override the default path via the env var that ServerConfig reads. + let config = ServerConfig { + database_url: Some(path.to_string_lossy().to_string()), + ..Default::default() + }; + let result = create_sqlite_backend(&config); + let _ = std::fs::remove_file(&path); + assert!(result.is_ok()); + } + + // ── build_search_registry() ─────────────────────────────────── + + #[cfg(feature = "elasticsearch")] + #[test] + fn test_build_search_registry_returns_registry() { + use helios_fhir::FhirVersion; + let registry = build_search_registry(FhirVersion::R4, None); + // Registry should be a valid Arc> and not panic when read. 
+ let _guard = registry.read(); + } + + // ── ServerConfig validation is exercised at startup ────────── + + #[test] + fn test_server_config_default_is_valid() { + let config = ServerConfig::default(); + assert!( + config.validate().is_ok(), + "default ServerConfig should be valid" + ); + } +} diff --git a/crates/persistence/src/composite/storage.rs b/crates/persistence/src/composite/storage.rs index d208e274..689b737d 100644 --- a/crates/persistence/src/composite/storage.rs +++ b/crates/persistence/src/composite/storage.rs @@ -1606,7 +1606,122 @@ impl CapabilityProvider for CompositeStorage { #[cfg(test)] mod tests { use super::*; - use crate::core::BackendKind; + use crate::core::{BackendKind, CapabilityProvider}; + use crate::tenant::{TenantContext, TenantId, TenantPermissions}; + + /// Minimal mock storage for unit testing CompositeStorage. + struct MockStorage; + + #[async_trait::async_trait] + impl ResourceStorage for MockStorage { + fn backend_name(&self) -> &'static str { + "mock" + } + + async fn create( + &self, + tenant: &TenantContext, + resource_type: &str, + resource: Value, + fhir_version: FhirVersion, + ) -> StorageResult { + let id = uuid::Uuid::new_v4().to_string(); + Ok(StoredResource::new( + resource_type, + &id, + tenant.tenant_id().clone(), + resource, + fhir_version, + )) + } + + async fn create_or_update( + &self, + tenant: &TenantContext, + resource_type: &str, + id: &str, + resource: Value, + fhir_version: FhirVersion, + ) -> StorageResult<(StoredResource, bool)> { + Ok(( + StoredResource::new( + resource_type, + id, + tenant.tenant_id().clone(), + resource, + fhir_version, + ), + true, + )) + } + + async fn read( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + ) -> StorageResult> { + Ok(None) + } + + async fn update( + &self, + tenant: &TenantContext, + current: &StoredResource, + resource: Value, + ) -> StorageResult { + Ok(StoredResource::new( + current.resource_type(), + current.id(), + 
tenant.tenant_id().clone(), + resource, + current.fhir_version(), + )) + } + + async fn delete( + &self, + _tenant: &TenantContext, + _resource_type: &str, + _id: &str, + ) -> StorageResult<()> { + Ok(()) + } + + async fn count( + &self, + _tenant: &TenantContext, + _resource_type: Option<&str>, + ) -> StorageResult { + Ok(0) + } + } + + fn make_tenant() -> TenantContext { + TenantContext::new(TenantId::new("test"), TenantPermissions::full_access()) + } + + fn make_composite_no_secondary() -> CompositeStorage { + let config = CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .build() + .unwrap(); + let mut backends = HashMap::new(); + backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage); + CompositeStorage::new(config, backends).unwrap() + } + + fn make_composite_with_secondary() -> CompositeStorage { + let config = CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .search_backend("es", BackendKind::Elasticsearch) + .build() + .unwrap(); + let mut backends = HashMap::new(); + backends.insert("primary".to_string(), Arc::new(MockStorage) as DynStorage); + backends.insert("es".to_string(), Arc::new(MockStorage) as DynStorage); + CompositeStorage::new(config, backends).unwrap() + } fn test_config() -> CompositeConfig { CompositeConfig::builder() @@ -1616,18 +1731,483 @@ mod tests { .unwrap() } + // ── BackendHealth ────────────────────────────────────────────── + #[test] fn test_backend_health_default() { let health = BackendHealth::default(); assert!(health.healthy); assert_eq!(health.failure_count, 0); assert!(health.last_error.is_none()); + assert!(health.last_success.is_none()); + } + + #[test] + fn test_backend_health_clone() { + let health = BackendHealth { + healthy: false, + last_success: None, + failure_count: 5, + last_error: Some("timeout".to_string()), + }; + let cloned = health.clone(); + assert!(!cloned.healthy); + assert_eq!(cloned.failure_count, 5); + 
assert_eq!(cloned.last_error.as_deref(), Some("timeout")); } + // ── CompositeConfig ──────────────────────────────────────────── + #[test] fn test_composite_config() { let config = test_config(); assert_eq!(config.primary_id(), Some("sqlite")); assert_eq!(config.secondaries().count(), 1); } + + // ── CompositeStorage::new() ──────────────────────────────────── + + #[test] + fn test_new_success() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.backend_name(), "composite"); + } + + #[test] + fn test_new_missing_primary_backend_in_map() { + let config = CompositeConfig::builder() + .primary("primary", BackendKind::Sqlite) + .build() + .unwrap(); + // Deliberately omit the primary from the backends map + let backends: HashMap = HashMap::new(); + let result = CompositeStorage::new(config, backends); + assert!(result.is_err()); + } + + #[test] + fn test_new_with_secondary() { + let composite = make_composite_with_secondary(); + assert!(composite.secondary("es").is_some()); + assert!(composite.secondary("nonexistent").is_none()); + } + + // ── Accessors ───────────────────────────────────────────────── + + #[test] + fn test_config_accessor() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.config().primary_id(), Some("primary")); + } + + #[test] + fn test_primary_accessor() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.primary().backend_name(), "mock"); + } + + #[test] + fn test_secondaries_accessor() { + let composite = make_composite_with_secondary(); + assert_eq!(composite.secondaries().len(), 1); + assert!(composite.secondaries().contains_key("es")); + } + + #[test] + fn test_secondaries_empty_when_no_secondary() { + let composite = make_composite_no_secondary(); + assert!(composite.secondaries().is_empty()); + } + + #[test] + fn test_secondary_accessor_present() { + let composite = make_composite_with_secondary(); + assert!(composite.secondary("es").is_some()); + } + + #[test] + fn 
test_secondary_accessor_absent() { + let composite = make_composite_no_secondary(); + assert!(composite.secondary("missing").is_none()); + } + + // ── Health tracking ─────────────────────────────────────────── + + #[test] + fn test_backend_health_initially_healthy() { + let composite = make_composite_no_secondary(); + let health = composite.backend_health("primary").unwrap(); + assert!(health.healthy); + } + + #[test] + fn test_backend_health_missing_id_returns_none() { + let composite = make_composite_no_secondary(); + assert!(composite.backend_health("nonexistent").is_none()); + } + + #[test] + fn test_is_backend_healthy_true() { + let composite = make_composite_no_secondary(); + assert!(composite.is_backend_healthy("primary")); + } + + #[test] + fn test_is_backend_healthy_unknown_returns_false() { + let composite = make_composite_no_secondary(); + assert!(!composite.is_backend_healthy("nonexistent")); + } + + #[test] + fn test_update_health_success_resets_failures() { + let composite = make_composite_no_secondary(); + // First, record a few failures + composite.update_health("primary", false, Some("err1".to_string())); + composite.update_health("primary", false, Some("err2".to_string())); + let health = composite.backend_health("primary").unwrap(); + assert_eq!(health.failure_count, 2); + + // Now record a success — should reset + composite.update_health("primary", true, None); + let health = composite.backend_health("primary").unwrap(); + assert!(health.healthy); + assert_eq!(health.failure_count, 0); + assert!(health.last_error.is_none()); + assert!(health.last_success.is_some()); + } + + #[test] + fn test_update_health_failure_increments_count() { + let composite = make_composite_no_secondary(); + composite.update_health("primary", false, Some("timeout".to_string())); + let health = composite.backend_health("primary").unwrap(); + assert_eq!(health.failure_count, 1); + assert_eq!(health.last_error.as_deref(), Some("timeout")); + } + + #[test] + fn 
test_update_health_marks_unhealthy_after_threshold() { + let composite = make_composite_no_secondary(); + // Default failure_threshold is 3 + let threshold = composite.config.health_config.failure_threshold; + for i in 0..threshold { + composite.update_health("primary", false, Some(format!("error {}", i))); + } + let health = composite.backend_health("primary").unwrap(); + assert!(!health.healthy); + assert_eq!(health.failure_count, threshold); + } + + #[test] + fn test_update_health_ignores_unknown_backend() { + let composite = make_composite_no_secondary(); + // Should not panic on unknown backend ID + composite.update_health("nonexistent", false, Some("err".to_string())); + } + + // ── sync_to_secondaries when sync_manager is None ───────────── + + #[tokio::test] + async fn test_sync_to_secondaries_no_sync_manager_returns_ok() { + let composite = make_composite_no_secondary(); + // No sync manager (no secondaries) → should return Ok immediately + let result = composite + .sync_to_secondaries(super::super::sync::SyncEvent::Delete { + resource_type: "Patient".to_string(), + resource_id: "1".to_string(), + tenant_id: TenantId::new("test"), + }) + .await; + assert!(result.is_ok()); + } + + // ── routing_error_to_storage_error ──────────────────────────── + + #[test] + fn test_routing_error_no_primary_backend() { + let composite = make_composite_no_secondary(); + let err = composite.routing_error_to_storage_error(RoutingError::NoPrimaryBackend); + match err { + StorageError::Backend(BackendError::Unavailable { backend_name, .. 
}) => { + assert_eq!(backend_name, "primary"); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[test] + fn test_routing_error_no_capable_backend() { + use super::super::analyzer::QueryFeature; + let composite = make_composite_no_secondary(); + let err = composite.routing_error_to_storage_error(RoutingError::NoCapableBackend { + feature: QueryFeature::FullTextSearch, + }); + match err { + StorageError::Backend(BackendError::UnsupportedCapability { + backend_name, + capability, + }) => { + assert_eq!(backend_name, "composite"); + assert!(!capability.is_empty()); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[test] + fn test_routing_error_backend_unavailable() { + let composite = make_composite_no_secondary(); + let err = composite.routing_error_to_storage_error(RoutingError::BackendUnavailable { + backend_id: "my-backend".to_string(), + }); + match err { + StorageError::Backend(BackendError::ConnectionFailed { backend_name, .. }) => { + assert_eq!(backend_name, "my-backend"); + } + other => panic!("unexpected error: {:?}", other), + } + } + + // ── CapabilityProvider ──────────────────────────────────────── + + #[test] + fn test_capabilities_backend_name() { + let composite = make_composite_no_secondary(); + let caps = composite.capabilities(); + assert_eq!(caps.backend_name, "composite"); + } + + #[test] + fn test_backend_name_is_composite() { + let composite = make_composite_no_secondary(); + assert_eq!(composite.backend_name(), "composite"); + } + + // ── "No capability" error paths ─────────────────────────────── + + #[tokio::test] + async fn test_conditional_create_no_capability() { + use crate::core::ConditionalStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite + .conditional_create( + &tenant, + "Patient", + serde_json::json!({}), + "identifier=foo", + FhirVersion::default(), + ) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn 
test_versioned_storage_vread_no_capability() { + use crate::core::VersionedStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.vread(&tenant, "Patient", "1", "1").await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => { + assert!(capability.contains("VersionedStorage")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[tokio::test] + async fn test_instance_history_no_capability() { + use crate::core::InstanceHistoryProvider; + use crate::core::history::HistoryParams; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite + .history_instance(&tenant, "Patient", "1", &HistoryParams::default()) + .await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => { + assert!(capability.contains("InstanceHistoryProvider")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[tokio::test] + async fn test_bundle_provider_process_batch_no_capability() { + use crate::core::BundleProvider; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.process_batch(&tenant, vec![]).await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. 
}) => { + assert!(capability.contains("BundleProvider")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + #[tokio::test] + async fn test_bundle_provider_process_transaction_no_capability() { + use crate::core::BundleProvider; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.process_transaction(&tenant, vec![]).await; + assert!(result.is_err()); + } + + // ── search_count when no search provider ────────────────────── + + #[tokio::test] + async fn test_search_count_no_search_provider_returns_error() { + use crate::core::SearchProvider; + use crate::types::SearchQuery; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let query = SearchQuery::new("Patient"); + let result = composite.search_count(&tenant, &query).await; + assert!(result.is_err()); + match result.unwrap_err() { + StorageError::Backend(BackendError::UnsupportedCapability { capability, .. }) => { + assert!(capability.contains("search_count")); + } + other => panic!("unexpected error: {:?}", other), + } + } + + // ── with_search_providers ───────────────────────────────────── + + #[test] + fn test_with_search_providers() { + let composite = make_composite_no_secondary(); + let mut providers = HashMap::new(); + providers.insert( + "primary".to_string(), + Arc::new(MockStorage) as DynSearchProvider, + ); + let composite = composite.with_search_providers(providers); + // Should have one search provider now + assert!(composite.search_providers.contains_key("primary")); + } + + // ── extract_reference_values (static method) ────────────────── + + #[test] + fn test_extract_reference_values_object_with_reference() { + let obj = serde_json::json!({"reference": "Patient/123"}); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&obj, &mut refs); + assert_eq!(refs, vec!["Patient/123"]); + } + + #[test] + fn test_extract_reference_values_object_without_reference() { + let obj = 
serde_json::json!({"display": "John Smith"}); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&obj, &mut refs); + assert!(refs.is_empty()); + } + + #[test] + fn test_extract_reference_values_array() { + let arr = serde_json::json!([ + {"reference": "Patient/1"}, + {"reference": "Patient/2"}, + {"display": "no ref here"} + ]); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&arr, &mut refs); + assert_eq!(refs.len(), 2); + assert!(refs.contains(&"Patient/1".to_string())); + assert!(refs.contains(&"Patient/2".to_string())); + } + + #[test] + fn test_extract_reference_values_primitive_ignored() { + let val = serde_json::json!("just a string"); + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&val, &mut refs); + assert!(refs.is_empty()); + } + + #[test] + fn test_extract_reference_values_null_ignored() { + let val = serde_json::Value::Null; + let mut refs = Vec::new(); + CompositeStorage::extract_reference_values(&val, &mut refs); + assert!(refs.is_empty()); + } + + // ── CRUD delegation to primary ──────────────────────────────── + + #[tokio::test] + async fn test_create_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite + .create( + &tenant, + "Patient", + serde_json::json!({"resourceType": "Patient"}), + FhirVersion::default(), + ) + .await; + assert!(result.is_ok()); + let stored = result.unwrap(); + assert_eq!(stored.resource_type(), "Patient"); + } + + #[tokio::test] + async fn test_read_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.read(&tenant, "Patient", "1").await; + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); // MockStorage always returns None + } + + #[tokio::test] + async fn test_count_delegates_to_primary() { + use 
crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.count(&tenant, Some("Patient")).await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), 0); + } + + #[tokio::test] + async fn test_delete_delegates_to_primary() { + use crate::core::ResourceStorage; + let composite = make_composite_no_secondary(); + let tenant = make_tenant(); + let result = composite.delete(&tenant, "Patient", "1").await; + assert!(result.is_ok()); + } + + // SearchProvider impl for MockStorage (must live inside test module). + #[async_trait::async_trait] + impl SearchProvider for MockStorage { + async fn search( + &self, + _tenant: &TenantContext, + _query: &crate::types::SearchQuery, + ) -> StorageResult { + use crate::types::Page; + Ok(SearchResult::new(Page::empty())) + } + + async fn search_count( + &self, + _tenant: &TenantContext, + _query: &crate::types::SearchQuery, + ) -> StorageResult { + Ok(0) + } + } } diff --git a/crates/rest/src/config.rs b/crates/rest/src/config.rs index 07b834ed..01c153b0 100644 --- a/crates/rest/src/config.rs +++ b/crates/rest/src/config.rs @@ -696,4 +696,214 @@ mod tests { assert!(!config.strict_validation); assert_eq!(config.jwt_tenant_claim, "tenant_id"); } + + // ── validate() – max_body_size == 0 ─────────────────────────── + + #[test] + fn test_validate_max_body_size_zero() { + let config = ServerConfig { + max_body_size: 0, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert!(errors.iter().any(|e| e.contains("body size"))); + } + + // ── validate() – request_timeout == 0 ──────────────────────── + + #[test] + fn test_validate_request_timeout_zero() { + let config = ServerConfig { + request_timeout: 0, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert!(errors.iter().any(|e| e.contains("timeout"))); + } 
+ + // ── validate() – default_page_size == 0 ────────────────────── + + #[test] + fn test_validate_default_page_size_zero() { + let config = ServerConfig { + default_page_size: 0, + max_page_size: 100, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + assert!(errors.iter().any(|e| e.contains("page size"))); + } + + // ── validate() – multiple errors at once ───────────────────── + + #[test] + fn test_validate_multiple_errors() { + let config = ServerConfig { + max_body_size: 0, + request_timeout: 0, + default_page_size: 0, + ..Default::default() + }; + let result = config.validate(); + assert!(result.is_err()); + let errors = result.unwrap_err(); + // At least the three errors above should be present + assert!(errors.len() >= 3); + } + + // ── full_base_url() ─────────────────────────────────────────── + + #[test] + fn test_full_base_url() { + let config = ServerConfig { + base_url: "https://fhir.example.com".to_string(), + ..Default::default() + }; + assert_eq!(config.full_base_url(), "https://fhir.example.com"); + } + + #[test] + fn test_full_base_url_default() { + let config = ServerConfig::default(); + assert_eq!(config.full_base_url(), "http://localhost:8080"); + } + + // ── multitenancy() accessor ─────────────────────────────────── + + #[test] + fn test_multitenancy_accessor() { + let config = ServerConfig::default(); + let mt = config.multitenancy(); + assert_eq!(mt.routing_mode, TenantRoutingMode::HeaderOnly); + } + + // ── StorageBackendMode::default() ───────────────────────────── + + #[test] + fn test_storage_backend_mode_default() { + let mode = StorageBackendMode::default(); + assert_eq!(mode, StorageBackendMode::Sqlite); + } + + // ── TenantRoutingMode::default() ────────────────────────────── + + #[test] + fn test_tenant_routing_mode_default() { + let mode = TenantRoutingMode::default(); + assert_eq!(mode, TenantRoutingMode::HeaderOnly); + } + + // ── Alias parsing 
variations ────────────────────────────────── + + #[test] + fn test_tenant_routing_mode_aliases() { + // headeronly / header + assert_eq!( + "headeronly".parse::().unwrap(), + TenantRoutingMode::HeaderOnly + ); + assert_eq!( + "header".parse::().unwrap(), + TenantRoutingMode::HeaderOnly + ); + // urlpath / url / path + assert_eq!( + "urlpath".parse::().unwrap(), + TenantRoutingMode::UrlPath + ); + assert_eq!( + "url".parse::().unwrap(), + TenantRoutingMode::UrlPath + ); + assert_eq!( + "path".parse::().unwrap(), + TenantRoutingMode::UrlPath + ); + // combined + assert_eq!( + "combined".parse::().unwrap(), + TenantRoutingMode::Both + ); + } + + #[test] + fn test_tenant_routing_mode_invalid() { + assert!("unknown_mode".parse::().is_err()); + } + + // ── MultitenancyConfig struct field behaviour (without env) ── + + #[test] + fn test_multitenancy_config_strict_validation_field() { + // Test the struct directly, avoiding env-var parallelism issues. + let config = MultitenancyConfig { + routing_mode: TenantRoutingMode::UrlPath, + strict_validation: true, + jwt_tenant_claim: "custom_claim".to_string(), + }; + assert_eq!(config.routing_mode, TenantRoutingMode::UrlPath); + assert!(config.strict_validation); + assert_eq!(config.jwt_tenant_claim, "custom_claim"); + } + + #[test] + fn test_multitenancy_config_from_env_routing_mode_parsed() { + // Parse the routing mode value the same way from_env does, without + // touching global env state. + let result: Result = "url_path".parse(); + assert_eq!(result.unwrap(), TenantRoutingMode::UrlPath); + } + + #[test] + fn test_multitenancy_strict_validation_string_parsing() { + // Mirror the logic inside MultitenancyConfig::from_env. 
+ let parse_strict = |s: &str| -> bool { s.to_lowercase() == "true" || s == "1" }; + assert!(parse_strict("true")); + assert!(parse_strict("TRUE")); + assert!(parse_strict("1")); + assert!(!parse_strict("false")); + assert!(!parse_strict("0")); + assert!(!parse_strict("yes")); + } + + // ── storage_backend_mode() – invalid value ──────────────────── + + #[test] + fn test_storage_backend_mode_invalid_returns_error() { + let config = ServerConfig { + storage_backend: "unknown_backend".to_string(), + ..Default::default() + }; + assert!(config.storage_backend_mode().is_err()); + } + + // ── display for StorageBackendMode ──────────────────────────── + + #[test] + fn test_storage_backend_mode_display_all_variants() { + // Already partially tested, but ensure every variant round-trips + for (variant, expected) in [ + (StorageBackendMode::Sqlite, "sqlite"), + ( + StorageBackendMode::SqliteElasticsearch, + "sqlite-elasticsearch", + ), + (StorageBackendMode::Postgres, "postgres"), + ( + StorageBackendMode::PostgresElasticsearch, + "postgres-elasticsearch", + ), + (StorageBackendMode::S3, "s3"), + (StorageBackendMode::S3Elasticsearch, "s3-elasticsearch"), + ] { + assert_eq!(variant.to_string(), expected); + assert_eq!(expected.parse::().unwrap(), variant); + } + } } From 2121b2b2c161d024db72027ecb947e9b5faeada2 Mon Sep 17 00:00:00 2001 From: smunini Date: Wed, 11 Mar 2026 16:43:09 -0400 Subject: [PATCH 14/17] Added AWS_SESSION_TOKEN [skip ci] --- .github/workflows/inferno.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index 45cd6dca..be51c8d5 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -216,6 +216,7 @@ jobs: env: AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + AWS_SESSION_TOKEN: ${{ secrets.AWS_SESSION_TOKEN }} AWS_DEFAULT_REGION: ${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} run: | 
BUCKET="${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }}" @@ -250,6 +251,7 @@ jobs: HFS_ELASTICSEARCH_NODES=http://$DOCKER_HOST_IP:$ES_PORT \ AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ + AWS_SESSION_TOKEN=${{ secrets.AWS_SESSION_TOKEN }} \ AWS_REGION=${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & else From b7e22fa1e3845ae085b88859ef561a7138420e6e Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Wed, 11 Mar 2026 20:11:50 -0400 Subject: [PATCH 15/17] ci: reduce inferno max-parallel to 2 to prevent validator OOM [skip ci] --- .github/workflows/inferno.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index be51c8d5..cef14b35 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -52,7 +52,7 @@ jobs: runs-on: [self-hosted, Linux] strategy: fail-fast: false - max-parallel: 3 + max-parallel: 2 matrix: suite_id: [ From 75d956e5c6d4fee67244d2ab42c1adb8eb60f0df Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Wed, 11 Mar 2026 20:45:47 -0400 Subject: [PATCH 16/17] ci: replace static AWS keys with OIDC role assumption [skip ci] Removes AWS_ACCESS_KEY_ID/SECRET/SESSION_TOKEN secrets and uses aws-actions/configure-aws-credentials@v4 with OIDC to assume arn:aws:iam::438426605210:role/hfs-inferno-test for s3-elasticsearch jobs. 
--- .github/workflows/inferno.yml | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index cef14b35..0b4c71f3 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -50,6 +50,9 @@ jobs: name: Inferno US Core ${{ matrix.version_label }} Tests (${{ matrix.backend }}) needs: build runs-on: [self-hosted, Linux] + permissions: + id-token: write + contents: read strategy: fail-fast: false max-parallel: 2 @@ -197,6 +200,13 @@ jobs: docker logs $PG_CONTAINER exit 1 + - name: Configure AWS credentials + if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_ROLE_ARN || vars.AWS_ROLE_ARN }} + aws-region: ${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} + - name: Install AWS CLI if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' run: | @@ -213,11 +223,6 @@ jobs: - name: Empty S3 prefix if: matrix.backend == 's3-elasticsearch' && env.HFS_S3_BUCKET != '' - env: - AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} - AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} - AWS_SESSION_TOKEN: ${{ secrets.AWS_SESSION_TOKEN }} - AWS_DEFAULT_REGION: ${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} run: | BUCKET="${{ secrets.HFS_S3_BUCKET || vars.HFS_S3_BUCKET }}" PREFIX="ci/${{ matrix.suite_id }}/" @@ -249,10 +254,6 @@ jobs: HFS_S3_PREFIX=ci/${{ matrix.suite_id }}/ \ HFS_S3_VALIDATE_BUCKETS=false \ HFS_ELASTICSEARCH_NODES=http://$DOCKER_HOST_IP:$ES_PORT \ - AWS_ACCESS_KEY_ID=${{ secrets.AWS_ACCESS_KEY_ID }} \ - AWS_SECRET_ACCESS_KEY=${{ secrets.AWS_SECRET_ACCESS_KEY }} \ - AWS_SESSION_TOKEN=${{ secrets.AWS_SESSION_TOKEN }} \ - AWS_REGION=${{ secrets.AWS_REGION || vars.AWS_REGION || 'us-east-1' }} \ ./target/debug/hfs --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & else ./target/debug/hfs 
--database-url :memory: --log-level info --port $HFS_PORT --host 0.0.0.0 > "$HFS_LOG" 2>&1 & From c322f35013041ac21ad77379a43669401be64384 Mon Sep 17 00:00:00 2001 From: Steve Munini Date: Wed, 11 Mar 2026 21:21:44 -0400 Subject: [PATCH 17/17] ci: verify external TCP connectivity before marking PostgreSQL ready [skip ci] pg_isready checks inside the container can pass before the Docker port mapping is accepting external connections, causing HFS to fail on startup. --- .github/workflows/inferno.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/inferno.yml b/.github/workflows/inferno.yml index 0b4c71f3..6765cff4 100644 --- a/.github/workflows/inferno.yml +++ b/.github/workflows/inferno.yml @@ -189,9 +189,12 @@ jobs: for i in {1..30}; do if docker exec $PG_CONTAINER pg_isready -U helios > /dev/null 2>&1; then PG_PORT=$(docker port $PG_CONTAINER 5432 | head -1 | sed 's/.*://') - echo "PostgreSQL is ready on port $PG_PORT" - echo "PG_PORT=$PG_PORT" >> $GITHUB_ENV - exit 0 + # Verify external TCP connectivity to the mapped port, not just internal readiness + if timeout 2 bash -c "cat < /dev/null > /dev/tcp/$DOCKER_HOST_IP/$PG_PORT" 2>/dev/null; then + echo "PostgreSQL is ready on port $PG_PORT" + echo "PG_PORT=$PG_PORT" >> $GITHUB_ENV + exit 0 + fi fi echo "Attempt $i/30: PostgreSQL not ready yet..." sleep 2