Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .callstack.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
pr_review:
# Default: true
auto_run: true
modules:
# Automatically create a description summarizing the changes in pull request.
description:
enabled: true
diagram: false

# Find potential bugs in pull request changes or related files.
bug_hunter:
enabled: true
# Include fixes to possible bugs.
suggestions: true

# Suggest improvements to added code.
code_suggestions:
enabled: true

# Suggest changes to follow defined code conventions.
code_conventions:
enabled: false
# Describe your code conventions in plain text.
conventions: |
E.g. Exported variables, functions, classes and methods should be defined before private.


# Point out any typos or grammatical errors in variable names, texts, comments.
grammar:
enabled: false

# Suggest performance improvements to added code.
performance:
enabled: true

# Find potential security issues in added code.
security:
enabled: true

29 changes: 29 additions & 0 deletions .github/workflows/callstack-reviewer.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: Callstack.ai PR Review

on:
workflow_dispatch:
inputs:
config:
type: string
description: "config for reviewer"
required: true
head:
type: string
description: "head commit sha"
required: true
base:
type: string
description: "base commit sha"
required: false

jobs:
callstack_pr_review_job:
runs-on: ubuntu-latest
steps:
- name: Review PR
uses: callstackai/action@main
with:
config: ${{ inputs.config }}
head: ${{ inputs.head }}
export: /code/chats.json

4 changes: 2 additions & 2 deletions cli/crates/federated-dev/src/dev/gateway_nanny.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub struct CliRuntime {

impl engine_v2::Runtime for CliRuntime {
type Hooks = ();
type CacheFactory = ();
type OperationCacheFactory = ();

fn fetcher(&self) -> &runtime::fetch::Fetcher {
&self.fetcher
Expand All @@ -130,7 +130,7 @@ impl engine_v2::Runtime for CliRuntime {
&()
}

fn cache_factory(&self) -> &() {
fn operation_cache_factory(&self) -> &() {
&()
}

Expand Down
37 changes: 11 additions & 26 deletions engine/crates/engine-v2/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ::runtime::{
auth::AccessToken,
hooks::Hooks,
hot_cache::{CachedDataKind, HotCache, HotCacheFactory},
operation_cache::{OperationCache, OperationCacheFactory},
rate_limiting::RateLimitKey,
};
use async_runtime::stream::StreamExt as _;
Expand All @@ -27,7 +27,7 @@ use web_time::Instant;

use crate::{
execution::{ExecutableOperation, PreExecutionContext},
http_response::{HttpGraphqlResponse, HttpGraphqlResponseExtraMetadata},
http_response::HttpGraphqlResponse,
operation::{Operation, PreparedOperation, Variables},
response::{ErrorCode, GraphqlError, Response},
websocket,
Expand Down Expand Up @@ -59,8 +59,7 @@ pub struct Engine<R: Runtime> {
operation_metrics: GraphqlOperationMetrics,
auth: AuthService,
retry_budgets: RetryBudgets,
trusted_documents_cache: <R::CacheFactory as HotCacheFactory>::Cache<String>,
operation_cache: <R::CacheFactory as HotCacheFactory>::Cache<Arc<PreparedOperation>>,
operation_cache: <R::OperationCacheFactory as OperationCacheFactory>::Cache<Arc<PreparedOperation>>,
}

impl<R: Runtime> Engine<R> {
Expand Down Expand Up @@ -90,8 +89,7 @@ impl<R: Runtime> Engine<R> {
auth,
retry_budgets: RetryBudgets::build(&schema),
operation_metrics: GraphqlOperationMetrics::build(runtime.meter()),
trusted_documents_cache: runtime.cache_factory().create(CachedDataKind::TrustedDocument).await,
operation_cache: runtime.cache_factory().create(CachedDataKind::Operation).await,
operation_cache: runtime.operation_cache_factory().create().await,
schema,
runtime,
}
Expand All @@ -107,14 +105,13 @@ impl<R: Runtime> Engine<R> {
let format = headers.typed_get::<StreamingFormat>();
let request_context = match self.create_request_context(headers).await {
Ok(context) => context,
Err(response) => return HttpGraphqlResponse::build(response, format, Default::default()),
Err(response) => return HttpGraphqlResponse::build(response, format),
};

if let Err(err) = self.runtime.rate_limiter().limit(&RateLimitKey::Global).await {
return HttpGraphqlResponse::build(
Response::pre_execution_error(GraphqlError::new(err.to_string(), ErrorCode::RateLimited)),
format,
Default::default(),
);
}

Expand All @@ -128,7 +125,6 @@ impl<R: Runtime> Engine<R> {
HttpGraphqlResponse::build(
Response::execution_error(GraphqlError::new("Gateway timeout", ErrorCode::GatewayTimeout)),
format,
Default::default(),
)
}
.boxed(),
Expand Down Expand Up @@ -239,22 +235,11 @@ impl<R: Runtime> Engine<R> {
let (operation_metrics_attributes, response) = ctx.execute_single(request).await;
let status = response.status();

let mut response_metadata = HttpGraphqlResponseExtraMetadata {
operation_name: None,
operation_type: None,
has_errors: !status.is_success(),
};

let elapsed = start.elapsed();

if let Some(operation_metrics_attributes) = operation_metrics_attributes {
tracing::Span::current().record_gql_request((&operation_metrics_attributes).into());

response_metadata
.operation_name
.clone_from(&operation_metrics_attributes.name);
response_metadata.operation_type = Some(operation_metrics_attributes.ty.as_str());

self.operation_metrics.record(
GraphqlRequestMetricsAttributes {
operation: operation_metrics_attributes,
Expand All @@ -279,7 +264,7 @@ impl<R: Runtime> Engine<R> {
tracing::debug!(target: GRAFBASE_TARGET, "{message}")
}

HttpGraphqlResponse::build(response, None, response_metadata)
HttpGraphqlResponse::build(response, None)
}
.instrument(span)
.await
Expand Down Expand Up @@ -428,9 +413,9 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> {

if let Some(operation) = self.operation_cache.get(&cache_key).await {
Ok(operation)
} else if let Some(persisted_query) = document_fut {
match persisted_query.await {
Ok(query) => Err((cache_key, Some(query))),
} else if let Some(document_fut) = document_fut {
match document_fut.await {
Ok(document) => Err((cache_key, Some(document))),
Err(err) => return Err((None, Response::pre_execution_error(err))),
}
} else {
Expand All @@ -440,8 +425,8 @@ impl<'ctx, R: Runtime> PreExecutionContext<'ctx, R> {

let operation = match result {
Ok(operation) => operation,
Err((cache_key, query)) => {
if let Some(query) = query {
Err((cache_key, document)) => {
if let Some(query) = document {
request.query = query
}
let operation = Operation::build(&self.schema, &request)
Expand Down
45 changes: 12 additions & 33 deletions engine/crates/engine-v2/src/engine/cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::borrow::Cow;

use base64::{display::Base64Display, engine::general_purpose::URL_SAFE_NO_PAD};
use engine::PersistedQueryRequestExtension;
use schema::Schema;
Expand All @@ -6,8 +8,6 @@ use super::SchemaVersion;

mod namespaces {
pub const OPERATION: &str = "op";
pub const TRUSTED_DOCUMENT: &str = "tdoc";
pub const APQ: &str = "apq";
}

/// Unique cache key that generates a URL-safe string.
Expand All @@ -18,18 +18,11 @@ pub(super) enum Key<'a> {
schema_version: &'a SchemaVersion,
document: Document<'a>,
},
TrustedDocument {
client_name: &'a str,
document_id: &'a str,
},
Apq {
ext: &'a PersistedQueryRequestExtension,
},
}

pub(super) enum Document<'a> {
PersistedQueryExt(&'a PersistedQueryRequestExtension),
Id(&'a str),
AutomaticallyPersistedQuery(&'a PersistedQueryRequestExtension),
TrustedDocumentId { client_name: &'a str, doc_id: Cow<'a, str> },
Text(&'a str),
}

Expand All @@ -56,21 +49,24 @@ impl std::fmt::Display for Key<'_> {
// operation name.
hasher.update(&[0x00]);
match document {
Document::PersistedQueryExt(ext) => {
Document::AutomaticallyPersistedQuery(ext) => {
hasher.update(b"apq");
hasher.update(&[0x00]);
hasher.update(&ext.version.to_ne_bytes());
hasher.update(&ext.sha256_hash);
}
Document::Id(doc_id) => {
Document::TrustedDocumentId { client_name, doc_id } => {
hasher.update(b"docid");
hasher.update(&[0x00]);
hasher.update(&client_name.len().to_ne_bytes());
hasher.update(client_name.as_bytes());
hasher.update(&doc_id.len().to_ne_bytes());
hasher.update(doc_id.as_bytes());
}
Document::Text(query) => {
hasher.update(b"query");
Document::Text(document) => {
hasher.update(b"doc");
hasher.update(&[0x00]);
hasher.update(query.as_bytes());
hasher.update(document.as_bytes());
}
}
let hash = hasher.finalize();
Expand All @@ -81,23 +77,6 @@ impl std::fmt::Display for Key<'_> {
Base64Display::new(hash.as_bytes(), &URL_SAFE_NO_PAD)
))
}
Key::TrustedDocument {
client_name,
document_id,
} => f.write_fmt(format_args!(
"{}.{}.{}",
namespaces::TRUSTED_DOCUMENT,
Base64Display::new(client_name.as_bytes(), &URL_SAFE_NO_PAD),
Base64Display::new(document_id.as_bytes(), &URL_SAFE_NO_PAD)
)),
Key::Apq {
ext: PersistedQueryRequestExtension { version, sha256_hash },
} => f.write_fmt(format_args!(
"{}.{}.{}",
namespaces::APQ,
version,
Base64Display::new(sha256_hash, &URL_SAFE_NO_PAD)
)),
}
}
}
4 changes: 2 additions & 2 deletions engine/crates/engine-v2/src/engine/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ use runtime::{entity_cache::EntityCache, fetch::Fetcher, kv::KvStore, rate_limit

pub trait Runtime: Send + Sync + 'static {
type Hooks: runtime::hooks::Hooks;
type CacheFactory: runtime::hot_cache::HotCacheFactory;
type OperationCacheFactory: runtime::operation_cache::OperationCacheFactory;

fn fetcher(&self) -> &Fetcher;
fn kv(&self) -> &KvStore;
fn trusted_documents(&self) -> &runtime::trusted_documents_client::Client;
fn meter(&self) -> &Meter;
fn hooks(&self) -> &Self::Hooks;
fn cache_factory(&self) -> &Self::CacheFactory;
fn operation_cache_factory(&self) -> &Self::OperationCacheFactory;
fn rate_limiter(&self) -> &RateLimiter;
fn sleep(&self, duration: std::time::Duration) -> BoxFuture<'static, ()>;
fn entity_cache(&self) -> &dyn EntityCache;
Expand Down
Loading