diff --git a/.callstack.yml b/.callstack.yml new file mode 100644 index 000000000..5e25f11d6 --- /dev/null +++ b/.callstack.yml @@ -0,0 +1,39 @@ +pr_review: + # Default: true + auto_run: true + modules: + # Automatically create a description summarizing the changes in pull request. + description: + enabled: true + diagram: false + + # Find potential bugs in pull request changes or related files. + bug_hunter: + enabled: true + # Include fixes to possible bugs. + suggestions: true + + # Suggest improvements to added code. + code_suggestions: + enabled: true + + # Suggest changes to follow defined code conventions. + code_conventions: + enabled: false + # Describe your code conventions in plain text. + conventions: | + E.g. Exported variables, functions, classes and methods should be defined before private. + + + # Point out any typos or grammatical errors in variable names, texts, comments. + grammar: + enabled: false + + # Suggest performance improvements to added code. + performance: + enabled: true + + # Find potential security issues in added code. + security: + enabled: true + diff --git a/.github/workflows/callstack-reviewer.yml b/.github/workflows/callstack-reviewer.yml new file mode 100644 index 000000000..712bb6891 --- /dev/null +++ b/.github/workflows/callstack-reviewer.yml @@ -0,0 +1,29 @@ +name: Callstack.ai PR Review + +on: + workflow_dispatch: + inputs: + config: + type: string + description: "config for reviewer" + required: true + head: + type: string + description: "head commit sha" + required: true + base: + type: string + description: "base commit sha" + required: false + +jobs: + callstack_pr_review_job: + runs-on: ubuntu-latest + steps: + - name: Review PR + uses: callstackai/action@main + with: + config: ${{ inputs.config }} + head: ${{ inputs.head }} + export: /code/chats.json + diff --git a/cli/crates/federated-dev/src/dev/gateway_nanny.rs b/cli/crates/federated-dev/src/dev/gateway_nanny.rs index a5ae9a185..4d49e3624 100644 --- a/cli/crates/federated-dev/src/dev/gateway_nanny.rs +++ b/cli/crates/federated-dev/src/dev/gateway_nanny.rs @@ -108,7 +108,7 @@ pub struct CliRuntime { impl engine_v2::Runtime for CliRuntime { type Hooks = (); - type CacheFactory = (); + type OperationCacheFactory = (); fn fetcher(&self) -> &runtime::fetch::Fetcher { &self.fetcher @@ -130,7 +130,7 @@ impl engine_v2::Runtime for CliRuntime { &() } - fn cache_factory(&self) -> &() { + fn operation_cache_factory(&self) -> &() { &() } diff --git a/engine/crates/engine-v2/src/engine.rs b/engine/crates/engine-v2/src/engine.rs index 02f846a62..45ef724ab 100644 --- a/engine/crates/engine-v2/src/engine.rs +++ b/engine/crates/engine-v2/src/engine.rs @@ -1,7 +1,7 @@ use ::runtime::{ auth::AccessToken, hooks::Hooks, - hot_cache::{CachedDataKind, HotCache, HotCacheFactory}, + operation_cache::{OperationCache, OperationCacheFactory}, rate_limiting::RateLimitKey, }; use async_runtime::stream::StreamExt as _; @@ -27,7 +27,7 @@ use web_time::Instant; use crate::{ execution::{ExecutableOperation, PreExecutionContext}, - http_response::{HttpGraphqlResponse, HttpGraphqlResponseExtraMetadata}, + http_response::HttpGraphqlResponse, operation::{Operation, PreparedOperation, Variables}, response::{ErrorCode, GraphqlError, Response}, websocket, @@ -59,8 +59,7 @@ pub struct Engine { operation_metrics: GraphqlOperationMetrics, auth: AuthService, retry_budgets: RetryBudgets, - trusted_documents_cache: ::Cache, - operation_cache: ::Cache>, + operation_cache: ::Cache>, } impl Engine { @@ -90,8 +89,7 @@ impl Engine { auth, retry_budgets: RetryBudgets::build(&schema), operation_metrics: GraphqlOperationMetrics::build(runtime.meter()), - trusted_documents_cache: runtime.cache_factory().create(CachedDataKind::TrustedDocument).await, - operation_cache: runtime.cache_factory().create(CachedDataKind::Operation).await, + operation_cache: runtime.operation_cache_factory().create().await, schema, runtime, } @@ -107,14 +105,13 @@ impl Engine { let format = headers.typed_get::(); let request_context = match self.create_request_context(headers).await { Ok(context) => context, - Err(response) => return HttpGraphqlResponse::build(response, format, Default::default()), + Err(response) => return HttpGraphqlResponse::build(response, format), }; if let Err(err) = self.runtime.rate_limiter().limit(&RateLimitKey::Global).await { return HttpGraphqlResponse::build( Response::pre_execution_error(GraphqlError::new(err.to_string(), ErrorCode::RateLimited)), format, - Default::default(), ); } @@ -128,7 +125,6 @@ impl Engine { HttpGraphqlResponse::build( Response::execution_error(GraphqlError::new("Gateway timeout", ErrorCode::GatewayTimeout)), format, - Default::default(), ) } .boxed(), @@ -239,22 +235,11 @@ impl Engine { let (operation_metrics_attributes, response) = ctx.execute_single(request).await; let status = response.status(); - let mut response_metadata = HttpGraphqlResponseExtraMetadata { - operation_name: None, - operation_type: None, - has_errors: !status.is_success(), - }; - let elapsed = start.elapsed(); if let Some(operation_metrics_attributes) = operation_metrics_attributes { tracing::Span::current().record_gql_request((&operation_metrics_attributes).into()); - response_metadata - .operation_name - .clone_from(&operation_metrics_attributes.name); - response_metadata.operation_type = Some(operation_metrics_attributes.ty.as_str()); - self.operation_metrics.record( GraphqlRequestMetricsAttributes { operation: operation_metrics_attributes, @@ -279,7 +264,7 @@ impl Engine { tracing::debug!(target: GRAFBASE_TARGET, "{message}") } - HttpGraphqlResponse::build(response, None, response_metadata) + HttpGraphqlResponse::build(response, None) } .instrument(span) .await @@ -428,9 +413,9 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { if let Some(operation) = self.operation_cache.get(&cache_key).await { Ok(operation) - } else if let Some(persisted_query) = document_fut { - match persisted_query.await { - Ok(query) => Err((cache_key, Some(query))), + } else if let Some(document_fut) = document_fut { + match document_fut.await { + Ok(document) => Err((cache_key, Some(document))), Err(err) => return Err((None, Response::pre_execution_error(err))), } } else { @@ -440,8 +425,8 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { let operation = match result { Ok(operation) => operation, - Err((cache_key, query)) => { - if let Some(query) = query { + Err((cache_key, document)) => { + if let Some(query) = document { request.query = query } let operation = Operation::build(&self.schema, &request) diff --git a/engine/crates/engine-v2/src/engine/cache.rs b/engine/crates/engine-v2/src/engine/cache.rs index 1378b86ee..807cabea0 100644 --- a/engine/crates/engine-v2/src/engine/cache.rs +++ b/engine/crates/engine-v2/src/engine/cache.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD}; use engine::PersistedQueryRequestExtension; use schema::Schema; @@ -6,8 +8,6 @@ use super::SchemaVersion; mod namespaces { pub const OPERATION: &str = "op"; - pub const TRUSTED_DOCUMENT: &str = "tdoc"; - pub const APQ: &str = "apq"; } /// Unique cache key that generates a URL-safe string. @@ -18,18 +18,11 @@ pub(super) enum Key<'a> { schema_version: &'a SchemaVersion, document: Document<'a>, }, - TrustedDocument { - client_name: &'a str, - document_id: &'a str, - }, - Apq { - ext: &'a PersistedQueryRequestExtension, - }, } pub(super) enum Document<'a> { - PersistedQueryExt(&'a PersistedQueryRequestExtension), - Id(&'a str), + AutomaticallyPersistedQuery(&'a PersistedQueryRequestExtension), + TrustedDocumentId { client_name: &'a str, doc_id: Cow<'a, str> }, Text(&'a str), } @@ -56,21 +49,24 @@ impl std::fmt::Display for Key<'_> { // operation name. hasher.update(&[0x00]); match document { - Document::PersistedQueryExt(ext) => { + Document::AutomaticallyPersistedQuery(ext) => { hasher.update(b"apq"); hasher.update(&[0x00]); hasher.update(&ext.version.to_ne_bytes()); hasher.update(&ext.sha256_hash); } - Document::Id(doc_id) => { + Document::TrustedDocumentId { client_name, doc_id } => { hasher.update(b"docid"); hasher.update(&[0x00]); + hasher.update(&client_name.len().to_ne_bytes()); + hasher.update(client_name.as_bytes()); + hasher.update(&doc_id.len().to_ne_bytes()); hasher.update(doc_id.as_bytes()); } - Document::Text(query) => { - hasher.update(b"query"); + Document::Text(document) => { + hasher.update(b"doc"); hasher.update(&[0x00]); - hasher.update(query.as_bytes()); + hasher.update(document.as_bytes()); } } let hash = hasher.finalize(); @@ -81,23 +77,6 @@ impl std::fmt::Display for Key<'_> { Base64Display::new(hash.as_bytes(), &URL_SAFE_NO_PAD) )) } - Key::TrustedDocument { - client_name, - document_id, - } => f.write_fmt(format_args!( - "{}.{}.{}", - namespaces::TRUSTED_DOCUMENT, - Base64Display::new(client_name.as_bytes(), &URL_SAFE_NO_PAD), - Base64Display::new(document_id.as_bytes(), &URL_SAFE_NO_PAD) - )), - Key::Apq { - ext: PersistedQueryRequestExtension { version, sha256_hash }, - } => f.write_fmt(format_args!( - "{}.{}.{}", - namespaces::APQ, - version, - Base64Display::new(sha256_hash, &URL_SAFE_NO_PAD) - )), } } } diff --git a/engine/crates/engine-v2/src/engine/runtime.rs b/engine/crates/engine-v2/src/engine/runtime.rs index 70fd0c1ac..fb8ab6845 100644 --- a/engine/crates/engine-v2/src/engine/runtime.rs +++ b/engine/crates/engine-v2/src/engine/runtime.rs @@ -4,14 +4,14 @@ use runtime::{entity_cache::EntityCache, fetch::Fetcher, kv::KvStore, rate_limit pub trait Runtime: Send + Sync + 'static { type Hooks: runtime::hooks::Hooks; - type CacheFactory: runtime::hot_cache::HotCacheFactory; + type OperationCacheFactory: runtime::operation_cache::OperationCacheFactory; fn fetcher(&self) -> &Fetcher; fn kv(&self) -> &KvStore; fn trusted_documents(&self) -> &runtime::trusted_documents_client::Client; fn meter(&self) -> &Meter; fn hooks(&self) -> &Self::Hooks; - fn cache_factory(&self) -> &Self::CacheFactory; + fn operation_cache_factory(&self) -> &Self::OperationCacheFactory; fn rate_limiter(&self) -> &RateLimiter; fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()>; fn entity_cache(&self) -> &dyn EntityCache; diff --git a/engine/crates/engine-v2/src/engine/trusted_documents.rs b/engine/crates/engine-v2/src/engine/trusted_documents.rs index ef8cdfea7..15eef455d 100644 --- a/engine/crates/engine-v2/src/engine/trusted_documents.rs +++ b/engine/crates/engine-v2/src/engine/trusted_documents.rs @@ -8,17 +8,17 @@ use crate::{ use engine::{PersistedQueryRequestExtension, Request}; use futures::{future::BoxFuture, FutureExt}; use grafbase_telemetry::grafbase_client::X_GRAFBASE_CLIENT_NAME; -use runtime::{hot_cache::HotCache, trusted_documents_client::TrustedDocumentsError}; +use runtime::trusted_documents_client::TrustedDocumentsError; use std::borrow::Cow; use tracing::instrument; use super::cache::{Document, Key}; -type PersistedQueryFuture<'a> = BoxFuture<'a, Result>; +type DocumentFuture<'a> = BoxFuture<'a, Result>; pub(crate) struct PreparedOperationDocument<'a> { pub cache_key: String, - pub document_fut: Option>, + pub document_fut: Option>, } impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { @@ -65,23 +65,47 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { Err(graphql_error) } } - (true, Some(ext), _) => Ok(PreparedOperationDocument { - cache_key: Key::Operation { - name, - schema_version, - document: Document::PersistedQueryExt(ext), - } - .to_string(), - document_fut: Some(self.handle_apollo_client_style_trusted_document_query(ext, client_name)?), - }), - (true, _, Some(document_id)) => Ok(PreparedOperationDocument { + // Apollo Client style trusted document query + (true, maybe_ext, maybe_doc_id) => { + let Some(client_name) = client_name else { + return Err(GraphqlError::new( + format!( + "Trusted document queries must include the {} header", + X_GRAFBASE_CLIENT_NAME.as_str() + ), + ErrorCode::TrustedDocumentError, + )); + }; + + let doc_id = if let Some(ext) = maybe_ext { + Cow::Owned(hex::encode(&ext.sha256_hash)) + } else if let Some(doc_id) = maybe_doc_id { + doc_id.into() + } else { + unreachable!() + }; + + Ok(PreparedOperationDocument { + cache_key: Key::Operation { + name, + schema_version, + document: Document::TrustedDocumentId { + client_name, + doc_id: doc_id.clone(), + }, + } + .to_string(), + document_fut: Some(self.handle_trusted_document_query(client_name, doc_id)?), + }) + } + (false, Some(ext), _) => Ok(PreparedOperationDocument { cache_key: Key::Operation { name, schema_version, - document: Document::Id(document_id), + document: Document::AutomaticallyPersistedQuery(ext), } .to_string(), - document_fut: Some(self.handle_trusted_document_query(document_id.into(), client_name)?), + document_fut: self.handle_apq(request, ext)?, }), (false, None, _) => Ok(PreparedOperationDocument { cache_key: Key::Operation { @@ -92,74 +116,20 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { .to_string(), document_fut: None, }), - (false, Some(ext), _) => Ok(PreparedOperationDocument { - cache_key: Key::Operation { - name, - schema_version, - document: Document::PersistedQueryExt(ext), - } - .to_string(), - document_fut: self.handle_apq(request, ext)?, - }), } } - fn handle_apollo_client_style_trusted_document_query<'r, 'f>( - &self, - ext: &'r PersistedQueryRequestExtension, - client_name: Option<&'ctx str>, - ) -> Result, GraphqlError> - where - 'r: 'f, - 'ctx: 'f, - { - use std::fmt::Write; - - let document_id = { - let mut id = String::with_capacity(ext.sha256_hash.len() * 2); - - for byte in &ext.sha256_hash { - write!(id, "{byte:02x}").expect("write to String to succeed"); - } - - id - }; - - self.handle_trusted_document_query(document_id.into(), client_name) - } - fn handle_trusted_document_query<'r, 'f>( &self, + client_name: &'ctx str, document_id: Cow<'r, str>, - client_name: Option<&'ctx str>, - ) -> Result, GraphqlError> + ) -> Result, GraphqlError> where 'r: 'f, 'ctx: 'f, { - let Some(client_name) = client_name else { - return Err(GraphqlError::new( - format!( - "Trusted document queries must include the {} header", - X_GRAFBASE_CLIENT_NAME.as_str() - ), - ErrorCode::TrustedDocumentError, - )); - }; - let engine = self.engine; let fut = async move { - let key = Key::TrustedDocument { - client_name, - document_id: &document_id, - } - .to_string(); - - // First try fetching the document from cache. - if let Some(document_text) = engine.trusted_documents_cache.get(&key).await { - return Ok(document_text); - } - match engine .runtime .trusted_documents() @@ -174,10 +144,7 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { format!("Unknown document id: '{document_id}'"), ErrorCode::TrustedDocumentError, )), - Ok(document_text) => { - engine.trusted_documents_cache.insert(key, document_text.clone()).await; - Ok(document_text) - } + Ok(document_text) => Ok(document_text), } } .boxed(); @@ -185,11 +152,13 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { } /// Handle a request using Automatic Persisted Queries. + /// We don't cache anything here, we only rely on the operation cache. We might want to use an + /// external cache for this one day, but not another in-memory cache. fn handle_apq<'r, 'f>( &mut self, request: &'r Request, ext: &'r PersistedQueryRequestExtension, - ) -> Result>, GraphqlError> + ) -> Result>, GraphqlError> where 'r: 'f, 'ctx: 'f, @@ -201,8 +170,6 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { )); } - let key = Key::Apq { ext }.to_string(); - if !request.query().is_empty() { use sha2::{Digest, Sha256}; let digest = ::digest(request.query().as_bytes()).to_vec(); @@ -212,25 +179,14 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { ErrorCode::PersistedQueryError, )); } - self.push_background_future( - self.engine - .trusted_documents_cache - .insert(key, request.query().to_string()) - .boxed(), - ); return Ok(None); } - let engine = self.engine; let fut = async move { - if let Some(query) = engine.trusted_documents_cache.get(&key).await { - Ok(query) - } else { - Err(GraphqlError::new( - "Persisted query not found", - ErrorCode::PersistedQueryNotFound, - )) - } + Err(GraphqlError::new( + "Persisted query not found", + ErrorCode::PersistedQueryNotFound, + )) } .boxed(); diff --git a/engine/crates/engine-v2/src/http_response/mod.rs b/engine/crates/engine-v2/src/http_response/mod.rs index a0b70546e..d19667d4b 100644 --- a/engine/crates/engine-v2/src/http_response/mod.rs +++ b/engine/crates/engine-v2/src/http_response/mod.rs @@ -12,17 +12,6 @@ use crate::response::{ErrorCode, Response}; pub struct HttpGraphqlResponse { pub headers: http::HeaderMap, pub body: HttpGraphqlResponseBody, - // TODO: Used to propagate this metadata to headers for our current analytics on Cloudflare. - // It should not be relied upon otherwise, doesn't work well for batch requests and will - // be removed once we also use otel for the managed version. - pub metadata: HttpGraphqlResponseExtraMetadata, -} - -#[derive(Default)] -pub struct HttpGraphqlResponseExtraMetadata { - pub operation_name: Option, - pub operation_type: Option<&'static str>, - pub has_errors: bool, } pub enum HttpGraphqlResponseBody { @@ -72,12 +61,8 @@ impl HttpGraphqlResponse { ) } - pub(crate) fn build( - response: Response, - format: Option, - metadata: HttpGraphqlResponseExtraMetadata, - ) -> Self { - let mut http_response = if let Some(format) = format { + pub(crate) fn build(response: Response, format: Option) -> Self { + if let Some(format) = format { Self::from_stream( format, response.status(), @@ -85,9 +70,7 @@ impl HttpGraphqlResponse { ) } else { Self::from_json(response.status(), &response) - }; - http_response.metadata = metadata; - http_response + } } pub(crate) fn from_stream( @@ -102,7 +85,6 @@ impl HttpGraphqlResponse { headers.typed_insert(status); Self { headers, - metadata: HttpGraphqlResponseExtraMetadata::default(), body: HttpGraphqlResponseBody::Stream(stream.map_ok(|bytes| bytes.into()).boxed()), } } @@ -168,7 +150,6 @@ impl HttpGraphqlResponse { headers.typed_insert(headers::ContentLength(bytes.len() as u64)); HttpGraphqlResponse { headers, - metadata: HttpGraphqlResponseExtraMetadata::default(), body: HttpGraphqlResponseBody::Bytes(bytes), } } diff --git a/engine/crates/integration-tests/src/federation/builder/bench.rs b/engine/crates/integration-tests/src/federation/builder/bench.rs index 9961dca0b..1b789e264 100644 --- a/engine/crates/integration-tests/src/federation/builder/bench.rs +++ b/engine/crates/integration-tests/src/federation/builder/bench.rs @@ -16,7 +16,7 @@ use runtime::{ fetch::{FetchError, FetchRequest, FetchResponse, FetchResult, GraphqlRequest}, hooks::DynamicHooks, }; -use runtime_local::InMemoryHotCacheFactory; +use runtime_local::InMemoryOperationCacheFactory; use crate::federation::{GraphqlResponse, GraphqlStreamingResponse}; @@ -51,7 +51,7 @@ impl<'a> DeterministicEngineBuilder<'a> { } pub fn without_hot_cache(mut self) -> Self { - self.runtime.hot_cache_factory = InMemoryHotCacheFactory::inactive(); + self.runtime.hot_cache_factory = InMemoryOperationCacheFactory::inactive(); self } diff --git a/engine/crates/integration-tests/src/federation/builder/test_runtime.rs b/engine/crates/integration-tests/src/federation/builder/test_runtime.rs index c1169b2aa..ef79e46b4 100644 --- a/engine/crates/integration-tests/src/federation/builder/test_runtime.rs +++ b/engine/crates/integration-tests/src/federation/builder/test_runtime.rs @@ -1,8 +1,8 @@ use grafbase_telemetry::{metrics, otel::opentelemetry}; use runtime::{entity_cache::EntityCache, hooks::DynamicHooks, trusted_documents_client}; use runtime_local::{ - rate_limiting::in_memory::key_based::InMemoryRateLimiter, InMemoryEntityCache, InMemoryHotCacheFactory, - InMemoryKvStore, NativeFetcher, + rate_limiting::in_memory::key_based::InMemoryRateLimiter, InMemoryEntityCache, InMemoryKvStore, + InMemoryOperationCacheFactory, NativeFetcher, }; use runtime_noop::trusted_documents::NoopTrustedDocuments; use tokio::sync::watch; @@ -11,7 +11,7 @@ pub struct TestRuntime { pub fetcher: runtime::fetch::Fetcher, pub trusted_documents: trusted_documents_client::Client, pub kv: runtime::kv::KvStore, - pub hot_cache_factory: InMemoryHotCacheFactory, + pub hot_cache_factory: InMemoryOperationCacheFactory, pub meter: opentelemetry::metrics::Meter, pub hooks: DynamicHooks, pub rate_limiter: runtime::rate_limiting::RateLimiter, @@ -37,7 +37,7 @@ impl Default for TestRuntime { impl engine_v2::Runtime for TestRuntime { type Hooks = DynamicHooks; - type CacheFactory = InMemoryHotCacheFactory; + type OperationCacheFactory = InMemoryOperationCacheFactory; fn fetcher(&self) -> &runtime::fetch::Fetcher { &self.fetcher @@ -59,7 +59,7 @@ impl engine_v2::Runtime for TestRuntime { &self.hooks } - fn cache_factory(&self) -> &Self::CacheFactory { + fn operation_cache_factory(&self) -> &Self::OperationCacheFactory { &self.hot_cache_factory } diff --git a/engine/crates/runtime-local/src/hot_cache.rs b/engine/crates/runtime-local/src/hot_cache.rs deleted file mode 100644 index 601e2b002..000000000 --- a/engine/crates/runtime-local/src/hot_cache.rs +++ /dev/null @@ -1,66 +0,0 @@ -use runtime::hot_cache::{CachedDataKind, HotCache, HotCacheFactory}; - -pub struct InMemoryHotCacheConfig { - pub limit: usize, -} - -pub struct InMemoryHotCacheFactory { - pub trusted_documents_config: InMemoryHotCacheConfig, - pub operation_config: InMemoryHotCacheConfig, -} - -impl InMemoryHotCacheFactory { - pub fn inactive() -> Self { - InMemoryHotCacheFactory { - trusted_documents_config: InMemoryHotCacheConfig { limit: 0 }, - operation_config: InMemoryHotCacheConfig { limit: 0 }, - } - } -} - -impl Default for InMemoryHotCacheFactory { - fn default() -> Self { - InMemoryHotCacheFactory { - trusted_documents_config: InMemoryHotCacheConfig { limit: 100 }, - operation_config: InMemoryHotCacheConfig { limit: 1000 }, - } - } -} - -impl HotCacheFactory for InMemoryHotCacheFactory { - type Cache = InMemoryHotCache - where - V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; - - async fn create(&self, kind: CachedDataKind) -> Self::Cache - where - V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, - { - let config = match kind { - CachedDataKind::TrustedDocument => &self.trusted_documents_config, - CachedDataKind::Operation => &self.operation_config, - }; - InMemoryHotCache { - inner: mini_moka::sync::Cache::builder() - .max_capacity(config.limit as u64) - .build(), - } - } -} - -pub struct InMemoryHotCache { - inner: mini_moka::sync::Cache, -} - -impl HotCache for InMemoryHotCache -where - V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, -{ - async fn insert(&self, key: String, value: V) { - self.inner.insert(key, value); - } - - async fn get(&self, key: &String) -> Option { - self.inner.get(key) - } -} diff --git a/engine/crates/runtime-local/src/lib.rs b/engine/crates/runtime-local/src/lib.rs index 2f9d4f27d..91ba1b8aa 100644 --- a/engine/crates/runtime-local/src/lib.rs +++ b/engine/crates/runtime-local/src/lib.rs @@ -4,9 +4,9 @@ mod entity_cache; mod fetch; #[cfg(feature = "wasi")] mod hooks; -mod hot_cache; mod kv; mod log; +mod operation_cache; mod pg; pub mod rate_limiting; #[cfg(feature = "redis")] @@ -19,8 +19,8 @@ pub use entity_cache::memory::InMemoryEntityCache; #[cfg(feature = "redis")] pub use entity_cache::redis::RedisEntityCache; pub use fetch::NativeFetcher; -pub use hot_cache::{InMemoryHotCache, InMemoryHotCacheFactory}; pub use kv::*; +pub use operation_cache::{InMemoryOperationCache, InMemoryOperationCacheFactory}; pub use pg::{LazyPgConnectionsPool, LocalPgTransportFactory}; pub use ufd_invoker::UdfInvokerImpl; diff --git a/engine/crates/runtime-local/src/operation_cache.rs b/engine/crates/runtime-local/src/operation_cache.rs new file mode 100644 index 000000000..3d0c5f98e --- /dev/null +++ b/engine/crates/runtime-local/src/operation_cache.rs @@ -0,0 +1,59 @@ +use runtime::operation_cache::{OperationCache, OperationCacheFactory}; + +pub struct InMemoryOperationCacheConfig { + pub limit: usize, +} + +pub struct InMemoryOperationCacheFactory { + pub config: InMemoryOperationCacheConfig, +} + +impl InMemoryOperationCacheFactory { + pub fn inactive() -> Self { + InMemoryOperationCacheFactory { + config: InMemoryOperationCacheConfig { limit: 0 }, + } + } +} + +impl Default for InMemoryOperationCacheFactory { + fn default() -> Self { + InMemoryOperationCacheFactory { + config: InMemoryOperationCacheConfig { limit: 1000 }, + } + } +} + +impl OperationCacheFactory for InMemoryOperationCacheFactory { + type Cache = InMemoryOperationCache + where + V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; + + async fn create(&self) -> Self::Cache + where + V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, + { + InMemoryOperationCache { + inner: mini_moka::sync::Cache::builder() + .max_capacity(self.config.limit as u64) + .build(), + } + } +} + +pub struct InMemoryOperationCache { + inner: mini_moka::sync::Cache, +} + +impl OperationCache for InMemoryOperationCache +where + V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, +{ + async fn insert(&self, key: String, value: V) { + self.inner.insert(key, value); + } + + async fn get(&self, key: &String) -> Option { + self.inner.get(key) + } +} diff --git a/engine/crates/runtime/src/lib.rs b/engine/crates/runtime/src/lib.rs index ef94c2125..67eb7aa8f 100644 --- a/engine/crates/runtime/src/lib.rs +++ b/engine/crates/runtime/src/lib.rs @@ -9,9 +9,9 @@ pub mod entity_cache; pub mod error; pub mod fetch; pub mod hooks; -pub mod hot_cache; pub mod kv; pub mod log; +pub mod operation_cache; pub mod pg; pub mod rate_limiting; pub mod trusted_documents_client; diff --git a/engine/crates/runtime/src/hot_cache.rs b/engine/crates/runtime/src/operation_cache.rs similarity index 77% rename from engine/crates/runtime/src/hot_cache.rs rename to engine/crates/runtime/src/operation_cache.rs index da2f0108f..b771ee246 100644 --- a/engine/crates/runtime/src/hot_cache.rs +++ b/engine/crates/runtime/src/operation_cache.rs @@ -1,19 +1,13 @@ use std::future::Future; -#[derive(strum::Display)] -pub enum CachedDataKind { - TrustedDocument, - Operation, -} - -pub trait HotCacheFactory: Send + Sync + 'static { - type Cache: HotCache +pub trait OperationCacheFactory: Send + Sync + 'static { + type Cache: OperationCache where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; /// A new instance provides a convenient interface on how values are handled. Keys /// still live in the same namespace and MUST be unique. - fn create(&self, kind: CachedDataKind) -> impl Future> + Send + fn create(&self) -> impl Future> + Send where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; } @@ -26,9 +20,8 @@ pub trait HotCacheFactory: Send + Sync + 'static { /// - values are immutable for a given key /// - values are serialize-able with postcard /// - keys are URL-safe strings: ALPHA DIGIT "-" / "." / "_" / "~" -/// - keys will be unique across all instances of HotCache /// -pub trait HotCache: Send + Sync + 'static +pub trait OperationCache: Send + Sync + 'static where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, { @@ -41,19 +34,19 @@ where // ---------------------------// // -- No-op implementation -- // // ---------------------------// -impl HotCacheFactory for () { +impl OperationCacheFactory for () { type Cache = () where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; - async fn create(&self, _: CachedDataKind) -> Self::Cache + async fn create(&self) -> Self::Cache where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, { } } -impl HotCache for () +impl OperationCache for () where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, { diff --git a/gateway/crates/federated-server/src/server/gateway.rs b/gateway/crates/federated-server/src/server/gateway.rs index e186e892b..aeb522a67 100644 --- a/gateway/crates/federated-server/src/server/gateway.rs +++ b/gateway/crates/federated-server/src/server/gateway.rs @@ -148,7 +148,7 @@ pub struct GatewayRuntime { impl engine_v2::Runtime for GatewayRuntime { type Hooks = HooksWasi; - type CacheFactory = (); + type OperationCacheFactory = (); fn fetcher(&self) -> &runtime::fetch::Fetcher { &self.fetcher @@ -165,7 +165,7 @@ impl engine_v2::Runtime for GatewayRuntime { fn hooks(&self) -> &HooksWasi { &self.hooks } - fn cache_factory(&self) -> &Self::CacheFactory { + fn operation_cache_factory(&self) -> &Self::OperationCacheFactory { &() }