From f8e5500aae5be74900927474da1890b5778d319c Mon Sep 17 00:00:00 2001 From: Benjamin Rabier Date: Tue, 6 Aug 2024 15:21:24 +0200 Subject: [PATCH 1/4] chore(engine-v2): Remove now useless response metadata & simplify caching Now that operation checks rely on ClickHouse, no need for response metadata anymore. Also removed the trusted document cache, the operation cache already fills that role and having two in-memory caches means twice the monitoring. --- .../federated-dev/src/dev/gateway_nanny.rs | 4 +- engine/crates/engine-v2/src/engine.rs | 37 ++--- engine/crates/engine-v2/src/engine/cache.rs | 45 ++---- engine/crates/engine-v2/src/engine/runtime.rs | 4 +- .../engine-v2/src/engine/trusted_documents.rs | 142 ++++++------------ .../crates/engine-v2/src/http_response/mod.rs | 25 +-- .../src/federation/builder/bench.rs | 4 +- .../src/federation/builder/test_runtime.rs | 10 +- engine/crates/runtime-local/src/hot_cache.rs | 66 -------- engine/crates/runtime-local/src/lib.rs | 4 +- .../runtime-local/src/operation_cache.rs | 59 ++++++++ engine/crates/runtime/src/lib.rs | 2 +- .../src/{hot_cache.rs => operation_cache.rs} | 21 +-- .../federated-server/src/server/gateway.rs | 4 +- 14 files changed, 157 insertions(+), 270 deletions(-) delete mode 100644 engine/crates/runtime-local/src/hot_cache.rs create mode 100644 engine/crates/runtime-local/src/operation_cache.rs rename engine/crates/runtime/src/{hot_cache.rs => operation_cache.rs} (77%) diff --git a/cli/crates/federated-dev/src/dev/gateway_nanny.rs b/cli/crates/federated-dev/src/dev/gateway_nanny.rs index a5ae9a185..4d49e3624 100644 --- a/cli/crates/federated-dev/src/dev/gateway_nanny.rs +++ b/cli/crates/federated-dev/src/dev/gateway_nanny.rs @@ -108,7 +108,7 @@ pub struct CliRuntime { impl engine_v2::Runtime for CliRuntime { type Hooks = (); - type CacheFactory = (); + type OperationCacheFactory = (); fn fetcher(&self) -> &runtime::fetch::Fetcher { &self.fetcher @@ -130,7 +130,7 @@ impl engine_v2::Runtime for CliRuntime { &() } - fn cache_factory(&self) -> &() { + fn operation_cache_factory(&self) -> &() { &() } diff --git a/engine/crates/engine-v2/src/engine.rs b/engine/crates/engine-v2/src/engine.rs index 02f846a62..45ef724ab 100644 --- a/engine/crates/engine-v2/src/engine.rs +++ b/engine/crates/engine-v2/src/engine.rs @@ -1,7 +1,7 @@ use ::runtime::{ auth::AccessToken, hooks::Hooks, - hot_cache::{CachedDataKind, HotCache, HotCacheFactory}, + operation_cache::{OperationCache, OperationCacheFactory}, rate_limiting::RateLimitKey, }; use async_runtime::stream::StreamExt as _; @@ -27,7 +27,7 @@ use web_time::Instant; use crate::{ execution::{ExecutableOperation, PreExecutionContext}, - http_response::{HttpGraphqlResponse, HttpGraphqlResponseExtraMetadata}, + http_response::HttpGraphqlResponse, operation::{Operation, PreparedOperation, Variables}, response::{ErrorCode, GraphqlError, Response}, websocket, @@ -59,8 +59,7 @@ pub struct Engine { operation_metrics: GraphqlOperationMetrics, auth: AuthService, retry_budgets: RetryBudgets, - trusted_documents_cache: ::Cache, - operation_cache: ::Cache>, + operation_cache: ::Cache>, } impl Engine { @@ -90,8 +89,7 @@ impl Engine { auth, retry_budgets: RetryBudgets::build(&schema), operation_metrics: GraphqlOperationMetrics::build(runtime.meter()), - trusted_documents_cache: runtime.cache_factory().create(CachedDataKind::TrustedDocument).await, - operation_cache: runtime.cache_factory().create(CachedDataKind::Operation).await, + operation_cache: runtime.operation_cache_factory().create().await, schema, runtime, } @@ -107,14 +105,13 @@ impl Engine { let format = headers.typed_get::(); let request_context = match self.create_request_context(headers).await { Ok(context) => context, - Err(response) => return HttpGraphqlResponse::build(response, format, Default::default()), + Err(response) => return HttpGraphqlResponse::build(response, format), }; if let Err(err) = self.runtime.rate_limiter().limit(&RateLimitKey::Global).await { return HttpGraphqlResponse::build( Response::pre_execution_error(GraphqlError::new(err.to_string(), ErrorCode::RateLimited)), format, - Default::default(), ); } @@ -128,7 +125,6 @@ impl Engine { HttpGraphqlResponse::build( Response::execution_error(GraphqlError::new("Gateway timeout", ErrorCode::GatewayTimeout)), format, - Default::default(), ) } .boxed(), @@ -239,22 +235,11 @@ impl Engine { let (operation_metrics_attributes, response) = ctx.execute_single(request).await; let status = response.status(); - let mut response_metadata = HttpGraphqlResponseExtraMetadata { - operation_name: None, - operation_type: None, - has_errors: !status.is_success(), - }; - let elapsed = start.elapsed(); if let Some(operation_metrics_attributes) = operation_metrics_attributes { tracing::Span::current().record_gql_request((&operation_metrics_attributes).into()); - response_metadata - .operation_name - .clone_from(&operation_metrics_attributes.name); - response_metadata.operation_type = Some(operation_metrics_attributes.ty.as_str()); - self.operation_metrics.record( GraphqlRequestMetricsAttributes { operation: operation_metrics_attributes, @@ -279,7 +264,7 @@ impl Engine { tracing::debug!(target: GRAFBASE_TARGET, "{message}") } - HttpGraphqlResponse::build(response, None, response_metadata) + HttpGraphqlResponse::build(response, None) } .instrument(span) .await @@ -428,9 +413,9 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { if let Some(operation) = self.operation_cache.get(&cache_key).await { Ok(operation) - } else if let Some(persisted_query) = document_fut { - match persisted_query.await { - Ok(query) => Err((cache_key, Some(query))), + } else if let Some(document_fut) = document_fut { + match document_fut.await { + Ok(document) => Err((cache_key, Some(document))), Err(err) => return Err((None, Response::pre_execution_error(err))), } } else { @@ -440,8 +425,8 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { let operation = match result { Ok(operation) => operation, - Err((cache_key, query)) => { - if let Some(query) = query { + Err((cache_key, document)) => { + if let Some(query) = document { request.query = query } let operation = Operation::build(&self.schema, &request) diff --git a/engine/crates/engine-v2/src/engine/cache.rs b/engine/crates/engine-v2/src/engine/cache.rs index 1378b86ee..807cabea0 100644 --- a/engine/crates/engine-v2/src/engine/cache.rs +++ b/engine/crates/engine-v2/src/engine/cache.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD}; use engine::PersistedQueryRequestExtension; use schema::Schema; @@ -6,8 +8,6 @@ use super::SchemaVersion; mod namespaces { pub const OPERATION: &str = "op"; - pub const TRUSTED_DOCUMENT: &str = "tdoc"; - pub const APQ: &str = "apq"; } /// Unique cache key that generates a URL-safe string. @@ -18,18 +18,11 @@ pub(super) enum Key<'a> { schema_version: &'a SchemaVersion, document: Document<'a>, }, - TrustedDocument { - client_name: &'a str, - document_id: &'a str, - }, - Apq { - ext: &'a PersistedQueryRequestExtension, - }, } pub(super) enum Document<'a> { - PersistedQueryExt(&'a PersistedQueryRequestExtension), - Id(&'a str), + AutomaticallyPersistedQuery(&'a PersistedQueryRequestExtension), + TrustedDocumentId { client_name: &'a str, doc_id: Cow<'a, str> }, Text(&'a str), } @@ -56,21 +49,24 @@ impl std::fmt::Display for Key<'_> { // operation name. hasher.update(&[0x00]); match document { - Document::PersistedQueryExt(ext) => { + Document::AutomaticallyPersistedQuery(ext) => { hasher.update(b"apq"); hasher.update(&[0x00]); hasher.update(&ext.version.to_ne_bytes()); hasher.update(&ext.sha256_hash); } - Document::Id(doc_id) => { + Document::TrustedDocumentId { client_name, doc_id } => { hasher.update(b"docid"); hasher.update(&[0x00]); + hasher.update(&client_name.len().to_ne_bytes()); + hasher.update(client_name.as_bytes()); + hasher.update(&doc_id.len().to_ne_bytes()); hasher.update(doc_id.as_bytes()); } - Document::Text(query) => { - hasher.update(b"query"); + Document::Text(document) => { + hasher.update(b"doc"); hasher.update(&[0x00]); - hasher.update(query.as_bytes()); + hasher.update(document.as_bytes()); } } let hash = hasher.finalize(); @@ -81,23 +77,6 @@ impl std::fmt::Display for Key<'_> { Base64Display::new(hash.as_bytes(), &URL_SAFE_NO_PAD) )) } - Key::TrustedDocument { - client_name, - document_id, - } => f.write_fmt(format_args!( - "{}.{}.{}", - namespaces::TRUSTED_DOCUMENT, - Base64Display::new(client_name.as_bytes(), &URL_SAFE_NO_PAD), - Base64Display::new(document_id.as_bytes(), &URL_SAFE_NO_PAD) - )), - Key::Apq { - ext: PersistedQueryRequestExtension { version, sha256_hash }, - } => f.write_fmt(format_args!( - "{}.{}.{}", - namespaces::APQ, - version, - Base64Display::new(sha256_hash, &URL_SAFE_NO_PAD) - )), } } } diff --git a/engine/crates/engine-v2/src/engine/runtime.rs b/engine/crates/engine-v2/src/engine/runtime.rs index 70fd0c1ac..fb8ab6845 100644 --- a/engine/crates/engine-v2/src/engine/runtime.rs +++ b/engine/crates/engine-v2/src/engine/runtime.rs @@ -4,14 +4,14 @@ use runtime::{entity_cache::EntityCache, fetch::Fetcher, kv::KvStore, rate_limit pub trait Runtime: Send + Sync + 'static { type Hooks: runtime::hooks::Hooks; - type CacheFactory: runtime::hot_cache::HotCacheFactory; + type OperationCacheFactory: runtime::operation_cache::OperationCacheFactory; fn fetcher(&self) -> &Fetcher; fn kv(&self) -> &KvStore; fn trusted_documents(&self) -> &runtime::trusted_documents_client::Client; fn meter(&self) -> &Meter; fn hooks(&self) -> &Self::Hooks; - fn cache_factory(&self) -> &Self::CacheFactory; + fn operation_cache_factory(&self) -> &Self::OperationCacheFactory; fn rate_limiter(&self) -> &RateLimiter; fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()>; fn entity_cache(&self) -> &dyn EntityCache; diff --git a/engine/crates/engine-v2/src/engine/trusted_documents.rs b/engine/crates/engine-v2/src/engine/trusted_documents.rs index ef8cdfea7..15eef455d 100644 --- a/engine/crates/engine-v2/src/engine/trusted_documents.rs +++ b/engine/crates/engine-v2/src/engine/trusted_documents.rs @@ -8,17 +8,17 @@ use crate::{ use engine::{PersistedQueryRequestExtension, Request}; use futures::{future::BoxFuture, FutureExt}; use grafbase_telemetry::grafbase_client::X_GRAFBASE_CLIENT_NAME; -use runtime::{hot_cache::HotCache, trusted_documents_client::TrustedDocumentsError}; +use runtime::trusted_documents_client::TrustedDocumentsError; use std::borrow::Cow; use tracing::instrument; use super::cache::{Document, Key}; -type PersistedQueryFuture<'a> = BoxFuture<'a, Result>; +type DocumentFuture<'a> = BoxFuture<'a, Result>; pub(crate) struct PreparedOperationDocument<'a> { pub cache_key: String, - pub document_fut: Option>, + pub document_fut: Option>, } impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { @@ -65,23 +65,47 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { Err(graphql_error) } } - (true, Some(ext), _) => Ok(PreparedOperationDocument { - cache_key: Key::Operation { - name, - schema_version, - document: Document::PersistedQueryExt(ext), - } - .to_string(), - document_fut: Some(self.handle_apollo_client_style_trusted_document_query(ext, client_name)?), - }), - (true, _, Some(document_id)) => Ok(PreparedOperationDocument { + // Apollo Client style trusted document query + (true, maybe_ext, maybe_doc_id) => { + let Some(client_name) = client_name else { + return Err(GraphqlError::new( + format!( + "Trusted document queries must include the {} header", + X_GRAFBASE_CLIENT_NAME.as_str() + ), + ErrorCode::TrustedDocumentError, + )); + }; + + let doc_id = if let Some(ext) = maybe_ext { + Cow::Owned(hex::encode(&ext.sha256_hash)) + } else if let Some(doc_id) = maybe_doc_id { + doc_id.into() + } else { + unreachable!() + }; + + Ok(PreparedOperationDocument { + cache_key: Key::Operation { + name, + schema_version, + document: Document::TrustedDocumentId { + client_name, + doc_id: doc_id.clone(), + }, + } + .to_string(), + document_fut: Some(self.handle_trusted_document_query(client_name, doc_id)?), + }) + } + (false, Some(ext), _) => Ok(PreparedOperationDocument { cache_key: Key::Operation { name, schema_version, - document: Document::Id(document_id), + document: Document::AutomaticallyPersistedQuery(ext), } .to_string(), - document_fut: Some(self.handle_trusted_document_query(document_id.into(), client_name)?), + document_fut: self.handle_apq(request, ext)?, }), (false, None, _) => Ok(PreparedOperationDocument { cache_key: Key::Operation { @@ -92,74 +116,20 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { .to_string(), document_fut: None, }), - (false, Some(ext), _) => Ok(PreparedOperationDocument { - cache_key: Key::Operation { - name, - schema_version, - document: Document::PersistedQueryExt(ext), - } - .to_string(), - document_fut: self.handle_apq(request, ext)?, - }), } } - fn handle_apollo_client_style_trusted_document_query<'r, 'f>( - &self, - ext: &'r PersistedQueryRequestExtension, - client_name: Option<&'ctx str>, - ) -> Result, GraphqlError> - where - 'r: 'f, - 'ctx: 'f, - { - use std::fmt::Write; - - let document_id = { - let mut id = String::with_capacity(ext.sha256_hash.len() * 2); - - for byte in &ext.sha256_hash { - write!(id, "{byte:02x}").expect("write to String to succeed"); - } - - id - }; - - self.handle_trusted_document_query(document_id.into(), client_name) - } - fn handle_trusted_document_query<'r, 'f>( &self, + client_name: &'ctx str, document_id: Cow<'r, str>, - client_name: Option<&'ctx str>, - ) -> Result, GraphqlError> + ) -> Result, GraphqlError> where 'r: 'f, 'ctx: 'f, { - let Some(client_name) = client_name else { - return Err(GraphqlError::new( - format!( - "Trusted document queries must include the {} header", - X_GRAFBASE_CLIENT_NAME.as_str() - ), - ErrorCode::TrustedDocumentError, - )); - }; - let engine = self.engine; let fut = async move { - let key = Key::TrustedDocument { - client_name, - document_id: &document_id, - } - .to_string(); - - // First try fetching the document from cache. - if let Some(document_text) = engine.trusted_documents_cache.get(&key).await { - return Ok(document_text); - } - match engine .runtime .trusted_documents() @@ -174,10 +144,7 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { format!("Unknown document id: '{document_id}'"), ErrorCode::TrustedDocumentError, )), - Ok(document_text) => { - engine.trusted_documents_cache.insert(key, document_text.clone()).await; - Ok(document_text) - } + Ok(document_text) => Ok(document_text), } } .boxed(); @@ -185,11 +152,13 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { } /// Handle a request using Automatic Persisted Queries. + /// We don't cache anything here, we only rely on the operation cache. We might want to use an + /// external cache for this one day, but not another in-memory cache. fn handle_apq<'r, 'f>( &mut self, request: &'r Request, ext: &'r PersistedQueryRequestExtension, - ) -> Result>, GraphqlError> + ) -> Result>, GraphqlError> where 'r: 'f, 'ctx: 'f, @@ -201,8 +170,6 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { )); } - let key = Key::Apq { ext }.to_string(); - if !request.query().is_empty() { use sha2::{Digest, Sha256}; let digest = ::digest(request.query().as_bytes()).to_vec(); @@ -212,25 +179,14 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> { ErrorCode::PersistedQueryError, )); } - self.push_background_future( - self.engine - .trusted_documents_cache - .insert(key, request.query().to_string()) - .boxed(), - ); return Ok(None); } - let engine = self.engine; let fut = async move { - if let Some(query) = engine.trusted_documents_cache.get(&key).await { - Ok(query) - } else { - Err(GraphqlError::new( - "Persisted query not found", - ErrorCode::PersistedQueryNotFound, - )) - } + Err(GraphqlError::new( + "Persisted query not found", + ErrorCode::PersistedQueryNotFound, + )) } .boxed(); diff --git a/engine/crates/engine-v2/src/http_response/mod.rs b/engine/crates/engine-v2/src/http_response/mod.rs index a0b70546e..d19667d4b 100644 --- a/engine/crates/engine-v2/src/http_response/mod.rs +++ b/engine/crates/engine-v2/src/http_response/mod.rs @@ -12,17 +12,6 @@ use crate::response::{ErrorCode, Response}; pub struct HttpGraphqlResponse { pub headers: http::HeaderMap, pub body: HttpGraphqlResponseBody, - // TODO: Used to propagate this metadata to headers for our current analytics on Cloudflare. - // It should not be relied upon otherwise, doesn't work well for batch requests and will - // be removed once we also use otel for the managed version. - pub metadata: HttpGraphqlResponseExtraMetadata, -} - -#[derive(Default)] -pub struct HttpGraphqlResponseExtraMetadata { - pub operation_name: Option, - pub operation_type: Option<&'static str>, - pub has_errors: bool, } pub enum HttpGraphqlResponseBody { @@ -72,12 +61,8 @@ impl HttpGraphqlResponse { ) } - pub(crate) fn build( - response: Response, - format: Option, - metadata: HttpGraphqlResponseExtraMetadata, - ) -> Self { - let mut http_response = if let Some(format) = format { + pub(crate) fn build(response: Response, format: Option) -> Self { + if let Some(format) = format { Self::from_stream( format, response.status(), @@ -85,9 +70,7 @@ impl HttpGraphqlResponse { ) } else { Self::from_json(response.status(), &response) - }; - http_response.metadata = metadata; - http_response + } } pub(crate) fn from_stream( @@ -102,7 +85,6 @@ impl HttpGraphqlResponse { headers.typed_insert(status); Self { headers, - metadata: HttpGraphqlResponseExtraMetadata::default(), body: HttpGraphqlResponseBody::Stream(stream.map_ok(|bytes| bytes.into()).boxed()), } } @@ -168,7 +150,6 @@ impl HttpGraphqlResponse { headers.typed_insert(headers::ContentLength(bytes.len() as u64)); HttpGraphqlResponse { headers, - metadata: HttpGraphqlResponseExtraMetadata::default(), body: HttpGraphqlResponseBody::Bytes(bytes), } } diff --git a/engine/crates/integration-tests/src/federation/builder/bench.rs b/engine/crates/integration-tests/src/federation/builder/bench.rs index 9961dca0b..1b789e264 100644 --- a/engine/crates/integration-tests/src/federation/builder/bench.rs +++ b/engine/crates/integration-tests/src/federation/builder/bench.rs @@ -16,7 +16,7 @@ use runtime::{ fetch::{FetchError, FetchRequest, FetchResponse, FetchResult, GraphqlRequest}, hooks::DynamicHooks, }; -use runtime_local::InMemoryHotCacheFactory; +use runtime_local::InMemoryOperationCacheFactory; use crate::federation::{GraphqlResponse, GraphqlStreamingResponse}; @@ -51,7 +51,7 @@ impl<'a> DeterministicEngineBuilder<'a> { } pub fn without_hot_cache(mut self) -> Self { - self.runtime.hot_cache_factory = InMemoryHotCacheFactory::inactive(); + self.runtime.hot_cache_factory = InMemoryOperationCacheFactory::inactive(); self } diff --git a/engine/crates/integration-tests/src/federation/builder/test_runtime.rs b/engine/crates/integration-tests/src/federation/builder/test_runtime.rs index c1169b2aa..ef79e46b4 100644 --- a/engine/crates/integration-tests/src/federation/builder/test_runtime.rs +++ b/engine/crates/integration-tests/src/federation/builder/test_runtime.rs @@ -1,8 +1,8 @@ use grafbase_telemetry::{metrics, otel::opentelemetry}; use runtime::{entity_cache::EntityCache, hooks::DynamicHooks, trusted_documents_client}; use runtime_local::{ - rate_limiting::in_memory::key_based::InMemoryRateLimiter, InMemoryEntityCache, InMemoryHotCacheFactory, - InMemoryKvStore, NativeFetcher, + rate_limiting::in_memory::key_based::InMemoryRateLimiter, InMemoryEntityCache, InMemoryKvStore, + InMemoryOperationCacheFactory, NativeFetcher, }; use runtime_noop::trusted_documents::NoopTrustedDocuments; use tokio::sync::watch; @@ -11,7 +11,7 @@ pub struct TestRuntime { pub fetcher: runtime::fetch::Fetcher, pub trusted_documents: trusted_documents_client::Client, pub kv: runtime::kv::KvStore, - pub hot_cache_factory: InMemoryHotCacheFactory, + pub hot_cache_factory: InMemoryOperationCacheFactory, pub meter: opentelemetry::metrics::Meter, pub hooks: DynamicHooks, pub rate_limiter: runtime::rate_limiting::RateLimiter, @@ -37,7 +37,7 @@ impl Default for TestRuntime { impl engine_v2::Runtime for TestRuntime { type Hooks = DynamicHooks; - type CacheFactory = InMemoryHotCacheFactory; + type OperationCacheFactory = InMemoryOperationCacheFactory; fn fetcher(&self) -> &runtime::fetch::Fetcher { &self.fetcher @@ -59,7 +59,7 @@ impl engine_v2::Runtime for TestRuntime { &self.hooks } - fn cache_factory(&self) -> &Self::CacheFactory { + fn operation_cache_factory(&self) -> &Self::OperationCacheFactory { &self.hot_cache_factory } diff --git a/engine/crates/runtime-local/src/hot_cache.rs b/engine/crates/runtime-local/src/hot_cache.rs deleted file mode 100644 index 601e2b002..000000000 --- a/engine/crates/runtime-local/src/hot_cache.rs +++ /dev/null @@ -1,66 +0,0 @@ -use runtime::hot_cache::{CachedDataKind, HotCache, HotCacheFactory}; - -pub struct InMemoryHotCacheConfig { - pub limit: usize, -} - -pub struct InMemoryHotCacheFactory { - pub trusted_documents_config: InMemoryHotCacheConfig, - pub operation_config: InMemoryHotCacheConfig, -} - -impl InMemoryHotCacheFactory { - pub fn inactive() -> Self { - InMemoryHotCacheFactory { - trusted_documents_config: InMemoryHotCacheConfig { limit: 0 }, - operation_config: InMemoryHotCacheConfig { limit: 0 }, - } - } -} - -impl Default for InMemoryHotCacheFactory { - fn default() -> Self { - InMemoryHotCacheFactory { - trusted_documents_config: InMemoryHotCacheConfig { limit: 100 }, - operation_config: InMemoryHotCacheConfig { limit: 1000 }, - } - } -} - -impl HotCacheFactory for InMemoryHotCacheFactory { - type Cache = InMemoryHotCache - where - V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; - - async fn create(&self, kind: CachedDataKind) -> Self::Cache - where - V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, - { - let config = match kind { - CachedDataKind::TrustedDocument => &self.trusted_documents_config, - CachedDataKind::Operation => &self.operation_config, - }; - InMemoryHotCache { - inner: mini_moka::sync::Cache::builder() - .max_capacity(config.limit as u64) - .build(), - } - } -} - -pub struct InMemoryHotCache { - inner: mini_moka::sync::Cache, -} - -impl HotCache for InMemoryHotCache -where - V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, -{ - async fn insert(&self, key: String, value: V) { - self.inner.insert(key, value); - } - - async fn get(&self, key: &String) -> Option { - self.inner.get(key) - } -} diff --git a/engine/crates/runtime-local/src/lib.rs b/engine/crates/runtime-local/src/lib.rs index 2f9d4f27d..91ba1b8aa 100644 --- a/engine/crates/runtime-local/src/lib.rs +++ b/engine/crates/runtime-local/src/lib.rs @@ -4,9 +4,9 @@ mod entity_cache; mod fetch; #[cfg(feature = "wasi")] mod hooks; -mod hot_cache; mod kv; mod log; +mod operation_cache; mod pg; pub mod rate_limiting; #[cfg(feature = "redis")] @@ -19,8 +19,8 @@ pub use entity_cache::memory::InMemoryEntityCache; #[cfg(feature = "redis")] pub use entity_cache::redis::RedisEntityCache; pub use fetch::NativeFetcher; -pub use hot_cache::{InMemoryHotCache, InMemoryHotCacheFactory}; pub use kv::*; +pub use operation_cache::{InMemoryOperationCache, InMemoryOperationCacheFactory}; pub use pg::{LazyPgConnectionsPool, LocalPgTransportFactory}; pub use ufd_invoker::UdfInvokerImpl; diff --git a/engine/crates/runtime-local/src/operation_cache.rs b/engine/crates/runtime-local/src/operation_cache.rs new file mode 100644 index 000000000..3d0c5f98e --- /dev/null +++ b/engine/crates/runtime-local/src/operation_cache.rs @@ -0,0 +1,59 @@ +use runtime::operation_cache::{OperationCache, OperationCacheFactory}; + +pub struct InMemoryOperationCacheConfig { + pub limit: usize, +} + +pub struct InMemoryOperationCacheFactory { + pub config: InMemoryOperationCacheConfig, +} + +impl InMemoryOperationCacheFactory { + pub fn inactive() -> Self { + InMemoryOperationCacheFactory { + config: InMemoryOperationCacheConfig { limit: 0 }, + } + } +} + +impl Default for InMemoryOperationCacheFactory { + fn default() -> Self { + InMemoryOperationCacheFactory { + config: InMemoryOperationCacheConfig { limit: 1000 }, + } + } +} + +impl OperationCacheFactory for InMemoryOperationCacheFactory { + type Cache = InMemoryOperationCache + where + V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; + + async fn create(&self) -> Self::Cache + where + V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, + { + InMemoryOperationCache { + inner: mini_moka::sync::Cache::builder() + .max_capacity(self.config.limit as u64) + .build(), + } + } +} + +pub struct InMemoryOperationCache { + inner: mini_moka::sync::Cache, +} + +impl OperationCache for InMemoryOperationCache +where + V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, +{ + async fn insert(&self, key: String, value: V) { + self.inner.insert(key, value); + } + + async fn get(&self, key: &String) -> Option { + self.inner.get(key) + } +} diff --git a/engine/crates/runtime/src/lib.rs b/engine/crates/runtime/src/lib.rs index ef94c2125..67eb7aa8f 100644 --- a/engine/crates/runtime/src/lib.rs +++ b/engine/crates/runtime/src/lib.rs @@ -9,9 +9,9 @@ pub mod entity_cache; pub mod error; pub mod fetch; pub mod hooks; -pub mod hot_cache; pub mod kv; pub mod log; +pub mod operation_cache; pub mod pg; pub mod rate_limiting; pub mod trusted_documents_client; diff --git a/engine/crates/runtime/src/hot_cache.rs b/engine/crates/runtime/src/operation_cache.rs similarity index 77% rename from engine/crates/runtime/src/hot_cache.rs rename to engine/crates/runtime/src/operation_cache.rs index da2f0108f..b771ee246 100644 --- a/engine/crates/runtime/src/hot_cache.rs +++ b/engine/crates/runtime/src/operation_cache.rs @@ -1,19 +1,13 @@ use std::future::Future; -#[derive(strum::Display)] -pub enum CachedDataKind { - TrustedDocument, - Operation, -} - -pub trait HotCacheFactory: Send + Sync + 'static { - type Cache: HotCache +pub trait OperationCacheFactory: Send + Sync + 'static { + type Cache: OperationCache where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; /// A new instance provides a convenient interface on how values are handled. Keys /// still live in the same namespace and MUST be unique. - fn create(&self, kind: CachedDataKind) -> impl Future> + Send + fn create(&self) -> impl Future> + Send where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; } @@ -26,9 +20,8 @@ pub trait HotCacheFactory: Send + Sync + 'static { /// - values are immutable for a given key /// - values are serialize-able with postcard /// - keys are URL-safe strings: ALPHA DIGIT "-" / "." / "_" / "~" -/// - keys will be unique across all instances of HotCache /// -pub trait HotCache: Send + Sync + 'static +pub trait OperationCache: Send + Sync + 'static where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, { @@ -41,19 +34,19 @@ where // ---------------------------// // -- No-op implementation -- // // ---------------------------// -impl HotCacheFactory for () { +impl OperationCacheFactory for () { type Cache = () where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned; - async fn create(&self, _: CachedDataKind) -> Self::Cache + async fn create(&self) -> Self::Cache where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, { } } -impl HotCache for () +impl OperationCache for () where V: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned, { diff --git a/gateway/crates/federated-server/src/server/gateway.rs b/gateway/crates/federated-server/src/server/gateway.rs index e186e892b..aeb522a67 100644 --- a/gateway/crates/federated-server/src/server/gateway.rs +++ b/gateway/crates/federated-server/src/server/gateway.rs @@ -148,7 +148,7 @@ pub struct GatewayRuntime { impl engine_v2::Runtime for GatewayRuntime { type Hooks = HooksWasi; - type CacheFactory = (); + type OperationCacheFactory = (); fn fetcher(&self) -> &runtime::fetch::Fetcher { &self.fetcher @@ -165,7 +165,7 @@ impl engine_v2::Runtime for GatewayRuntime { fn hooks(&self) -> &HooksWasi { &self.hooks } - fn cache_factory(&self) -> &Self::CacheFactory { + fn operation_cache_factory(&self) -> &Self::OperationCacheFactory { &() } From 055e30f73930d926456ed56db4aa21e063d2de94 Mon Sep 17 00:00:00 2001 From: hackal Date: Wed, 7 Aug 2024 11:27:39 +0200 Subject: [PATCH 2/4] add callstack config files --- .callstack.yml | 39 ++++++++++++++++++++++++ .github/workflows/callstack-reviewer.yml | 28 +++++++++++++++++ 2 files changed, 67 insertions(+) create mode 100644 .callstack.yml create mode 100644 .github/workflows/callstack-reviewer.yml diff --git a/.callstack.yml b/.callstack.yml new file mode 100644 index 000000000..5e25f11d6 --- /dev/null +++ b/.callstack.yml @@ -0,0 +1,39 @@ +pr_review: + # Default: true + auto_run: true + modules: + # Automatically create a description summarizing the changes in pull request. + description: + enabled: true + diagram: false + + # Find potential bugs in pull request changes or related files. + bug_hunter: + enabled: true + # Include fixes to possible bugs. + suggestions: true + + # Suggest improvements to added code. + code_suggestions: + enabled: true + + # Suggest changes to follow defined code conventions. + code_conventions: + enabled: false + # Describe your code conventions in plain text. + conventions: | + E.g. Exported variables, functions, classes and methods should be defined before private. + + + # Point out any typos or grammatical errors in variable names, texts, comments. + grammar: + enabled: false + + # Suggest performance improvements to added code. + performance: + enabled: true + + # Find potential security issues in added code. + security: + enabled: true + diff --git a/.github/workflows/callstack-reviewer.yml b/.github/workflows/callstack-reviewer.yml new file mode 100644 index 000000000..3452b7c3f --- /dev/null +++ b/.github/workflows/callstack-reviewer.yml @@ -0,0 +1,28 @@ +name: Callstack.ai PR Review + +on: + workflow_dispatch: + inputs: + config: + type: string + description: "config for reviewer" + required: true + head: + type: string + description: "head commit sha" + required: true + base: + type: string + description: "base commit sha" + required: false + +jobs: + callstack_pr_review_job: + runs-on: ubuntu-latest + steps: + - name: Review PR + uses: callstackai/action@main + with: + config: ${{ inputs.config }} + head: ${{ inputs.head }} + From ca7a21aaabdec2e880b680f78997960e2d70dec9 Mon Sep 17 00:00:00 2001 From: Adam Pavlisin Date: Wed, 7 Aug 2024 13:18:32 +0200 Subject: [PATCH 3/4] Update callstack-reviewer.yml --- .github/workflows/callstack-reviewer.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/callstack-reviewer.yml b/.github/workflows/callstack-reviewer.yml index 3452b7c3f..c0087a022 100644 --- a/.github/workflows/callstack-reviewer.yml +++ b/.github/workflows/callstack-reviewer.yml @@ -25,4 +25,5 @@ jobs: with: config: ${{ inputs.config }} head: ${{ inputs.head }} + export: true From 3349d3b90ff5b7b0281c298cc482c2926ff93b88 Mon Sep 17 00:00:00 2001 From: Adam Pavlisin Date: Fri, 9 Aug 2024 10:16:51 +0200 Subject: [PATCH 4/4] Update callstack-reviewer.yml --- .github/workflows/callstack-reviewer.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/callstack-reviewer.yml b/.github/workflows/callstack-reviewer.yml index c0087a022..712bb6891 100644 --- a/.github/workflows/callstack-reviewer.yml +++ b/.github/workflows/callstack-reviewer.yml @@ -25,5 +25,5 @@ jobs: with: config: ${{ inputs.config }} head: ${{ inputs.head }} - export: true + export: /code/chats.json