diff --git a/Cargo.lock b/Cargo.lock index d889b8a8..89a92ff9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -566,6 +566,12 @@ dependencies = [ "shlex", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cfg-if" version = "1.0.4" @@ -708,6 +714,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "compression-codecs" version = "0.4.36" @@ -1754,7 +1770,6 @@ dependencies = [ "tokio", "tokio-rustls", "tower-service", - "webpki-roots 1.0.5", ] [[package]] @@ -2048,6 +2063,28 @@ dependencies = [ "jiff-tzdb", ] +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.34" @@ -2540,7 +2577,10 @@ dependencies = [ "hipstr", "hmac", "jiff", - "reqwest", + "reqwest 0.13.2", + "reqwest-middleware", + "reqwest-retry", + "reqwest-tracing", "schemars", "serde", "serde_json", @@ -2576,7 +2616,7 @@ dependencies = [ "percent-encoding", "quick-xml", "rand 0.9.2", - "reqwest", + "reqwest 0.12.28", "ring", "rustls-pki-types", "serde", @@ -2954,6 +2994,7 @@ version = "0.11.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1906b49b0c3bc04b5fe5d86a77925ae6524a19b816ae38ce1e426255f1d8a31" dependencies = [ + "aws-lc-rs", "bytes", "getrandom 0.3.4", "lru-slab", @@ -3173,7 +3214,97 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 1.0.5", +] + +[[package]] +name = "reqwest" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "mime_guess", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "serde", + "serde_json", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "reqwest-middleware" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "199dda04a536b532d0cc04d7979e39b1c763ea749bf91507017069c00b96056f" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest 0.13.2", + "serde", + "thiserror 2.0.18", + "tower-service", +] + +[[package]] +name = "reqwest-retry" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe2412db2af7d2268e7a5406be0431f37d9eb67ff390f35b395716f5f06c2eaa" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "getrandom 0.2.17", + "http", + "hyper", + "reqwest 0.13.2", + "reqwest-middleware", + "retry-policies", + "thiserror 2.0.18", + "tokio", + "tracing", + "wasmtimer", +] + +[[package]] +name = "reqwest-tracing" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5c1a1510677d43dce9e9c0c07fc5db8772c0e5a43e4f9cef75a11affa05a578" +dependencies = [ + "anyhow", + "async-trait", + "getrandom 0.2.17", + "http", + "matchit", + "reqwest 0.13.2", + "reqwest-middleware", + "tracing", ] [[package]] @@ -3185,6 +3316,15 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "retry-policies" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a4bd6027df676bcb752d3724db0ea3c0c5fc1dd0376fec51ac7dcaf9cc69be" +dependencies = [ + "rand 0.9.2", +] + [[package]] name = "ring" version = "0.17.14" @@ -3301,6 +3441,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +dependencies = [ + "core-foundation 0.10.1", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs 0.8.3", + "rustls-platform-verifier-android", + "rustls-webpki 0.103.9", + "security-framework 3.5.1", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -4554,6 +4721,20 @@ dependencies = [ "semver", ] +[[package]] +name = "wasmtimer" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c598d6b99ea013e35844697fc4670d08339d5cda15588f193c6beedd12f644b" +dependencies = [ + "futures", + "js-sys", + "parking_lot", + "pin-utils", + "slab", + "wasm-bindgen", +] + [[package]] name = "web-sys" version = "0.3.85" @@ -4574,6 +4755,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-root-certs" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "0.26.11" @@ -4671,6 +4861,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -4698,6 +4897,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -4731,6 +4945,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -4743,6 +4963,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -4755,6 +4981,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -4779,6 +5011,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -4791,6 +5029,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -4803,6 +5047,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -4815,6 +5065,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 75661eb4..a8df20c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,12 @@ async-stream = { version = "0.3", features = [] } async-trait = { version = "0.1", features = [] } # HTTP client -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] } +reqwest = { version = "0.13", default-features = false, features = ["rustls"] } + +# HTTP middleware +reqwest-middleware = { version = "0.5", features = ["json", "multipart"] } +reqwest-retry = { version = "0.9", features = [] } +reqwest-tracing = { version = "0.7", features = [] } # HTTP server axum = { version = "0.8", features = [] } diff --git a/README.md b/README.md index fdd31ee0..ed4375f6 100644 --- a/README.md +++ b/README.md @@ -3,11 +3,11 @@ [![Build](https://img.shields.io/github/actions/workflow/status/nvisycom/server/build.yml?branch=main&label=build%20%26%20test&style=flat-square)](https://github.com/nvisycom/server/actions/workflows/build.yml) Open-source multimodal redaction API. Detect and redact PII and sensitive data -across documents, images, audio, and video. +across documents, images, and audio. ## Features -- **Multimodal Redaction:** Detect and remove sensitive data across PDFs, images, audio, and video +- **Multimodal Redaction:** Detect and remove sensitive data across PDFs, images, and audio - **AI-Powered Detection:** LLM-driven PII and entity recognition with configurable redaction policies - **Workspace Isolation:** Multi-tenant workspaces with HKDF-derived credential encryption - **Real-Time Collaboration:** WebSocket and NATS pub/sub for live document editing diff --git a/crates/nvisy-nats/src/client/nats_client.rs b/crates/nvisy-nats/src/client/nats_client.rs index 64f3e582..5c123f63 100644 --- a/crates/nvisy-nats/src/client/nats_client.rs +++ b/crates/nvisy-nats/src/client/nats_client.rs @@ -42,8 +42,8 @@ use crate::kv::{ ApiToken, ApiTokensBucket, ChatHistoryBucket, KvBucket, KvKey, KvStore, SessionKey, TokenKey, }; use crate::object::{ - AccountKey, AvatarsBucket, FileKey, FilesBucket, IntermediatesBucket, ObjectBucket, ObjectKey, - ObjectStore, ThumbnailsBucket, + AccountKey, AvatarsBucket, ContextFilesBucket, ContextKey, FileKey, FilesBucket, + IntermediatesBucket, ObjectBucket, ObjectKey, ObjectStore, ThumbnailsBucket, }; use crate::stream::{EventPublisher, EventStream, EventSubscriber, FileStream, WebhookStream}; use crate::{Error, Result, TRACING_TARGET_CLIENT, TRACING_TARGET_CONNECTION}; @@ -255,6 +255,12 @@ impl NatsClient { pub async fn avatar_store(&self) -> Result> { self.object_store().await } + + /// Get or create a context file store for encrypted workspace contexts. + #[tracing::instrument(skip(self), target = TRACING_TARGET_CLIENT)] + pub async fn context_file_store(&self) -> Result> { + self.object_store().await + } } // Stream getters diff --git a/crates/nvisy-nats/src/object/mod.rs b/crates/nvisy-nats/src/object/mod.rs index d880736a..790a440a 100644 --- a/crates/nvisy-nats/src/object/mod.rs +++ b/crates/nvisy-nats/src/object/mod.rs @@ -30,8 +30,9 @@ mod object_key; mod object_store; pub use object_bucket::{ - AvatarsBucket, FilesBucket, IntermediatesBucket, ObjectBucket, ThumbnailsBucket, + AvatarsBucket, ContextFilesBucket, FilesBucket, IntermediatesBucket, ObjectBucket, + ThumbnailsBucket, }; pub use object_data::{GetResult, PutResult}; -pub use object_key::{AccountKey, FileKey, ObjectKey}; +pub use object_key::{AccountKey, ContextKey, FileKey, ObjectKey}; pub use object_store::ObjectStore; diff --git a/crates/nvisy-nats/src/object/object_bucket.rs b/crates/nvisy-nats/src/object/object_bucket.rs index c4acc3f5..d954ec3f 100644 --- a/crates/nvisy-nats/src/object/object_bucket.rs +++ b/crates/nvisy-nats/src/object/object_bucket.rs @@ -59,6 +59,17 @@ impl ObjectBucket for AvatarsBucket { const NAME: &'static str = "ACCOUNT_AVATARS"; } +/// Storage for encrypted workspace context files. +/// +/// No expiration, context files are retained indefinitely. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)] +pub struct ContextFilesBucket; + +impl ObjectBucket for ContextFilesBucket { + const MAX_AGE: Option = None; + const NAME: &'static str = "CONTEXT_FILES"; +} + #[cfg(test)] mod tests { use super::*; @@ -69,6 +80,7 @@ mod tests { assert_eq!(IntermediatesBucket::NAME, "DOCUMENT_INTERMEDIATES"); assert_eq!(ThumbnailsBucket::NAME, "DOCUMENT_THUMBNAILS"); assert_eq!(AvatarsBucket::NAME, "ACCOUNT_AVATARS"); + assert_eq!(ContextFilesBucket::NAME, "CONTEXT_FILES"); } #[test] @@ -80,5 +92,6 @@ mod tests { ); assert_eq!(ThumbnailsBucket::MAX_AGE, None); assert_eq!(AvatarsBucket::MAX_AGE, None); + assert_eq!(ContextFilesBucket::MAX_AGE, None); } } diff --git a/crates/nvisy-nats/src/object/object_key.rs b/crates/nvisy-nats/src/object/object_key.rs index 16fe2af2..2fd5aae9 100644 --- a/crates/nvisy-nats/src/object/object_key.rs +++ b/crates/nvisy-nats/src/object/object_key.rs @@ -164,6 +164,81 @@ impl From for AccountKey { } } +/// A validated key for context file objects in NATS object storage. +/// +/// The key is encoded as `ctx_` prefix followed by URL-safe base64 of the +/// concatenated workspace ID and context ID. This produces a key like +/// `ctx_ABC123...` from two UUIDs (32 bytes -> base64). +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ContextKey { + pub workspace_id: Uuid, + pub context_id: Uuid, +} + +impl ObjectKey for ContextKey { + const PREFIX: &'static str = "ctx_"; +} + +impl ContextKey { + /// Creates a new context key from workspace and context IDs. + pub fn new(workspace_id: Uuid, context_id: Uuid) -> Self { + Self { + workspace_id, + context_id, + } + } + + /// Encodes the key payload as URL-safe base64. + fn encode_payload(&self) -> String { + let mut bytes = [0u8; 32]; + bytes[..16].copy_from_slice(self.workspace_id.as_bytes()); + bytes[16..].copy_from_slice(self.context_id.as_bytes()); + BASE64_URL_SAFE_NO_PAD.encode(bytes) + } + + /// Decodes a key payload from URL-safe base64. + fn decode_payload(s: &str) -> Result { + let bytes = BASE64_URL_SAFE_NO_PAD.decode(s).map_err(|e| { + Error::operation("parse_key", format!("Invalid base64 encoding: {}", e)) + })?; + + if bytes.len() != 32 { + return Err(Error::operation( + "parse_key", + format!("Invalid key length: expected 32 bytes, got {}", bytes.len()), + )); + } + + let workspace_id = Uuid::from_slice(&bytes[..16]) + .map_err(|e| Error::operation("parse_key", format!("Invalid workspace UUID: {}", e)))?; + + let context_id = Uuid::from_slice(&bytes[16..]) + .map_err(|e| Error::operation("parse_key", format!("Invalid context UUID: {}", e)))?; + + Ok(Self::new(workspace_id, context_id)) + } +} + +impl fmt::Display for ContextKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}{}", Self::PREFIX, self.encode_payload()) + } +} + +impl FromStr for ContextKey { + type Err = Error; + + fn from_str(s: &str) -> Result { + let payload = s.strip_prefix(Self::PREFIX).ok_or_else(|| { + Error::operation( + "parse_key", + format!("Invalid key prefix: expected '{}'", Self::PREFIX), + ) + })?; + Self::decode_payload(payload) + } +} + #[cfg(test)] mod tests { use super::*; @@ -279,4 +354,51 @@ mod tests { assert!(AccountKey::from_str("account_not-a-uuid").is_err()); } } + + mod context_key { + use super::*; + + #[test] + fn test_prefix() { + assert_eq!(ContextKey::PREFIX, "ctx_"); + } + + #[test] + fn test_new() { + let workspace_id = Uuid::new_v4(); + let context_id = Uuid::new_v4(); + let key = ContextKey::new(workspace_id, context_id); + assert_eq!(key.workspace_id, workspace_id); + assert_eq!(key.context_id, context_id); + } + + #[test] + fn test_display_has_prefix() { + let key = ContextKey::new(Uuid::new_v4(), Uuid::new_v4()); + let encoded = key.to_string(); + assert!(encoded.starts_with("ctx_")); + // prefix (4) + base64 (43) = 47 + assert_eq!(encoded.len(), 47); + } + + #[test] + fn test_roundtrip() { + let workspace_id = Uuid::new_v4(); + let context_id = Uuid::new_v4(); + + let key = ContextKey::new(workspace_id, context_id); + let encoded = key.to_string(); + let decoded: ContextKey = encoded.parse().unwrap(); + + assert_eq!(decoded.workspace_id, workspace_id); + assert_eq!(decoded.context_id, context_id); + assert_eq!(key, decoded); + } + + #[test] + fn test_from_str_invalid_prefix() { + assert!(ContextKey::from_str("file_abc").is_err()); + assert!(ContextKey::from_str("abc").is_err()); + } + } } diff --git a/crates/nvisy-object/src/client/mod.rs b/crates/nvisy-object/src/client/mod.rs index e8af0e5b..ebf84505 100644 --- a/crates/nvisy-object/src/client/mod.rs +++ b/crates/nvisy-object/src/client/mod.rs @@ -36,7 +36,7 @@ impl ObjectStoreClient { /// Verify that the backing store is reachable. /// - /// Issues a HEAD for a probe key — a not-found response is treated as + /// Issues a HEAD for a probe key: a not-found response is treated as /// success (the bucket/container exists), any other error is propagated. #[tracing::instrument(name = "object.verify", skip(self))] pub async fn verify_reachable(&self) -> Result<(), Error> { diff --git a/crates/nvisy-postgres/src/model/mod.rs b/crates/nvisy-postgres/src/model/mod.rs index bef91290..340c3bf3 100644 --- a/crates/nvisy-postgres/src/model/mod.rs +++ b/crates/nvisy-postgres/src/model/mod.rs @@ -10,6 +10,7 @@ mod account_notification; mod workspace; mod workspace_activity; mod workspace_connection; +mod workspace_context; mod workspace_file; mod workspace_file_annotation; mod workspace_file_chunk; @@ -35,6 +36,7 @@ pub use workspace_activity::{NewWorkspaceActivity, WorkspaceActivity}; pub use workspace_connection::{ NewWorkspaceConnection, UpdateWorkspaceConnection, WorkspaceConnection, }; +pub use workspace_context::{NewWorkspaceContext, UpdateWorkspaceContext, WorkspaceContext}; pub use workspace_file::{NewWorkspaceFile, UpdateWorkspaceFile, WorkspaceFile}; // File models pub use workspace_file_annotation::{ diff --git a/crates/nvisy-postgres/src/model/workspace_context.rs b/crates/nvisy-postgres/src/model/workspace_context.rs new file mode 100644 index 00000000..d7209c0b --- /dev/null +++ b/crates/nvisy-postgres/src/model/workspace_context.rs @@ -0,0 +1,118 @@ +//! Workspace context model for PostgreSQL database operations. + +use diesel::prelude::*; +use jiff_diesel::Timestamp; +use serde_json::Value as JsonValue; +use uuid::Uuid; + +use crate::schema::workspace_contexts; +use crate::types::{HasCreatedAt, HasDeletedAt, HasUpdatedAt}; + +/// Workspace context model representing metadata for encrypted context files. +/// +/// The actual encrypted content is stored in NATS object storage. +/// This record holds the metadata and storage reference. +#[derive(Debug, Clone, PartialEq, Queryable, Selectable)] +#[diesel(table_name = workspace_contexts)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct WorkspaceContext { + /// Unique context identifier. + pub id: Uuid, + /// Reference to the workspace this context belongs to. + pub workspace_id: Uuid, + /// Reference to the account that created this context. + pub account_id: Uuid, + /// Human-readable context name. + pub name: String, + /// Context description. + pub description: Option, + /// Content MIME type. + pub mime_type: String, + /// NATS object store key for the encrypted content. + pub storage_key: String, + /// Size of the encrypted content in bytes. + pub content_size: i64, + /// SHA-256 hash of the encrypted content. + pub content_hash: Vec, + /// Non-encrypted metadata for filtering/display. + pub metadata: JsonValue, + /// Timestamp when the context was created. + pub created_at: Timestamp, + /// Timestamp when the context was last updated. + pub updated_at: Timestamp, + /// Timestamp when the context was soft-deleted. + pub deleted_at: Option, +} + +/// Data for creating a new workspace context. +#[derive(Debug, Clone, Insertable)] +#[diesel(table_name = workspace_contexts)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct NewWorkspaceContext { + /// Workspace ID (required). + pub workspace_id: Uuid, + /// Account ID (required). + pub account_id: Uuid, + /// Context name. + pub name: String, + /// Context description. + pub description: Option, + /// Content MIME type. + pub mime_type: String, + /// NATS object store key. + pub storage_key: String, + /// Size of the encrypted content in bytes. + pub content_size: i64, + /// SHA-256 hash of the encrypted content. + pub content_hash: Vec, + /// Non-encrypted metadata for filtering/display. + pub metadata: Option, +} + +/// Data for updating a workspace context. +#[derive(Debug, Clone, Default, AsChangeset)] +#[diesel(table_name = workspace_contexts)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct UpdateWorkspaceContext { + /// Context name. + pub name: Option, + /// Context description. + pub description: Option>, + /// Content MIME type. + pub mime_type: Option, + /// NATS object store key (updated on content replacement). + pub storage_key: Option, + /// Size of the encrypted content in bytes. + pub content_size: Option, + /// SHA-256 hash of the encrypted content. + pub content_hash: Option>, + /// Non-encrypted metadata for filtering/display. + pub metadata: Option, + /// Soft delete timestamp. + pub deleted_at: Option>, +} + +impl WorkspaceContext { + /// Returns whether the context is deleted. + pub fn is_deleted(&self) -> bool { + self.deleted_at.is_some() + } +} + +impl HasCreatedAt for WorkspaceContext { + fn created_at(&self) -> jiff::Timestamp { + self.created_at.into() + } +} + +impl HasUpdatedAt for WorkspaceContext { + fn updated_at(&self) -> jiff::Timestamp { + self.updated_at.into() + } +} + +impl HasDeletedAt for WorkspaceContext { + fn deleted_at(&self) -> Option { + self.deleted_at.map(Into::into) + } +} diff --git a/crates/nvisy-postgres/src/query/mod.rs b/crates/nvisy-postgres/src/query/mod.rs index be8c8fcf..291327bd 100644 --- a/crates/nvisy-postgres/src/query/mod.rs +++ b/crates/nvisy-postgres/src/query/mod.rs @@ -20,6 +20,7 @@ mod account_notification; mod workspace; mod workspace_activity; mod workspace_connection; +mod workspace_context; mod workspace_file; mod workspace_file_annotation; mod workspace_file_chunk; @@ -37,6 +38,7 @@ pub use account_notification::AccountNotificationRepository; pub use workspace::WorkspaceRepository; pub use workspace_activity::WorkspaceActivityRepository; pub use workspace_connection::WorkspaceConnectionRepository; +pub use workspace_context::WorkspaceContextRepository; pub use workspace_file::WorkspaceFileRepository; pub use workspace_file_annotation::WorkspaceFileAnnotationRepository; pub use workspace_file_chunk::WorkspaceFileChunkRepository; diff --git a/crates/nvisy-postgres/src/query/workspace_context.rs b/crates/nvisy-postgres/src/query/workspace_context.rs new file mode 100644 index 00000000..368bee72 --- /dev/null +++ b/crates/nvisy-postgres/src/query/workspace_context.rs @@ -0,0 +1,248 @@ +//! Workspace contexts repository for managing context file metadata. + +use std::future::Future; + +use diesel::prelude::*; +use diesel_async::RunQueryDsl; +use uuid::Uuid; + +use crate::model::{NewWorkspaceContext, UpdateWorkspaceContext, WorkspaceContext}; +use crate::types::{CursorPage, CursorPagination, OffsetPagination}; +use crate::{PgConnection, PgError, PgResult, schema}; + +/// Repository for workspace context database operations. +pub trait WorkspaceContextRepository { + /// Creates a new workspace context record. + fn create_workspace_context( + &mut self, + new_context: NewWorkspaceContext, + ) -> impl Future> + Send; + + /// Finds a context by its unique identifier. + fn find_workspace_context_by_id( + &mut self, + context_id: Uuid, + ) -> impl Future>> + Send; + + /// Finds a context by ID within a specific workspace. + fn find_context_in_workspace( + &mut self, + workspace_id: Uuid, + context_id: Uuid, + ) -> impl Future>> + Send; + + /// Lists all contexts in a workspace with offset pagination. + fn offset_list_workspace_contexts( + &mut self, + workspace_id: Uuid, + pagination: OffsetPagination, + ) -> impl Future>> + Send; + + /// Lists all contexts in a workspace with cursor pagination. + fn cursor_list_workspace_contexts( + &mut self, + workspace_id: Uuid, + pagination: CursorPagination, + ) -> impl Future>> + Send; + + /// Updates a context with new data. + fn update_workspace_context( + &mut self, + context_id: Uuid, + updates: UpdateWorkspaceContext, + ) -> impl Future> + Send; + + /// Soft deletes a context by setting the deletion timestamp. + fn delete_workspace_context( + &mut self, + context_id: Uuid, + ) -> impl Future> + Send; + + /// Counts contexts in a workspace. + fn count_workspace_contexts( + &mut self, + workspace_id: Uuid, + ) -> impl Future> + Send; +} + +impl WorkspaceContextRepository for PgConnection { + async fn create_workspace_context( + &mut self, + new_context: NewWorkspaceContext, + ) -> PgResult { + use schema::workspace_contexts; + + let context = diesel::insert_into(workspace_contexts::table) + .values(&new_context) + .returning(WorkspaceContext::as_returning()) + .get_result(self) + .await + .map_err(PgError::from)?; + + Ok(context) + } + + async fn find_workspace_context_by_id( + &mut self, + context_id: Uuid, + ) -> PgResult> { + use schema::workspace_contexts::{self, dsl}; + + let context = workspace_contexts::table + .filter(dsl::id.eq(context_id)) + .filter(dsl::deleted_at.is_null()) + .select(WorkspaceContext::as_select()) + .first(self) + .await + .optional() + .map_err(PgError::from)?; + + Ok(context) + } + + async fn find_context_in_workspace( + &mut self, + workspace_id: Uuid, + context_id: Uuid, + ) -> PgResult> { + use schema::workspace_contexts::{self, dsl}; + + let context = workspace_contexts::table + .filter(dsl::id.eq(context_id)) + .filter(dsl::workspace_id.eq(workspace_id)) + .filter(dsl::deleted_at.is_null()) + .select(WorkspaceContext::as_select()) + .first(self) + .await + .optional() + .map_err(PgError::from)?; + + Ok(context) + } + + async fn offset_list_workspace_contexts( + &mut self, + workspace_id: Uuid, + pagination: OffsetPagination, + ) -> PgResult> { + use schema::workspace_contexts::{self, dsl}; + + let contexts = workspace_contexts::table + .filter(dsl::workspace_id.eq(workspace_id)) + .filter(dsl::deleted_at.is_null()) + .order(dsl::created_at.desc()) + .limit(pagination.limit) + .offset(pagination.offset) + .select(WorkspaceContext::as_select()) + .load(self) + .await + .map_err(PgError::from)?; + + Ok(contexts) + } + + async fn cursor_list_workspace_contexts( + &mut self, + workspace_id: Uuid, + pagination: CursorPagination, + ) -> PgResult> { + use schema::workspace_contexts::{self, dsl}; + + let total = if pagination.include_count { + Some( + workspace_contexts::table + .filter(dsl::workspace_id.eq(workspace_id)) + .filter(dsl::deleted_at.is_null()) + .count() + .get_result::(self) + .await + .map_err(PgError::from)?, + ) + } else { + None + }; + + let query = workspace_contexts::table + .filter(dsl::workspace_id.eq(workspace_id)) + .filter(dsl::deleted_at.is_null()) + .into_boxed(); + + let limit = pagination.limit + 1; + + let items: Vec = if let Some(cursor) = &pagination.after { + let cursor_time = jiff_diesel::Timestamp::from(cursor.timestamp); + + query + .filter( + dsl::created_at + .lt(&cursor_time) + .or(dsl::created_at.eq(&cursor_time).and(dsl::id.lt(cursor.id))), + ) + .select(WorkspaceContext::as_select()) + .order((dsl::created_at.desc(), dsl::id.desc())) + .limit(limit) + .load(self) + .await + .map_err(PgError::from)? + } else { + query + .select(WorkspaceContext::as_select()) + .order((dsl::created_at.desc(), dsl::id.desc())) + .limit(limit) + .load(self) + .await + .map_err(PgError::from)? + }; + + Ok(CursorPage::new( + items, + total, + pagination.limit, + |c: &WorkspaceContext| (c.created_at.into(), c.id), + )) + } + + async fn update_workspace_context( + &mut self, + context_id: Uuid, + updates: UpdateWorkspaceContext, + ) -> PgResult { + use schema::workspace_contexts::{self, dsl}; + + let context = diesel::update(workspace_contexts::table.filter(dsl::id.eq(context_id))) + .set(&updates) + .returning(WorkspaceContext::as_returning()) + .get_result(self) + .await + .map_err(PgError::from)?; + + Ok(context) + } + + async fn delete_workspace_context(&mut self, context_id: Uuid) -> PgResult<()> { + use diesel::dsl::now; + use schema::workspace_contexts::{self, dsl}; + + diesel::update(workspace_contexts::table.filter(dsl::id.eq(context_id))) + .set(dsl::deleted_at.eq(now)) + .execute(self) + .await + .map_err(PgError::from)?; + + Ok(()) + } + + async fn count_workspace_contexts(&mut self, workspace_id: Uuid) -> PgResult { + use schema::workspace_contexts::{self, dsl}; + + let count = workspace_contexts::table + .filter(dsl::workspace_id.eq(workspace_id)) + .filter(dsl::deleted_at.is_null()) + .count() + .get_result(self) + .await + .map_err(PgError::from)?; + + Ok(count) + } +} diff --git a/crates/nvisy-postgres/src/schema.rs b/crates/nvisy-postgres/src/schema.rs index 9154bc35..2ab28971 100644 --- a/crates/nvisy-postgres/src/schema.rs +++ b/crates/nvisy-postgres/src/schema.rs @@ -163,6 +163,27 @@ diesel::table! { } } +diesel::table! { + use diesel::sql_types::*; + use pgvector::sql_types::*; + + workspace_contexts (id) { + id -> Uuid, + workspace_id -> Uuid, + account_id -> Uuid, + name -> Text, + description -> Nullable, + mime_type -> Text, + storage_key -> Text, + content_size -> Int8, + content_hash -> Bytea, + metadata -> Jsonb, + created_at -> Timestamptz, + updated_at -> Timestamptz, + deleted_at -> Nullable, + } +} + diesel::table! { use diesel::sql_types::*; use pgvector::sql_types::*; @@ -402,6 +423,8 @@ diesel::joinable!(workspace_activities -> accounts (account_id)); diesel::joinable!(workspace_activities -> workspaces (workspace_id)); diesel::joinable!(workspace_connections -> accounts (account_id)); diesel::joinable!(workspace_connections -> workspaces (workspace_id)); +diesel::joinable!(workspace_contexts -> accounts (account_id)); +diesel::joinable!(workspace_contexts -> workspaces (workspace_id)); diesel::joinable!(workspace_file_annotations -> accounts (account_id)); diesel::joinable!(workspace_file_annotations -> workspace_files (file_id)); diesel::joinable!(workspace_file_chunks -> workspace_files (file_id)); @@ -426,6 +449,7 @@ diesel::allow_tables_to_appear_in_same_query!( accounts, workspace_activities, workspace_connections, + workspace_contexts, workspace_file_annotations, workspace_file_chunks, workspace_files, diff --git a/crates/nvisy-postgres/src/types/constraint/mod.rs b/crates/nvisy-postgres/src/types/constraint/mod.rs index 6417df98..8ac36326 100644 --- a/crates/nvisy-postgres/src/types/constraint/mod.rs +++ b/crates/nvisy-postgres/src/types/constraint/mod.rs @@ -27,6 +27,7 @@ mod pipeline_runs; mod pipelines; mod workspace_connections; +mod workspace_contexts; use std::fmt; @@ -44,6 +45,7 @@ pub use self::pipeline_runs::PipelineRunConstraints; pub use self::pipelines::PipelineConstraints; pub use self::workspace_activities::WorkspaceActivitiesConstraints; pub use self::workspace_connections::WorkspaceConnectionConstraints; +pub use self::workspace_contexts::WorkspaceContextConstraints; pub use self::workspace_invites::WorkspaceInviteConstraints; pub use self::workspace_members::WorkspaceMemberConstraints; pub use self::workspace_webhooks::WorkspaceWebhookConstraints; @@ -80,6 +82,7 @@ pub enum ConstraintViolation { PipelineRun(PipelineRunConstraints), PipelineArtifact(PipelineArtifactConstraints), WorkspaceConnection(WorkspaceConnectionConstraints), + WorkspaceContext(WorkspaceContextConstraints), } /// Categories of database constraint violations. @@ -146,6 +149,7 @@ impl ConstraintViolation { WorkspaceActivitiesConstraints::new => WorkspaceActivityLog, WorkspaceWebhookConstraints::new => WorkspaceWebhook, WorkspaceConnectionConstraints::new => WorkspaceConnection, + WorkspaceContextConstraints::new => WorkspaceContext, FileAnnotationConstraints::new => FileAnnotation, FileChunkConstraints::new => FileChunk, PipelineRunConstraints::new => PipelineRun, @@ -185,6 +189,7 @@ impl ConstraintViolation { ConstraintViolation::PipelineRun(_) => "workspace_pipeline_runs", ConstraintViolation::PipelineArtifact(_) => "workspace_pipeline_artifacts", ConstraintViolation::WorkspaceConnection(_) => "workspace_connections", + ConstraintViolation::WorkspaceContext(_) => "workspace_contexts", } } @@ -213,6 +218,7 @@ impl ConstraintViolation { | ConstraintViolation::PipelineArtifact(_) => "pipelines", ConstraintViolation::WorkspaceConnection(_) => "connections", + ConstraintViolation::WorkspaceContext(_) => "contexts", } } @@ -240,6 +246,7 @@ impl ConstraintViolation { ConstraintViolation::PipelineRun(c) => c.categorize(), ConstraintViolation::PipelineArtifact(c) => c.categorize(), ConstraintViolation::WorkspaceConnection(c) => c.categorize(), + ConstraintViolation::WorkspaceContext(c) => c.categorize(), } } @@ -272,6 +279,7 @@ impl fmt::Display for ConstraintViolation { ConstraintViolation::PipelineRun(c) => write!(f, "{}", c), ConstraintViolation::PipelineArtifact(c) => write!(f, "{}", c), ConstraintViolation::WorkspaceConnection(c) => write!(f, "{}", c), + ConstraintViolation::WorkspaceContext(c) => write!(f, "{}", c), } } } diff --git a/crates/nvisy-postgres/src/types/constraint/workspace_contexts.rs b/crates/nvisy-postgres/src/types/constraint/workspace_contexts.rs new file mode 100644 index 00000000..6a1f34c2 --- /dev/null +++ b/crates/nvisy-postgres/src/types/constraint/workspace_contexts.rs @@ -0,0 +1,89 @@ +//! Workspace contexts table constraint violations. + +use serde::{Deserialize, Serialize}; +use strum::{Display, EnumIter, EnumString}; + +use super::ConstraintCategory; + +/// Workspace contexts table constraint violations. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Serialize, Deserialize, Display, EnumIter, EnumString)] +#[serde(into = "String", try_from = "String")] +pub enum WorkspaceContextConstraints { + // Name validation constraints + #[strum(serialize = "workspace_contexts_name_length")] + NameLength, + + // Description validation constraints + #[strum(serialize = "workspace_contexts_description_length")] + DescriptionLength, + + // MIME type validation constraints + #[strum(serialize = "workspace_contexts_mime_type_length")] + MimeTypeLength, + + // Storage key validation constraints + #[strum(serialize = "workspace_contexts_storage_key_length")] + StorageKeyLength, + + // Content validation constraints + #[strum(serialize = "workspace_contexts_content_size_positive")] + ContentSizePositive, + #[strum(serialize = "workspace_contexts_content_hash_length")] + ContentHashLength, + + // Metadata validation constraints + #[strum(serialize = "workspace_contexts_metadata_size")] + MetadataSize, + + // Uniqueness constraints + #[strum(serialize = "workspace_contexts_name_unique_idx")] + NameUnique, + + // Chronological constraints + #[strum(serialize = "workspace_contexts_updated_after_created")] + UpdatedAfterCreated, + #[strum(serialize = "workspace_contexts_deleted_after_created")] + DeletedAfterCreated, +} + +impl WorkspaceContextConstraints { + /// Creates a new [`WorkspaceContextConstraints`] from the constraint name. + pub fn new(constraint: &str) -> Option { + constraint.parse().ok() + } + + /// Returns the category of this constraint violation. + pub fn categorize(&self) -> ConstraintCategory { + match self { + WorkspaceContextConstraints::NameLength + | WorkspaceContextConstraints::DescriptionLength + | WorkspaceContextConstraints::MimeTypeLength + | WorkspaceContextConstraints::StorageKeyLength + | WorkspaceContextConstraints::ContentSizePositive + | WorkspaceContextConstraints::ContentHashLength + | WorkspaceContextConstraints::MetadataSize => ConstraintCategory::Validation, + + WorkspaceContextConstraints::NameUnique => ConstraintCategory::Uniqueness, + + WorkspaceContextConstraints::UpdatedAfterCreated + | WorkspaceContextConstraints::DeletedAfterCreated => ConstraintCategory::Chronological, + } + } +} + +impl From for String { + #[inline] + fn from(val: WorkspaceContextConstraints) -> Self { + val.to_string() + } +} + +impl TryFrom for WorkspaceContextConstraints { + type Error = strum::ParseError; + + #[inline] + fn try_from(value: String) -> Result { + value.parse() + } +} diff --git a/crates/nvisy-postgres/src/types/mod.rs b/crates/nvisy-postgres/src/types/mod.rs index 1c7a743c..c7a10a6b 100644 --- a/crates/nvisy-postgres/src/types/mod.rs +++ b/crates/nvisy-postgres/src/types/mod.rs @@ -18,8 +18,8 @@ pub use constraint::{ AccountNotificationConstraints, ConstraintCategory, ConstraintViolation, FileAnnotationConstraints, FileChunkConstraints, FileConstraints, PipelineArtifactConstraints, PipelineConstraints, PipelineRunConstraints, WorkspaceActivitiesConstraints, - WorkspaceConnectionConstraints, WorkspaceConstraints, WorkspaceInviteConstraints, - WorkspaceMemberConstraints, WorkspaceWebhookConstraints, + WorkspaceConnectionConstraints, WorkspaceConstraints, WorkspaceContextConstraints, + WorkspaceInviteConstraints, WorkspaceMemberConstraints, WorkspaceWebhookConstraints, }; pub use enums::{ ActionTokenType, ActivityCategory, ActivityType, AnnotationType, ApiTokenType, ArtifactType, diff --git a/crates/nvisy-server/src/extract/auth/permission.rs b/crates/nvisy-server/src/extract/auth/permission.rs index 6df78075..45651437 100644 --- a/crates/nvisy-server/src/extract/auth/permission.rs +++ b/crates/nvisy-server/src/extract/auth/permission.rs @@ -65,6 +65,12 @@ pub enum Permission { /// Can create, modify, and manage workspace connections. ManageConnections, + // Context permissions + /// Can view workspace contexts. + ViewContexts, + /// Can create, modify, and manage workspace contexts. + ManageContexts, + // Webhook permissions /// Can view workspace webhooks. ViewWebhooks, @@ -98,6 +104,7 @@ impl Permission { | Self::ViewPipelines | Self::ViewMembers | Self::ViewConnections + | Self::ViewContexts | Self::ViewWebhooks => WorkspaceRole::Guest, // Member-level permissions (create and modify own resources) @@ -116,6 +123,7 @@ impl Permission { | Self::InviteMembers | Self::RemoveMembers | Self::ManageConnections + | Self::ManageContexts | Self::CreateWebhooks | Self::UpdateWebhooks | Self::DeleteWebhooks diff --git a/crates/nvisy-server/src/handler/contexts.rs b/crates/nvisy-server/src/handler/contexts.rs new file mode 100644 index 00000000..1f9b246b --- /dev/null +++ b/crates/nvisy-server/src/handler/contexts.rs @@ -0,0 +1,491 @@ +//! Workspace context file management handlers. +//! +//! Context files are encrypted JSON documents stored in NATS object storage. +//! The metadata (name, size, hash) is stored in PostgreSQL while the actual +//! content is encrypted with workspace-derived keys and stored as objects. + +use aide::axum::ApiRouter; +use aide::transform::TransformOperation; +use axum::extract::{DefaultBodyLimit, State}; +use axum::http::StatusCode; +use nvisy_nats::NatsClient; +use nvisy_nats::object::{ContextFilesBucket, ContextKey}; +use nvisy_postgres::PgClient; +use nvisy_postgres::model::{NewWorkspaceContext, UpdateWorkspaceContext}; +use nvisy_postgres::query::WorkspaceContextRepository; +use uuid::Uuid; + +use crate::extract::{AuthProvider, AuthState, Json, Multipart, Path, Permission, Query}; +use crate::handler::request::{ContextPathParams, CursorPagination, WorkspacePathParams}; +use crate::handler::response::{Context, ContextsPage, ErrorResponse}; +use crate::handler::{ErrorKind, Result}; +use crate::middleware::DEFAULT_MAX_FILE_BODY_SIZE; +use crate::service::crypto::encrypt; +use crate::service::{MasterKey, ServiceState}; + +/// Tracing target for workspace context operations. +const TRACING_TARGET: &str = "nvisy_server::handler::contexts"; + +/// Creates a new workspace context from a multipart upload. +/// +/// Expects a multipart form with: +/// - `name`: Context name (text field) +/// - `file`: The JSON context file (file field) +/// - `description`: Optional description (text field) +#[tracing::instrument( + skip_all, + fields( + account_id = %auth_state.account_id, + workspace_id = %path_params.workspace_id, + ) +)] +async fn create_context( + State(pg_client): State, + State(nats_client): State, + State(master_key): State, + AuthState(auth_state): AuthState, + Path(path_params): Path, + Multipart(mut multipart): Multipart, +) -> Result<(StatusCode, Json)> { + tracing::debug!(target: TRACING_TARGET, "Creating workspace context"); + + let mut conn = pg_client.get_connection().await?; + + auth_state + .authorize_workspace( + &mut conn, + path_params.workspace_id, + Permission::ManageContexts, + ) + .await?; + + let mut name: Option = None; + let mut description: Option = None; + let mut file_content: Option> = None; + + while let Some(field) = multipart.next_field().await.map_err(|err| { + tracing::error!(target: TRACING_TARGET, error = %err, "Failed to read multipart field"); + ErrorKind::BadRequest + .with_message("Invalid multipart data") + .with_context(format!("Failed to parse multipart form: {}", err)) + })? { + let field_name = field.name().unwrap_or_default().to_string(); + + match field_name.as_str() { + "name" => { + name = Some(field.text().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Failed to read name field") + .with_context(err.to_string()) + })?); + } + "description" => { + description = Some(field.text().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Failed to read description field") + .with_context(err.to_string()) + })?); + } + "file" => { + let bytes = field.bytes().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Failed to read file content") + .with_context(err.to_string()) + })?; + + // Validate it's valid JSON + serde_json::from_slice::(&bytes).map_err(|err| { + ErrorKind::BadRequest + .with_message("Context file must be valid JSON") + .with_context(err.to_string()) + })?; + + file_content = Some(bytes.to_vec()); + } + _ => { + tracing::debug!( + target: TRACING_TARGET, + field = %field_name, + "Skipping unknown multipart field" + ); + } + } + } + + let name = + name.ok_or_else(|| ErrorKind::BadRequest.with_message("Missing required 'name' field"))?; + let content = file_content + .ok_or_else(|| ErrorKind::BadRequest.with_message("Missing required 'file' field"))?; + + // Encrypt the content with workspace-derived key + let workspace_key = master_key.derive_workspace_key(path_params.workspace_id); + let encrypted_content = + encrypt(&workspace_key, &content).map_err(|e: crate::service::crypto::CryptoError| { + ErrorKind::InternalServerError + .with_message("Failed to encrypt context content") + .with_context(e.to_string()) + })?; + + // Generate the object store key + let context_id = Uuid::now_v7(); + let context_key = ContextKey::new(path_params.workspace_id, context_id); + + // Store encrypted content in NATS + let context_store = nats_client + .object_store::() + .await?; + + let reader = std::io::Cursor::new(encrypted_content); + let put_result = context_store.put(&context_key, reader).await?; + + tracing::debug!( + target: TRACING_TARGET, + context_key = %context_key, + size = put_result.size(), + "Context file stored in NATS" + ); + + // Create metadata record in PostgreSQL + let new_context = NewWorkspaceContext { + workspace_id: path_params.workspace_id, + account_id: auth_state.account_id, + name, + description, + mime_type: "application/json".to_string(), + storage_key: context_key.to_string(), + content_size: put_result.size() as i64, + content_hash: put_result.sha256().to_vec(), + metadata: None, + }; + + let context = conn.create_workspace_context(new_context).await?; + + tracing::info!( + target: TRACING_TARGET, + context_id = %context.id, + "Context created", + ); + + Ok((StatusCode::CREATED, Json(Context::from_model(context)))) +} + +fn create_context_docs(op: TransformOperation) -> TransformOperation { + op.summary("Create context") + .description( + "Creates a new context file for the workspace via multipart upload. \ + The file content is encrypted and stored in NATS object storage. \ + Expects fields: 'name' (text), 'file' (JSON file), 'description' (optional text).", + ) + .response::<201, Json>() + .response::<400, Json>() + .response::<401, Json>() + .response::<403, Json>() +} + +/// Lists all contexts for a workspace. +#[tracing::instrument( + skip_all, + fields( + account_id = %auth_state.account_id, + workspace_id = %path_params.workspace_id, + ) +)] +async fn list_contexts( + State(pg_client): State, + AuthState(auth_state): AuthState, + Path(path_params): Path, + Query(pagination): Query, +) -> Result<(StatusCode, Json)> { + tracing::debug!(target: TRACING_TARGET, "Listing workspace contexts"); + + let mut conn = pg_client.get_connection().await?; + + auth_state + .authorize_workspace( + &mut conn, + path_params.workspace_id, + Permission::ViewContexts, + ) + .await?; + + let page = conn + .cursor_list_workspace_contexts(path_params.workspace_id, pagination.into()) + .await?; + + tracing::debug!( + target: TRACING_TARGET, + context_count = page.items.len(), + "Workspace contexts listed", + ); + + Ok(( + StatusCode::OK, + Json(ContextsPage::from_cursor_page(page, Context::from_model)), + )) +} + +fn list_contexts_docs(op: TransformOperation) -> TransformOperation { + op.summary("List contexts") + .description("Returns all context files for the workspace with metadata.") + .response::<200, Json>() + .response::<401, Json>() + .response::<403, Json>() +} + +/// Retrieves a specific workspace context. +#[tracing::instrument( + skip_all, + fields( + account_id = %auth_state.account_id, + context_id = %path_params.context_id, + ) +)] +async fn read_context( + State(pg_client): State, + AuthState(auth_state): AuthState, + Path(path_params): Path, +) -> Result<(StatusCode, Json)> { + tracing::debug!(target: TRACING_TARGET, "Reading workspace context"); + + let mut conn = pg_client.get_connection().await?; + + let context = find_context(&mut conn, path_params.context_id).await?; + + auth_state + .authorize_workspace(&mut conn, context.workspace_id, Permission::ViewContexts) + .await?; + + tracing::debug!(target: TRACING_TARGET, "Workspace context read"); + + Ok((StatusCode::OK, Json(Context::from_model(context)))) +} + +fn read_context_docs(op: TransformOperation) -> TransformOperation { + op.summary("Get context") + .description("Returns context file metadata.") + .response::<200, Json>() + .response::<401, Json>() + .response::<403, Json>() + .response::<404, Json>() +} + +/// Updates a workspace context. +/// +/// Updates context metadata and optionally replaces the content via multipart. +/// Expects a multipart form with optional fields: +/// - `name`: New context name (text field) +/// - `description`: New description (text field) +/// - `file`: Replacement JSON context file (file field) +#[tracing::instrument( + skip_all, + fields( + account_id = %auth_state.account_id, + context_id = %path_params.context_id, + ) +)] +async fn update_context( + State(pg_client): State, + State(nats_client): State, + State(master_key): State, + AuthState(auth_state): AuthState, + Path(path_params): Path, + Multipart(mut multipart): Multipart, +) -> Result<(StatusCode, Json)> { + tracing::debug!(target: TRACING_TARGET, "Updating workspace context"); + + let mut conn = pg_client.get_connection().await?; + + let existing = find_context(&mut conn, path_params.context_id).await?; + + auth_state + .authorize_workspace(&mut conn, existing.workspace_id, Permission::ManageContexts) + .await?; + + let mut name: Option = None; + let mut description: Option> = None; + let mut file_content: Option> = None; + + while let Some(field) = multipart.next_field().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Invalid multipart data") + .with_context(format!("Failed to parse multipart form: {}", err)) + })? { + let field_name = field.name().unwrap_or_default().to_string(); + + match field_name.as_str() { + "name" => { + name = Some(field.text().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Failed to read name field") + .with_context(err.to_string()) + })?); + } + "description" => { + let text = field.text().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Failed to read description field") + .with_context(err.to_string()) + })?; + description = Some(if text.is_empty() { None } else { Some(text) }); + } + "file" => { + let bytes = field.bytes().await.map_err(|err| { + ErrorKind::BadRequest + .with_message("Failed to read file content") + .with_context(err.to_string()) + })?; + + serde_json::from_slice::(&bytes).map_err(|err| { + ErrorKind::BadRequest + .with_message("Context file must be valid JSON") + .with_context(err.to_string()) + })?; + + file_content = Some(bytes.to_vec()); + } + _ => {} + } + } + + let mut updates = UpdateWorkspaceContext { + name, + description, + ..Default::default() + }; + + // If file content was provided, encrypt and store new content + if let Some(content) = file_content { + let workspace_key = master_key.derive_workspace_key(existing.workspace_id); + let encrypted_content = encrypt(&workspace_key, &content).map_err( + |e: crate::service::crypto::CryptoError| { + ErrorKind::InternalServerError + .with_message("Failed to encrypt context content") + .with_context(e.to_string()) + }, + )?; + + let context_key = ContextKey::new(existing.workspace_id, existing.id); + let context_store = nats_client + .object_store::() + .await?; + + let reader = std::io::Cursor::new(encrypted_content); + let put_result = context_store.put(&context_key, reader).await?; + + updates.storage_key = Some(context_key.to_string()); + updates.content_size = Some(put_result.size() as i64); + updates.content_hash = Some(put_result.sha256().to_vec()); + } + + let context = conn + .update_workspace_context(path_params.context_id, updates) + .await?; + + tracing::info!(target: TRACING_TARGET, "Context updated"); + + Ok((StatusCode::OK, Json(Context::from_model(context)))) +} + +fn update_context_docs(op: TransformOperation) -> TransformOperation { + op.summary("Update context") + .description( + "Updates context metadata and optionally replaces the content via multipart upload. \ + All fields are optional. If a 'file' field is provided, the content is re-encrypted \ + and stored.", + ) + .response::<200, Json>() + .response::<400, Json>() + .response::<401, Json>() + .response::<403, Json>() + .response::<404, Json>() +} + +/// Deletes a workspace context. +#[tracing::instrument( + skip_all, + fields( + account_id = %auth_state.account_id, + context_id = %path_params.context_id, + ) +)] +async fn delete_context( + State(pg_client): State, + State(nats_client): State, + AuthState(auth_state): AuthState, + Path(path_params): Path, +) -> Result { + tracing::debug!(target: TRACING_TARGET, "Deleting workspace context"); + + let mut conn = pg_client.get_connection().await?; + + let context = find_context(&mut conn, path_params.context_id).await?; + + auth_state + .authorize_workspace(&mut conn, context.workspace_id, Permission::ManageContexts) + .await?; + + // Delete the object from NATS (best effort, context may already be gone) + let context_store = nats_client + .object_store::() + .await?; + let context_key = ContextKey::new(context.workspace_id, context.id); + if let Err(err) = context_store.delete(&context_key).await { + tracing::warn!( + target: TRACING_TARGET, + error = %err, + context_id = %path_params.context_id, + "Failed to delete context object from NATS (proceeding with soft delete)" + ); + } + + conn.delete_workspace_context(path_params.context_id) + .await?; + + tracing::info!(target: TRACING_TARGET, "Context deleted"); + + Ok(StatusCode::NO_CONTENT) +} + +fn delete_context_docs(op: TransformOperation) -> TransformOperation { + op.summary("Delete context") + .description("Soft-deletes the context from the workspace and removes the encrypted content from NATS.") + .response::<204, ()>() + .response::<401, Json>() + .response::<403, Json>() + .response::<404, Json>() +} + +/// Finds a context by ID or returns NotFound error. +async fn find_context( + conn: &mut nvisy_postgres::PgConn, + context_id: Uuid, +) -> Result { + conn.find_workspace_context_by_id(context_id) + .await? + .ok_or_else(|| { + ErrorKind::NotFound + .with_message("Context not found") + .with_resource("context") + }) +} + +/// Returns routes for workspace context management. +pub fn routes() -> ApiRouter { + use aide::axum::routing::*; + + ApiRouter::new() + .api_route( + "/workspaces/{workspaceId}/contexts/", + post_with(create_context, create_context_docs) + .layer(DefaultBodyLimit::max(DEFAULT_MAX_FILE_BODY_SIZE)) + .get_with(list_contexts, list_contexts_docs), + ) + .api_route( + "/contexts/{contextId}/", + get_with(read_context, read_context_docs) + .put_with(update_context, update_context_docs) + .layer(DefaultBodyLimit::max(DEFAULT_MAX_FILE_BODY_SIZE)) + .delete_with(delete_context, delete_context_docs), + ) + .with_path_items(|item| item.tag("Contexts")) +} diff --git a/crates/nvisy-server/src/handler/error/pg_error.rs b/crates/nvisy-server/src/handler/error/pg_error.rs index 1e097a24..8904a577 100644 --- a/crates/nvisy-server/src/handler/error/pg_error.rs +++ b/crates/nvisy-server/src/handler/error/pg_error.rs @@ -33,6 +33,7 @@ impl From for Error<'static> { ConstraintViolation::PipelineRun(c) => c.into(), ConstraintViolation::PipelineArtifact(c) => c.into(), ConstraintViolation::WorkspaceConnection(c) => c.into(), + ConstraintViolation::WorkspaceContext(c) => c.into(), } } } diff --git a/crates/nvisy-server/src/handler/error/pg_pipeline.rs b/crates/nvisy-server/src/handler/error/pg_pipeline.rs index c847f783..03efec7c 100644 --- a/crates/nvisy-server/src/handler/error/pg_pipeline.rs +++ b/crates/nvisy-server/src/handler/error/pg_pipeline.rs @@ -2,7 +2,7 @@ use nvisy_postgres::types::{ PipelineArtifactConstraints, PipelineConstraints, PipelineRunConstraints, - WorkspaceConnectionConstraints, + WorkspaceConnectionConstraints, WorkspaceContextConstraints, }; use crate::handler::{Error, ErrorKind}; @@ -85,3 +85,36 @@ impl From for Error<'static> { error.with_resource("workspace_connection") } } + +impl From for Error<'static> { + fn from(c: WorkspaceContextConstraints) -> Self { + let error = + match c { + WorkspaceContextConstraints::NameLength => ErrorKind::BadRequest + .with_message("Context name must be between 1 and 255 characters"), + WorkspaceContextConstraints::DescriptionLength => ErrorKind::BadRequest + .with_message("Context description must be at most 4096 characters"), + WorkspaceContextConstraints::MimeTypeLength => ErrorKind::BadRequest + .with_message("MIME type must be between 1 and 128 characters"), + WorkspaceContextConstraints::StorageKeyLength => { + ErrorKind::BadRequest.with_message("Storage key length exceeds maximum limit") + } + WorkspaceContextConstraints::ContentSizePositive => { + ErrorKind::BadRequest.with_message("Content size must be greater than zero") + } + WorkspaceContextConstraints::ContentHashLength => { + ErrorKind::BadRequest.with_message("Content hash must be exactly 32 bytes") + } + WorkspaceContextConstraints::MetadataSize => ErrorKind::BadRequest + .with_message("Context metadata size exceeds maximum limit"), + WorkspaceContextConstraints::NameUnique => ErrorKind::Conflict + .with_message("A context with this name already exists in the workspace"), + WorkspaceContextConstraints::UpdatedAfterCreated + | WorkspaceContextConstraints::DeletedAfterCreated => { + ErrorKind::InternalServerError.into_error() + } + }; + + error.with_resource("workspace_context") + } +} diff --git a/crates/nvisy-server/src/handler/files.rs b/crates/nvisy-server/src/handler/files.rs index 3df8f33e..4429b4e9 100644 --- a/crates/nvisy-server/src/handler/files.rs +++ b/crates/nvisy-server/src/handler/files.rs @@ -171,15 +171,14 @@ async fn process_single_file( // Step 3: Publish job to queue (use Postgres-generated file ID) let job = FileJob::new(created_file.id, file_key.to_string(), file_extension, ()); - ctx.publisher.publish(&job).await.map_err(|err| { + if let Err(err) = ctx.publisher.publish(&job).await { tracing::error!( target: TRACING_TARGET, error = %err, file_id = %created_file.id, - "Failed to publish file job" + "Failed to publish file job, file stored but not queued for processing" ); - ErrorKind::InternalServerError.with_message("Failed to queue file for processing") - })?; + } tracing::debug!( target: TRACING_TARGET, @@ -227,13 +226,21 @@ async fn upload_file( }; let mut uploaded_files = Vec::new(); - while let Some(field) = multipart.next_field().await.map_err(|err| { tracing::error!(target: TRACING_TARGET, error = %err, "Failed to read multipart field"); ErrorKind::BadRequest .with_message("Invalid multipart data") .with_context(format!("Failed to parse multipart form: {}", err)) })? { + if field.file_name().is_none() { + tracing::debug!( + target: TRACING_TARGET, + name = ?field.name(), + "Skipping non-file multipart field" + ); + continue; + } + let created_file = process_single_file(&mut conn, &ctx, field).await?; uploaded_files.push(response::File::from_model(created_file)); } diff --git a/crates/nvisy-server/src/handler/mod.rs b/crates/nvisy-server/src/handler/mod.rs index 39b96fc2..ed6e1efa 100644 --- a/crates/nvisy-server/src/handler/mod.rs +++ b/crates/nvisy-server/src/handler/mod.rs @@ -7,6 +7,7 @@ mod accounts; mod annotations; mod authentication; mod connections; +mod contexts; mod error; mod files; mod invites; @@ -46,6 +47,7 @@ fn private_routes( .merge(tokens::routes()) .merge(workspaces::routes()) .merge(connections::routes()) + .merge(contexts::routes()) .merge(invites::routes()) .merge(members::routes()) .merge(webhooks::routes()) diff --git a/crates/nvisy-server/src/handler/monitors.rs b/crates/nvisy-server/src/handler/monitors.rs index 761daf1a..d5ddac9f 100644 --- a/crates/nvisy-server/src/handler/monitors.rs +++ b/crates/nvisy-server/src/handler/monitors.rs @@ -9,10 +9,9 @@ use aide::transform::TransformOperation; use axum::extract::State; use axum::http::StatusCode; use jiff::Timestamp; -use nvisy_webhook::ServiceStatus; use super::request::CheckHealth; -use super::response::MonitorStatus; +use super::response::{MonitorStatus, ServiceStatus}; use crate::extract::{AuthState, Json, Version}; use crate::handler::Result; use crate::service::{HealthCache, ServiceState}; diff --git a/crates/nvisy-server/src/handler/request/contexts.rs b/crates/nvisy-server/src/handler/request/contexts.rs new file mode 100644 index 00000000..45fe2754 --- /dev/null +++ b/crates/nvisy-server/src/handler/request/contexts.rs @@ -0,0 +1,27 @@ +//! Context request types. + +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; +use validator::Validate; + +/// Path parameters for context operations. +#[must_use] +#[derive(Debug, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct ContextPathParams { + /// Unique identifier of the context. + pub context_id: Uuid, +} + +/// Request payload for updating an existing workspace context. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Validate)] +#[serde(rename_all = "camelCase")] +pub struct UpdateContext { + /// Human-readable context name. + #[validate(length(min = 1, max = 255))] + pub name: Option, + /// Context description. + #[validate(length(max = 4096))] + pub description: Option>, +} diff --git a/crates/nvisy-server/src/handler/request/mod.rs b/crates/nvisy-server/src/handler/request/mod.rs index 29ed4fd4..66626073 100644 --- a/crates/nvisy-server/src/handler/request/mod.rs +++ b/crates/nvisy-server/src/handler/request/mod.rs @@ -4,6 +4,7 @@ mod accounts; mod annotations; mod authentications; mod connections; +mod contexts; mod files; mod invites; mod members; @@ -20,6 +21,7 @@ pub use accounts::*; pub use annotations::*; pub use authentications::*; pub use connections::*; +pub use contexts::*; pub use files::*; pub use invites::*; pub use members::*; diff --git a/crates/nvisy-server/src/handler/response/contexts.rs b/crates/nvisy-server/src/handler/response/contexts.rs new file mode 100644 index 00000000..ffed2b03 --- /dev/null +++ b/crates/nvisy-server/src/handler/response/contexts.rs @@ -0,0 +1,57 @@ +//! Context response types. + +use jiff::Timestamp; +use nvisy_postgres::model::WorkspaceContext; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::Page; + +/// Response type for a workspace context. +/// +/// Note: The encrypted content is stored in NATS and never exposed +/// in API responses. Only metadata is returned. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Context { + /// Unique context identifier. + pub id: Uuid, + /// Workspace this context belongs to. + pub workspace_id: Uuid, + /// Account that created this context. + pub account_id: Uuid, + /// Human-readable context name. + pub name: String, + /// Context description. + #[serde(skip_serializing_if = "Option::is_none")] + pub description: Option, + /// Content MIME type. + pub mime_type: String, + /// Size of the content in bytes. + pub content_size: i64, + /// When the context was created. + pub created_at: Timestamp, + /// When the context was last updated. + pub updated_at: Timestamp, +} + +/// Paginated list of contexts. +pub type ContextsPage = Page; + +impl Context { + /// Creates a response from a database model. + pub fn from_model(context: WorkspaceContext) -> Self { + Self { + id: context.id, + workspace_id: context.workspace_id, + account_id: context.account_id, + name: context.name, + description: context.description, + mime_type: context.mime_type, + content_size: context.content_size, + created_at: context.created_at.into(), + updated_at: context.updated_at.into(), + } + } +} diff --git a/crates/nvisy-server/src/handler/response/mod.rs b/crates/nvisy-server/src/handler/response/mod.rs index f61b64ed..185e8861 100644 --- a/crates/nvisy-server/src/handler/response/mod.rs +++ b/crates/nvisy-server/src/handler/response/mod.rs @@ -10,6 +10,7 @@ mod annotations; mod artifacts; mod authentications; mod connections; +mod contexts; mod errors; mod files; mod invites; @@ -28,6 +29,7 @@ pub use annotations::*; pub use artifacts::*; pub use authentications::*; pub use connections::*; +pub use contexts::*; pub use errors::*; pub use files::*; pub use invites::*; diff --git a/crates/nvisy-server/src/handler/response/monitors.rs b/crates/nvisy-server/src/handler/response/monitors.rs index 573423b1..f1ae9e83 100644 --- a/crates/nvisy-server/src/handler/response/monitors.rs +++ b/crates/nvisy-server/src/handler/response/monitors.rs @@ -1,10 +1,32 @@ //! Monitor response types. use jiff::Timestamp; -use nvisy_webhook::ServiceStatus; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +/// Represents the operational status of a service. +#[derive( + Debug, + Default, + Clone, + Copy, + PartialEq, + Eq, + Serialize, + Deserialize, + JsonSchema +)] +#[serde(rename_all = "snake_case")] +pub enum ServiceStatus { + /// Service is operating normally. + #[default] + Healthy, + /// Service is operating with some issues but still functional. + Degraded, + /// Service is not operational. + Unhealthy, +} + /// System monitoring status response. #[must_use] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] diff --git a/crates/nvisy-server/src/service/security/master_key.rs b/crates/nvisy-server/src/service/security/master_key.rs index 535150de..7a8120a9 100644 --- a/crates/nvisy-server/src/service/security/master_key.rs +++ b/crates/nvisy-server/src/service/security/master_key.rs @@ -45,7 +45,7 @@ impl MasterKeyConfig { /// /// This is a thin wrapper around [`EncryptionKey`] that adds file-based loading /// and tracing. The underlying key is used exclusively to derive per-workspace -/// keys via HKDF-SHA256 — it is never used for encryption directly. +/// keys via HKDF-SHA256: it is never used for encryption directly. #[derive(Clone)] pub struct MasterKey { inner: Arc, diff --git a/crates/nvisy-webhook/Cargo.toml b/crates/nvisy-webhook/Cargo.toml index ec7ebdea..c223b49b 100644 --- a/crates/nvisy-webhook/Cargo.toml +++ b/crates/nvisy-webhook/Cargo.toml @@ -25,7 +25,7 @@ rustdoc-args = ["--cfg", "docsrs"] default = [] # Reqwest-based HTTP client for webhook delivery -reqwest = ["dep:reqwest", "dep:hmac", "dep:sha2", "dep:hex"] +reqwest = ["dep:reqwest", "dep:reqwest-middleware", "dep:reqwest-retry", "dep:reqwest-tracing", "dep:hmac", "dep:sha2", "dep:hex"] # CLI configuration support: enables clap derives for config types config = ["reqwest", "dep:clap"] @@ -43,26 +43,25 @@ async-trait = { workspace = true } thiserror = { workspace = true } strum = { workspace = true, features = [] } -# String types -hipstr = { workspace = true, features = [] } - # Observability tracing = { workspace = true } # (De)serialization serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = [] } - -# OpenAPI documentation (optional) schemars = { workspace = true, optional = true } # Commonly used datatypes +hipstr = { workspace = true, features = [] } uuid = { workspace = true, features = ["serde", "v4", "v7"] } jiff = { workspace = true, features = ["serde"] } url = { workspace = true, features = ["serde"] } # Reqwest feature dependencies (optional) reqwest = { workspace = true, features = ["json"], optional = true } +reqwest-middleware = { workspace = true, optional = true } +reqwest-retry = { workspace = true, optional = true } +reqwest-tracing = { workspace = true, optional = true } # Security hmac = { workspace = true, optional = true, features = [] } diff --git a/crates/nvisy-webhook/src/client/health.rs b/crates/nvisy-webhook/src/client/health.rs new file mode 100644 index 00000000..03cdc0c8 --- /dev/null +++ b/crates/nvisy-webhook/src/client/health.rs @@ -0,0 +1,34 @@ +//! Health check result for webhook providers. + +use std::time::Duration; + +/// Health check result from a webhook provider. +#[derive(Debug, Clone)] +pub struct ServiceHealth { + healthy: bool, + /// How long the health check took. + pub latency: Duration, +} + +impl ServiceHealth { + /// Creates a healthy result. + pub fn healthy(latency: Duration) -> Self { + Self { + healthy: true, + latency, + } + } + + /// Creates an unhealthy result. + pub fn unhealthy(latency: Duration) -> Self { + Self { + healthy: false, + latency, + } + } + + /// Returns whether the service is operational. + pub fn is_healthy(&self) -> bool { + self.healthy + } +} diff --git a/crates/nvisy-webhook/src/client/mod.rs b/crates/nvisy-webhook/src/client/mod.rs new file mode 100644 index 00000000..626f10e9 --- /dev/null +++ b/crates/nvisy-webhook/src/client/mod.rs @@ -0,0 +1,25 @@ +//! Webhook client types (requests, responses, health, and service wrapper). + +mod health; +mod request; +mod response; +mod service; + +pub use health::ServiceHealth; +pub use request::{WebhookContext, WebhookPayload, WebhookRequest}; +pub use response::WebhookResponse; +pub use service::WebhookService; + +use crate::Result; + +/// Core trait for webhook delivery operations. +/// +/// Implement this trait to create custom webhook delivery providers. +#[async_trait::async_trait] +pub trait WebhookProvider: Send + Sync { + /// Delivers a webhook payload to the specified endpoint. + async fn deliver(&self, request: &WebhookRequest) -> Result; + + /// Performs a health check on the webhook provider. + async fn health_check(&self) -> Result; +} diff --git a/crates/nvisy-webhook/src/request.rs b/crates/nvisy-webhook/src/client/request.rs similarity index 92% rename from crates/nvisy-webhook/src/request.rs rename to crates/nvisy-webhook/src/client/request.rs index 417aeab1..de696286 100644 --- a/crates/nvisy-webhook/src/request.rs +++ b/crates/nvisy-webhook/src/client/request.rs @@ -9,7 +9,7 @@ use url::Url; use uuid::Uuid; /// A webhook delivery request. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] pub struct WebhookRequest { /// Unique identifier for this request. @@ -30,10 +30,26 @@ pub struct WebhookRequest { #[cfg_attr(feature = "schema", schemars(with = "Option"))] pub timeout: Option, /// HMAC-SHA256 signing secret for request authentication. - #[serde(default, skip_serializing_if = "Option::is_none")] + #[serde(default, skip_serializing)] + #[cfg_attr(feature = "schema", schemars(skip))] pub secret: Option, } +impl std::fmt::Debug for WebhookRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("WebhookRequest") + .field("request_id", &self.request_id) + .field("url", &self.url) + .field("event", &self.event) + .field("message", &self.message) + .field("context", &self.context) + .field("headers", &self.headers) + .field("timeout", &self.timeout) + .field("secret", &self.secret.as_ref().map(|_| "[REDACTED]")) + .finish() + } +} + impl WebhookRequest { /// Creates a new webhook request. pub fn new( diff --git a/crates/nvisy-webhook/src/response.rs b/crates/nvisy-webhook/src/client/response.rs similarity index 57% rename from crates/nvisy-webhook/src/response.rs rename to crates/nvisy-webhook/src/client/response.rs index 91fed3e7..90385897 100644 --- a/crates/nvisy-webhook/src/response.rs +++ b/crates/nvisy-webhook/src/client/response.rs @@ -40,19 +40,6 @@ impl WebhookResponse { pub fn duration(&self) -> jiff::Span { self.started_at.until(self.finished_at).unwrap_or_default() } - - /// Checks if the response indicates a retryable error. - pub fn is_retryable(&self) -> bool { - if self.is_success() { - return false; - } - - // Network errors (status 0) or server errors (5xx) or specific client errors are retryable - self.status_code == 0 - || self.status_code >= 500 - || self.status_code == 408 - || self.status_code == 429 - } } #[cfg(test)] @@ -71,27 +58,11 @@ mod tests { } #[test] - fn test_is_retryable() { + fn test_failure_response() { let started_at = Timestamp::now(); - // Success is not retryable - assert!(!WebhookResponse::new(Uuid::new_v4(), 200, started_at).is_retryable()); - - // 5xx errors are retryable - assert!(WebhookResponse::new(Uuid::new_v4(), 500, started_at).is_retryable()); - assert!(WebhookResponse::new(Uuid::new_v4(), 503, started_at).is_retryable()); - - // 429 Too Many Requests is retryable - assert!(WebhookResponse::new(Uuid::new_v4(), 429, started_at).is_retryable()); - - // 408 Request Timeout is retryable - assert!(WebhookResponse::new(Uuid::new_v4(), 408, started_at).is_retryable()); - - // 4xx errors (except 408, 429) are not retryable - assert!(!WebhookResponse::new(Uuid::new_v4(), 400, started_at).is_retryable()); - assert!(!WebhookResponse::new(Uuid::new_v4(), 404, started_at).is_retryable()); - - // Network errors (status 0) are retryable - assert!(WebhookResponse::new(Uuid::new_v4(), 0, started_at).is_retryable()); + assert!(!WebhookResponse::new(Uuid::new_v4(), 500, started_at).is_success()); + assert!(!WebhookResponse::new(Uuid::new_v4(), 404, started_at).is_success()); + assert!(!WebhookResponse::new(Uuid::new_v4(), 0, started_at).is_success()); } } diff --git a/crates/nvisy-webhook/src/client/service.rs b/crates/nvisy-webhook/src/client/service.rs new file mode 100644 index 00000000..352c0635 --- /dev/null +++ b/crates/nvisy-webhook/src/client/service.rs @@ -0,0 +1,43 @@ +//! Webhook service wrapper. + +use std::fmt; +use std::sync::Arc; + +use super::{ServiceHealth, WebhookProvider, WebhookRequest, WebhookResponse}; +use crate::Result; + +/// Webhook service wrapper for dependency injection. +/// +/// Wraps any [`WebhookProvider`] in an `Arc` for cheap cloning across tasks. +#[derive(Clone)] +pub struct WebhookService { + inner: Arc, +} + +impl fmt::Debug for WebhookService { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("WebhookService").finish_non_exhaustive() + } +} + +impl WebhookService { + /// Create a new webhook service wrapper. + pub fn new

(provider: P) -> Self + where + P: WebhookProvider + 'static, + { + Self { + inner: Arc::new(provider), + } + } + + /// Delivers a webhook payload to the specified endpoint. + pub async fn deliver(&self, request: &WebhookRequest) -> Result { + self.inner.deliver(request).await + } + + /// Performs a health check on the underlying webhook provider. + pub async fn health_check(&self) -> Result { + self.inner.health_check().await + } +} diff --git a/crates/nvisy-webhook/src/health.rs b/crates/nvisy-webhook/src/health.rs deleted file mode 100644 index 6c09d181..00000000 --- a/crates/nvisy-webhook/src/health.rs +++ /dev/null @@ -1,83 +0,0 @@ -//! Health monitoring types for webhook services. - -use std::collections::HashMap; -use std::time::Duration; - -use jiff::Timestamp; -#[cfg(feature = "schema")] -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -/// Represents the operational status of a service. -#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[cfg_attr(feature = "schema", derive(JsonSchema))] -#[serde(rename_all = "snake_case")] -pub enum ServiceStatus { - /// Service is operating normally - #[default] - Healthy, - /// Service is operating with some issues but still functional - Degraded, - /// Service is not operational - Unhealthy, -} - -/// Health information for a service. -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct ServiceHealth { - /// Current service status - pub status: ServiceStatus, - /// Response time for the health check - pub response: Option, - /// Optional message describing the current state - pub message: Option, - /// Timestamp when the health check was performed - pub checked_at: Timestamp, - /// Additional metrics about the service - pub metrics: HashMap, -} - -impl ServiceHealth { - /// Creates a new healthy service health report. - pub fn healthy() -> Self { - Self { - status: ServiceStatus::Healthy, - checked_at: Timestamp::now(), - ..Default::default() - } - } - - /// Creates a new degraded service health report. - pub fn degraded(message: impl Into) -> Self { - Self { - status: ServiceStatus::Degraded, - message: Some(message.into()), - checked_at: Timestamp::now(), - ..Default::default() - } - } - - /// Creates a new unhealthy service health report. - pub fn unhealthy(message: impl Into) -> Self { - Self { - status: ServiceStatus::Unhealthy, - message: Some(message.into()), - checked_at: Timestamp::now(), - metrics: HashMap::new(), - ..Default::default() - } - } - - /// Sets the response time for this health check. - pub fn with_response_time(mut self, response_time: Duration) -> Self { - self.response = Some(response_time); - self - } - - /// Adds a metric to the health report. - pub fn with_metric(mut self, key: impl Into, value: Value) -> Self { - self.metrics.insert(key.into(), value); - self - } -} diff --git a/crates/nvisy-webhook/src/lib.rs b/crates/nvisy-webhook/src/lib.rs index a8c493eb..050a7196 100644 --- a/crates/nvisy-webhook/src/lib.rs +++ b/crates/nvisy-webhook/src/lib.rs @@ -2,34 +2,18 @@ #![cfg_attr(docsrs, feature(doc_cfg))] #![doc = include_str!("../README.md")] +mod client; mod error; -mod health; -mod service; - -pub mod request; -pub mod response; #[cfg(feature = "reqwest")] #[cfg_attr(docsrs, doc(cfg(feature = "reqwest")))] pub mod reqwest; +pub use client::{ + ServiceHealth, WebhookContext, WebhookPayload, WebhookProvider, WebhookRequest, + WebhookResponse, WebhookService, +}; pub use error::{BoxedError, Error, ErrorKind, Result}; -pub use health::{ServiceHealth, ServiceStatus}; -pub use request::{WebhookContext, WebhookPayload, WebhookRequest}; -pub use response::WebhookResponse; -pub use service::WebhookService; /// Tracing target for webhook operations. pub const TRACING_TARGET: &str = "nvisy_service::webhook"; - -/// Core trait for webhook delivery operations. -/// -/// Implement this trait to create custom webhook delivery providers. -#[async_trait::async_trait] -pub trait WebhookProvider: Send + Sync { - /// Delivers a webhook payload to the specified endpoint. - async fn deliver(&self, request: &WebhookRequest) -> Result; - - /// Performs a health check on the webhook provider. - async fn health_check(&self) -> Result; -} diff --git a/crates/nvisy-webhook/src/reqwest/client.rs b/crates/nvisy-webhook/src/reqwest/client.rs index 139de987..c66cfb58 100644 --- a/crates/nvisy-webhook/src/reqwest/client.rs +++ b/crates/nvisy-webhook/src/reqwest/client.rs @@ -1,10 +1,15 @@ //! Reqwest-based HTTP client for webhook delivery. use std::sync::Arc; +use std::time::Duration; use hmac::{Hmac, Mac}; use jiff::Timestamp; use reqwest::Client; +use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; +use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; +use reqwest_tracing::TracingMiddleware; use sha2::Sha256; use super::{Error, ReqwestConfig, TRACING_TARGET}; @@ -12,16 +17,11 @@ use crate::{ServiceHealth, WebhookProvider, WebhookRequest, WebhookResponse, Web type HmacSha256 = Hmac; -/// Inner client that holds the HTTP client and configuration. -struct ReqwestClientInner { - http: Client, - config: ReqwestConfig, -} - /// Reqwest-based HTTP client for delivering webhook payloads to external endpoints. /// /// This client implements the [`WebhookProvider`] trait and provides HTTP-based -/// webhook delivery with request signing support. +/// webhook delivery with request signing, automatic retries with exponential +/// backoff, and distributed tracing. /// /// # Examples /// @@ -39,14 +39,12 @@ struct ReqwestClientInner { /// ``` #[derive(Clone)] pub struct ReqwestClient { - inner: Arc, + http: Arc, } impl std::fmt::Debug for ReqwestClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("ReqwestClient") - .field("config", &self.inner.config) - .finish_non_exhaustive() + f.debug_struct("ReqwestClient").finish_non_exhaustive() } } @@ -59,36 +57,33 @@ impl ReqwestClient { tracing::debug!( target: TRACING_TARGET, timeout_ms = timeout.as_millis(), + max_retries = config.max_retries, "Creating reqwest client" ); - let http = Client::builder() + let base_client = Client::builder() .timeout(timeout) .user_agent(&user_agent) .build() .expect("failed to create HTTP client"); - let inner = ReqwestClientInner { http, config }; - let client = Self { - inner: Arc::new(inner), - }; + let retry_policy = ExponentialBackoff::builder() + .retry_bounds(config.min_retry_interval(), config.max_retry_interval()) + .build_with_max_retries(config.max_retries); + + let http = ClientBuilder::new(base_client) + .with(TracingMiddleware::default()) + .with(RetryTransientMiddleware::new_with_policy(retry_policy)) + .build(); tracing::info!( target: TRACING_TARGET, "Reqwest client created successfully" ); - client - } - - /// Gets the underlying HTTP client. - pub(crate) fn http(&self) -> &Client { - &self.inner.http - } - - /// Gets the client configuration. - pub fn config(&self) -> &ReqwestConfig { - &self.inner.config + Self { + http: Arc::new(http), + } } /// Converts this client into a [`WebhookService`] for use with dependency injection. @@ -98,13 +93,15 @@ impl ReqwestClient { /// Signs a payload using HMAC-SHA256. /// - /// The signature is computed over: `{timestamp}.{payload}` + /// The signature is computed over the raw bytes: `{timestamp}.{payload}`. pub fn sign_payload(secret: &str, timestamp: i64, payload: &[u8]) -> String { - let signing_input = format!("{}.{}", timestamp, String::from_utf8_lossy(payload)); + let timestamp_bytes = timestamp.to_string(); let mut mac = HmacSha256::new_from_slice(secret.as_bytes()).expect("HMAC can take key of any size"); - mac.update(signing_input.as_bytes()); + mac.update(timestamp_bytes.as_bytes()); + mac.update(b"."); + mac.update(payload); let result = mac.finalize(); hex::encode(result.into_bytes()) @@ -123,36 +120,29 @@ impl WebhookProvider for ReqwestClient { let started_at = Timestamp::now(); let timestamp = started_at.as_second(); - tracing::debug!( - target: TRACING_TARGET, - request_id = %request.request_id, - url = %request.url, - event = %request.event, - "Delivering webhook" - ); - // Create the payload from the request let payload = request.to_payload(); let payload_bytes = serde_json::to_vec(&payload).map_err(Error::Serde)?; - // Determine the timeout to use - let timeout = request.timeout.unwrap_or_else(|| self.config().timeout()); - // Build the HTTP request let mut http_request = self - .http() + .http .post(request.url.as_str()) .header("Content-Type", "application/json") .header("X-Webhook-Event", &request.event) .header("X-Webhook-Timestamp", timestamp.to_string()) - .header("X-Webhook-Request-Id", request.request_id.to_string()) - .timeout(timeout); + .header("X-Webhook-Request-Id", request.request_id.to_string()); + + // Override timeout if the request specifies one + if let Some(timeout) = request.timeout { + http_request = http_request.timeout(timeout); + } // Add HMAC-SHA256 signature if secret is present if let Some(ref secret) = request.secret { let signature = Self::sign_payload(secret, timestamp, &payload_bytes); http_request = - http_request.header("X-Webhook-Signature", format!("sha256={}", signature)); + http_request.header("X-Webhook-Signature", format!("sha256={signature}")); } // Add custom headers @@ -170,27 +160,17 @@ impl WebhookProvider for ReqwestClient { let status_code = http_response.status().as_u16(); let response = WebhookResponse::new(request.request_id, status_code, started_at); - tracing::debug!( - target: TRACING_TARGET, - request_id = %request.request_id, - status_code, - success = response.is_success(), - "Webhook delivery completed" - ); - Ok(response) } async fn health_check(&self) -> crate::Result { - // The client is stateless and always healthy if it was created successfully - Ok(ServiceHealth::healthy()) + Ok(ServiceHealth::healthy(Duration::ZERO)) } } #[cfg(test)] mod tests { use super::*; - use crate::ServiceStatus; #[test] fn test_sign_payload() { @@ -205,17 +185,26 @@ mod tests { assert!(signature.chars().all(|c| c.is_ascii_hexdigit())); } + #[test] + fn test_sign_payload_deterministic() { + let secret = "secret"; + let timestamp = 100i64; + let payload = b"hello"; + + let sig1 = ReqwestClient::sign_payload(secret, timestamp, payload); + let sig2 = ReqwestClient::sign_payload(secret, timestamp, payload); + assert_eq!(sig1, sig2); + } + #[test] fn test_client_creation() { - let config = ReqwestConfig::default(); - let client = ReqwestClient::new(config); - assert!(client.config().user_agent.is_none()); + let _client = ReqwestClient::default(); } #[tokio::test] async fn test_health_check() { let client = ReqwestClient::default(); let health = client.health_check().await.unwrap(); - assert_eq!(health.status, ServiceStatus::Healthy); + assert!(health.is_healthy()); } } diff --git a/crates/nvisy-webhook/src/reqwest/config.rs b/crates/nvisy-webhook/src/reqwest/config.rs index 14598915..0e76b85b 100644 --- a/crates/nvisy-webhook/src/reqwest/config.rs +++ b/crates/nvisy-webhook/src/reqwest/config.rs @@ -9,13 +9,22 @@ use serde::{Deserialize, Serialize}; /// Default timeout for HTTP requests: 30 seconds. pub const DEFAULT_TIMEOUT_SECS: u64 = 30; +/// Default maximum number of retry attempts. +pub const DEFAULT_MAX_RETRIES: u32 = 3; + +/// Default minimum retry interval in milliseconds. +pub const DEFAULT_MIN_RETRY_INTERVAL_MS: u64 = 500; + +/// Default maximum retry interval in milliseconds. +pub const DEFAULT_MAX_RETRY_INTERVAL_MS: u64 = 30_000; + /// Configuration for the reqwest HTTP client. /// /// This configuration is used for webhook delivery and other HTTP operations. #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "config", derive(Args))] pub struct ReqwestConfig { - /// HTTP request timeout in seconds + /// HTTP request timeout in seconds. #[cfg_attr( feature = "config", arg(long = "http-timeout", env = "HTTP_TIMEOUT", default_value = "30") @@ -23,24 +32,75 @@ pub struct ReqwestConfig { #[serde(default = "default_timeout_secs")] pub http_timeout: u64, - /// User-Agent header to send with requests + /// User-Agent header to send with requests. #[cfg_attr( feature = "config", arg(long = "http-user-agent", env = "HTTP_USER_AGENT") )] #[serde(default)] pub user_agent: Option, + + /// Maximum number of retry attempts for transient failures. + #[cfg_attr( + feature = "config", + arg( + long = "http-max-retries", + env = "HTTP_MAX_RETRIES", + default_value = "3" + ) + )] + #[serde(default = "default_max_retries")] + pub max_retries: u32, + + /// Minimum retry interval in milliseconds. + #[cfg_attr( + feature = "config", + arg( + long = "http-min-retry-interval", + env = "HTTP_MIN_RETRY_INTERVAL_MS", + default_value = "500" + ) + )] + #[serde(default = "default_min_retry_interval_ms")] + pub min_retry_interval_ms: u64, + + /// Maximum retry interval in milliseconds. + #[cfg_attr( + feature = "config", + arg( + long = "http-max-retry-interval", + env = "HTTP_MAX_RETRY_INTERVAL_MS", + default_value = "30000" + ) + )] + #[serde(default = "default_max_retry_interval_ms")] + pub max_retry_interval_ms: u64, } fn default_timeout_secs() -> u64 { DEFAULT_TIMEOUT_SECS } +fn default_max_retries() -> u32 { + DEFAULT_MAX_RETRIES +} + +fn default_min_retry_interval_ms() -> u64 { + DEFAULT_MIN_RETRY_INTERVAL_MS +} + +fn default_max_retry_interval_ms() -> u64 { + DEFAULT_MAX_RETRY_INTERVAL_MS +} + impl Default for ReqwestConfig { fn default() -> Self { Self { http_timeout: default_timeout_secs(), user_agent: None, + max_retries: default_max_retries(), + min_retry_interval_ms: default_min_retry_interval_ms(), + max_retry_interval_ms: default_max_retry_interval_ms(), } } } @@ -50,15 +110,10 @@ impl ReqwestConfig { pub fn new(timeout_secs: u64) -> Self { Self { http_timeout: timeout_secs, - user_agent: None, + ..Default::default() } } - /// Returns the timeout as a Duration. - pub fn timeout(&self) -> Duration { - Duration::from_secs(self.http_timeout) - } - /// Returns the effective timeout, using default if zero. pub fn effective_timeout(&self) -> Duration { if self.http_timeout == 0 { @@ -80,6 +135,16 @@ impl ReqwestConfig { format!("nvisy/{}", env!("CARGO_PKG_VERSION")) } + /// Returns the minimum retry interval as a Duration. + pub fn min_retry_interval(&self) -> Duration { + Duration::from_millis(self.min_retry_interval_ms) + } + + /// Returns the maximum retry interval as a Duration. + pub fn max_retry_interval(&self) -> Duration { + Duration::from_millis(self.max_retry_interval_ms) + } + /// Set the timeout in seconds. #[must_use] pub fn with_timeout(mut self, timeout_secs: u64) -> Self { @@ -93,6 +158,21 @@ impl ReqwestConfig { self.user_agent = Some(user_agent.into()); self } + + /// Set the maximum number of retry attempts. + #[must_use] + pub fn with_max_retries(mut self, max_retries: u32) -> Self { + self.max_retries = max_retries; + self + } + + /// Set the retry interval bounds in milliseconds. + #[must_use] + pub fn with_retry_interval(mut self, min_ms: u64, max_ms: u64) -> Self { + self.min_retry_interval_ms = min_ms; + self.max_retry_interval_ms = max_ms; + self + } } #[cfg(test)] @@ -104,24 +184,33 @@ mod tests { let config = ReqwestConfig::default(); assert_eq!(config.http_timeout, 30); assert!(config.user_agent.is_none()); - assert_eq!(config.timeout(), Duration::from_secs(30)); + assert_eq!(config.effective_timeout(), Duration::from_secs(30)); + assert_eq!(config.max_retries, 3); + assert_eq!(config.min_retry_interval_ms, 500); + assert_eq!(config.max_retry_interval_ms, 30_000); } #[test] fn test_new_config() { let config = ReqwestConfig::new(60); assert_eq!(config.http_timeout, 60); - assert_eq!(config.timeout(), Duration::from_secs(60)); + assert_eq!(config.effective_timeout(), Duration::from_secs(60)); + assert_eq!(config.max_retries, DEFAULT_MAX_RETRIES); } #[test] fn test_builder_pattern() { let config = ReqwestConfig::default() .with_timeout(120) - .with_user_agent("custom-agent/1.0"); + .with_user_agent("custom-agent/1.0") + .with_max_retries(5) + .with_retry_interval(1000, 60_000); assert_eq!(config.http_timeout, 120); assert_eq!(config.user_agent, Some("custom-agent/1.0".to_string())); + assert_eq!(config.max_retries, 5); + assert_eq!(config.min_retry_interval_ms, 1000); + assert_eq!(config.max_retry_interval_ms, 60_000); } #[test] @@ -138,4 +227,11 @@ mod tests { let config = ReqwestConfig::default(); assert!(config.effective_user_agent().contains("nvisy")); } + + #[test] + fn test_retry_interval_durations() { + let config = ReqwestConfig::default(); + assert_eq!(config.min_retry_interval(), Duration::from_millis(500)); + assert_eq!(config.max_retry_interval(), Duration::from_millis(30_000)); + } } diff --git a/crates/nvisy-webhook/src/reqwest/error.rs b/crates/nvisy-webhook/src/reqwest/error.rs index cbff2af5..a48d0a7d 100644 --- a/crates/nvisy-webhook/src/reqwest/error.rs +++ b/crates/nvisy-webhook/src/reqwest/error.rs @@ -8,9 +8,9 @@ pub type Result = std::result::Result; /// Error type for reqwest operations. #[derive(Debug, Error)] pub enum Error { - /// HTTP request failed. + /// HTTP request or middleware error. #[error("HTTP error: {0}")] - Reqwest(#[from] reqwest::Error), + Middleware(#[from] reqwest_middleware::Error), /// Serialization error. #[error("Serialization error: {0}")] Serde(#[from] serde_json::Error), @@ -19,7 +19,7 @@ pub enum Error { impl From for crate::Error { fn from(err: Error) -> Self { match err { - Error::Reqwest(e) => { + Error::Middleware(e) => { if e.is_timeout() { crate::Error::new(crate::ErrorKind::Timeout) .with_message(e.to_string()) diff --git a/crates/nvisy-webhook/src/service.rs b/crates/nvisy-webhook/src/service.rs deleted file mode 100644 index abf41e02..00000000 --- a/crates/nvisy-webhook/src/service.rs +++ /dev/null @@ -1,121 +0,0 @@ -//! Webhook service wrapper with observability. - -use std::fmt; -use std::sync::Arc; -use std::time::Instant; - -use super::{ - Result, ServiceHealth, TRACING_TARGET, WebhookProvider, WebhookRequest, WebhookResponse, -}; - -/// Webhook service wrapper with observability. -/// -/// This wrapper adds structured logging to any webhook delivery implementation. -/// The inner service is wrapped in `Arc` for cheap cloning. -#[derive(Clone)] -pub struct WebhookService { - inner: Arc, -} - -impl fmt::Debug for WebhookService { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("WebhookService").finish_non_exhaustive() - } -} - -impl WebhookService { - /// Create a new webhook service wrapper. - pub fn new

(provider: P) -> Self - where - P: WebhookProvider + 'static, - { - Self { - inner: Arc::new(provider), - } - } - - /// Delivers a webhook payload to the specified endpoint. - pub async fn deliver(&self, request: &WebhookRequest) -> Result { - let started_at = Instant::now(); - - tracing::debug!( - target: TRACING_TARGET, - request_id = %request.request_id, - url = %request.url, - timeout_ms = ?request.timeout.map(|t| t.as_millis()), - "Delivering webhook" - ); - - let result = self.inner.deliver(request).await; - let elapsed = started_at.elapsed(); - - match &result { - Ok(response) => { - if response.is_success() { - tracing::debug!( - target: TRACING_TARGET, - request_id = %request.request_id, - response_id = %response.response_id, - status_code = ?response.status_code, - elapsed_ms = elapsed.as_millis(), - "Webhook delivered successfully" - ); - } else { - tracing::warn!( - target: TRACING_TARGET, - request_id = %request.request_id, - response_id = %response.response_id, - status_code = ?response.status_code, - elapsed_ms = elapsed.as_millis(), - "Webhook delivery failed" - ); - } - } - Err(error) => { - tracing::error!( - target: TRACING_TARGET, - request_id = %request.request_id, - error = %error, - elapsed_ms = elapsed.as_millis(), - "Webhook delivery error" - ); - } - } - - result - } - - /// Performs a health check on the underlying webhook provider. - pub async fn health_check(&self) -> Result { - let started_at = Instant::now(); - - tracing::debug!( - target: TRACING_TARGET, - "Performing webhook provider health check" - ); - - let result = self.inner.health_check().await; - let elapsed = started_at.elapsed(); - - match &result { - Ok(health) => { - tracing::debug!( - target: TRACING_TARGET, - status = ?health.status, - elapsed_ms = elapsed.as_millis(), - "Webhook health check completed" - ); - } - Err(error) => { - tracing::error!( - target: TRACING_TARGET, - error = %error, - elapsed_ms = elapsed.as_millis(), - "Webhook health check failed" - ); - } - } - - result - } -} diff --git a/docker/README.md b/docker/README.md index 7a1fdba3..e615a9eb 100644 --- a/docker/README.md +++ b/docker/README.md @@ -7,14 +7,14 @@ Docker configuration for the Nvisy server. Nvisy requires two external services: **PostgreSQL 18+** with the pgvector extension. PostgreSQL serves as the primary -data store for all application state — accounts, workspaces, pipelines, -connections, file metadata — and provides vector similarity search through +data store for all application state (accounts, workspaces, pipelines, +connections, file metadata) and provides vector similarity search through pgvector. The recommended image is `pgvector/pgvector:pg18`. **NATS 2.10+** with JetStream enabled. NATS handles three concerns: pub/sub messaging for real-time events, persistent job queues for asynchronous processing, and object storage for uploaded files. JetStream must be enabled -with sufficient storage allocation — the default configuration uses 1 GB of +with sufficient storage allocation: the default configuration uses 1 GB of memory store and 10 GB of file store. ## Quick Start @@ -86,10 +86,10 @@ variables above. The Dockerfile uses a multi-stage build: -1. **Planner** — generates a dependency recipe with cargo-chef -2. **Builder** — builds dependencies from the recipe (cached), then builds the +1. **Planner:** generates a dependency recipe with cargo-chef +2. **Builder:** builds dependencies from the recipe (cached), then builds the server binary and strips it -3. **Runtime** — minimal Debian image with only the binary and runtime libraries +3. **Runtime:** minimal Debian image with only the binary and runtime libraries The runtime image runs as a non-root user (`nvisy`, UID 1000) and includes a health check endpoint at `/health`. diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 76f92513..97ff4f3b 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -4,8 +4,8 @@ Nvisy is implemented as a workspace-based monorepo in Rust. The server handles HTTP serving, database access, messaging, and pipeline orchestration. Pipeline -execution — including AI transforms, provider integrations, and data -processing — runs in a separate TypeScript runtime, communicating with the +execution (including AI transforms, provider integrations, and data +processing) runs in a separate TypeScript runtime, communicating with the server over NATS. ## Crate Structure @@ -34,8 +34,8 @@ The Rust workspace contains six crates, each with a single responsibility: ## Pipeline Model -A Nvisy pipeline is a directed acyclic graph. Nodes represent operations — -reading data, transforming it, writing results — and edges define the flow +A Nvisy pipeline is a directed acyclic graph. Nodes represent operations: +reading data, transforming it, writing results. Edges define the flow between them. This graph is the central abstraction of the platform: users author it as a JSON document, the compiler validates and optimizes it, and the engine executes it. @@ -49,7 +49,7 @@ execution time for auditability. Every node in the graph falls into one of four categories: -**Source nodes** read data from external systems — relational databases, object +**Source nodes** read data from external systems: relational databases, object stores, or other providers connected through the platform's credential management system. Each source references a stored connection and carries provider-specific parameters. @@ -67,7 +67,7 @@ simultaneously writing embeddings to a vector store. **Switch nodes** route data conditionally. A switch evaluates a condition against each incoming item and directs it to one of two output branches. This -enables type-specific processing within a single pipeline — for example, routing +enables type-specific processing within a single pipeline: for example, routing images through OCR while routing text through NLP extraction. ### Cache Slots @@ -83,7 +83,7 @@ eliminating the indirection at runtime. Before execution, the workflow definition is compiled into an optimized runtime graph. Compilation proceeds in four phases: -1. **Validation** verifies structural correctness — all edge references resolve +1. **Validation** verifies structural correctness: all edge references resolve to existing nodes, at least one source and one sink exist, and the graph is acyclic. @@ -91,7 +91,7 @@ graph. Compilation proceeds in four phases: are wired directly to inputs reading from the same slot, and the intermediate cache nodes are removed from the graph. -3. **Node compilation** converts each definition node into its executable form — +3. **Node compilation** converts each definition node into its executable form: source nodes become input streams, sink nodes become output streams, transforms become processors, and switches become condition evaluators. @@ -107,10 +107,10 @@ before advancing to the next item. This design avoids buffering entire datasets in memory. A single document can expand into thousands of chunks at one transform and contract back into a single -summary at another — the engine handles both directions naturally. +summary at another: the engine handles both directions naturally. Every item carries its own resumption context. Runs can resume from the last -successfully processed item — whether recovering from a failure or continuing +successfully processed item: whether recovering from a failure or continuing incrementally after new data has been added to the source. This makes pipelines suitable for both batch reprocessing and ongoing incremental ingestion. @@ -124,24 +124,24 @@ artifact set. ## Data Model The data model is organized around workspaces. A workspace is the tenant -boundary — all resources belong to exactly one workspace, and all access is +boundary: all resources belong to exactly one workspace, and all access is scoped accordingly. **Workspaces** contain three primary resource types: -- **Pipelines** — Workflow definitions stored as JSON graphs. Each pipeline has +- **Pipelines:** Workflow definitions stored as JSON graphs. Each pipeline has a lifecycle status (draft, enabled, disabled) and optional cron scheduling. - When a pipeline executes, it produces a **run** — an immutable record of the + When a pipeline executes, it produces a **run**: an immutable record of the execution with a snapshot of the definition, execution logs, and timing. Runs produce **artifacts**, which link back to files and classify them as input, output, or intermediate. -- **Connections** — Encrypted references to external systems. Each connection +- **Connections:** Encrypted references to external systems. Each connection stores a provider identifier and an encrypted blob containing credentials and configuration. Encryption uses workspace-derived keys so that a compromise of one workspace cannot expose another's credentials. -- **Files** — Binary objects stored in NATS object storage with metadata in +- **Files:** Binary objects stored in NATS object storage with metadata in PostgreSQL. Files support versioning through parent-child chains and are classified by source: uploaded by a user, imported from an external system, or generated by a pipeline run. diff --git a/docs/INTELLIGENCE.md b/docs/INTELLIGENCE.md index 7fe47048..f1793d1a 100644 --- a/docs/INTELLIGENCE.md +++ b/docs/INTELLIGENCE.md @@ -10,8 +10,8 @@ that accepts input, produces output, and can be composed with any other node. Before content can be analyzed or enriched, it must be decomposed into structured elements. -**Partitioning** breaks documents into typed elements — paragraphs, tables, -images, headers, list items — using either fast rule-based heuristics or +**Partitioning** breaks documents into typed elements (paragraphs, tables, +images, headers, list items) using either fast rule-based heuristics or ML-based layout detection. For complex layouts, a vision-language model can be used to interpret page structure directly from rendered images. @@ -45,7 +45,7 @@ material. and per-column descriptions, making tabular data searchable and understandable without reading the raw data. -**Image enrichment** generates descriptions at varying levels of detail — brief +**Image enrichment** generates descriptions at varying levels of detail: brief summaries, detailed descriptions covering people, objects, text, colors, and layout. Generative OCR extracts text from images using vision models rather than traditional OCR engines. Object detection identifies and lists entities present @@ -70,7 +70,7 @@ Beyond per-element transforms, Nvisy provides higher-order intelligence that operates across content boundaries. **Entity resolution** identifies when the same real-world entity appears in -different forms across data — name variations ("IBM" vs. "International Business +different forms across data: name variations ("IBM" vs. "International Business Machines"), role changes, acquisitions, and abbreviations. The resolution pipeline extracts, clusters, disambiguates, links, and propagates entity identities. @@ -80,7 +80,7 @@ different sources, surfacing inconsistencies that would otherwise go unnoticed in large document sets. **Consistency checking** verifies that definitions and terms are used uniformly -across content — for example, ensuring that "confidential" is defined the same +across content: for example, ensuring that "confidential" is defined the same way in every contract. **Coverage analysis** determines whether required topics, clauses, or sections @@ -92,7 +92,7 @@ where and how a document has deviated from its expected form. **Semantic diffing** detects changes in meaning across versions of content, distinguishing substantive changes from cosmetic edits. -**Temporal queries** enable time-aware filtering and analysis — "what changed +**Temporal queries** enable time-aware filtering and analysis: "what changed since last quarter" or "show me the state of this document as of a specific date." @@ -101,9 +101,9 @@ date." Switch nodes route data conditionally within the workflow graph, enabling different processing paths based on data characteristics. -**File category routing** directs data based on content type — text, images, -audio, video, documents, spreadsheets, presentations, code, or archives — -allowing each type to flow through an appropriate processing branch. +**File category routing** directs data based on content type: text, images, +audio, documents, spreadsheets, presentations, code, or archives. This allows +each type to flow through an appropriate processing branch. **Language routing** directs data based on detected content language, with configurable confidence thresholds, enabling language-specific processing within diff --git a/docs/PROVIDERS.md b/docs/PROVIDERS.md index 94f345a5..b66e948e 100644 --- a/docs/PROVIDERS.md +++ b/docs/PROVIDERS.md @@ -4,13 +4,13 @@ An ETL platform is only as useful as the systems it can connect to. Relational databases, object stores, vector databases, document stores, message queues, -search engines, graph databases — each has its own wire protocol, authentication +search engines, graph databases: each has its own wire protocol, authentication model, pagination scheme, and SDK. Nvisy addresses this with a provider abstraction that decouples the core platform from specific external systems. The Rust server manages connections, -credentials, and pipeline orchestration. Provider integrations — the actual -reading from and writing to external systems — run in a separate TypeScript +credentials, and pipeline orchestration. Provider integrations (the actual +reading from and writing to external systems) run in a separate TypeScript runtime, communicating with the server over NATS. ## Provider Abstraction @@ -23,7 +23,7 @@ credentials and parameters. Credentials are encrypted at rest and decrypted only at execution time within the scope of a single run. **Reading** streams data from the source with resumable pagination. Each item -carries its own context — a cursor, token, or offset — so that processing can +carries its own context (a cursor, token, or offset) so that processing can resume from any point, whether recovering from a failure or continuing incrementally after new data has been added to the source. @@ -54,7 +54,7 @@ Adding a provider does not require modifying the Rust server. The process is: 3. Register the provider identifier so the runtime can dispatch to it This design keeps the provider surface area decoupled from the core. The server -does not know or care which specific systems are available — it manages +does not know or care which specific systems are available: it manages connections, credentials, and orchestration while the runtime handles execution. ## Design Principles diff --git a/docs/README.md b/docs/README.md index 8a8e0b2f..80b41662 100644 --- a/docs/README.md +++ b/docs/README.md @@ -9,13 +9,13 @@ execution as compiled workflow graphs. The platform is designed around three ideas: data should flow between any two systems without custom glue code, transformations should be composable and -reusable, and the intelligence applied during processing — extraction, -enrichment, analysis, reasoning — should be a first-class part of the pipeline, +reusable, and the intelligence applied during processing (extraction, +enrichment, analysis, reasoning) should be a first-class part of the pipeline, not a bolted-on afterthought. ## Problem -Organizations accumulate data across dozens of systems — relational databases, +Organizations accumulate data across dozens of systems: relational databases, object stores, vector databases, document repositories, message queues. Moving data between these systems typically requires either rigid, vendor-locked ETL tools that cannot accommodate AI workloads, or bespoke engineering that is @@ -30,7 +30,7 @@ orchestration. ## Design Principles **Workflow-first.** The pipeline is the unit of work. Each workflow is a -directed acyclic graph of typed nodes — sources, transforms, and sinks — +directed acyclic graph of typed nodes (sources, transforms, and sinks) connected by edges. This structure is serializable, versionable, and schedulable. @@ -39,13 +39,13 @@ object stores, vector databases, and other external systems. Adding a new provider requires implementing a small set of protocols without modifying the core engine. -**Intelligence-native.** LLM-powered transforms — extraction, enrichment, -summarization, entity resolution, contradiction detection — sit alongside +**Intelligence-native.** LLM-powered transforms (extraction, enrichment, +summarization, entity resolution, contradiction detection) sit alongside rule-based transforms as equal citizens in the graph. AI is not a separate layer; it is woven into the data flow. **Resumable streaming.** Every data item carries its own pagination context. -Runs can resume from the last processed item — whether recovering from a failure +Runs can resume from the last processed item: whether recovering from a failure or continuing incrementally after new data has been added to the source. **Workspace isolation.** Tenants are cryptographically isolated. Provider diff --git a/docs/SECURITY.md b/docs/SECURITY.md index c5cade33..934078fa 100644 --- a/docs/SECURITY.md +++ b/docs/SECURITY.md @@ -10,7 +10,7 @@ Token validation is multi-layered. The signature is verified against the Ed25519 public key, the expiration is checked, and the token is confirmed against the database to ensure the account still exists and is active. If the administrative status recorded in the token disagrees with the current database state, the -token is rejected — this prevents privilege persistence after role changes. +token is rejected: this prevents privilege persistence after role changes. Passwords are hashed with Argon2id, the OWASP-recommended algorithm for password storage. Each hash uses a unique cryptographically random salt. Password @@ -22,8 +22,8 @@ enumeration. ## Credential Encryption -Provider credentials — database connection strings, API keys, storage access -keys — are encrypted at rest using a two-tier key hierarchy. +Provider credentials (database connection strings, API keys, storage access +keys) are encrypted at rest using a two-tier key hierarchy. A **master encryption key** is a 32-byte secret loaded from a file at server startup. This key is never used directly for encryption. Instead, it derives a @@ -36,7 +36,7 @@ Each workspace key encrypts connection data using **XChaCha20-Poly1305**, an authenticated encryption scheme. The 24-byte nonce is randomly generated per encryption operation, which is safe for random generation given XChaCha20's large nonce space. The ciphertext includes the Poly1305 authentication tag, -which prevents tampering — any modification to the ciphertext causes decryption +which prevents tampering: any modification to the ciphertext causes decryption to fail. Encrypted credentials are decrypted only at pipeline execution time, within the @@ -46,10 +46,10 @@ scope of a single run. They are never exposed through the API. Access control uses a role-based model with four hierarchical workspace roles: -- **Guest** — read-only access to workspace resources -- **Member** — create and edit content, upload files, manage pipelines -- **Admin** — manage members, connections, webhooks, and workspace settings -- **Owner** — delete workspace, transfer ownership, manage all roles +- **Guest:** read-only access to workspace resources +- **Member:** create and edit content, upload files, manage pipelines +- **Admin:** manage members, connections, webhooks, and workspace settings +- **Owner:** delete workspace, transfer ownership, manage all roles Each API operation requires a specific permission, and each permission maps to a minimum required role. Administrators bypass workspace-level permission checks. @@ -65,26 +65,26 @@ purposes. The server applies standard HTTP security headers on all responses: -- **HSTS** — Strict-Transport-Security with a one-year max-age and subdomain +- **HSTS:** Strict-Transport-Security with a one-year max-age and subdomain inclusion, directing browsers to use HTTPS exclusively -- **CSP** — Content-Security-Policy restricting scripts, styles, images, and +- **CSP:** Content-Security-Policy restricting scripts, styles, images, and frame ancestors to prevent XSS and clickjacking -- **X-Frame-Options** — set to DENY, preventing the application from being +- **X-Frame-Options:** set to DENY, preventing the application from being embedded in frames -- **X-Content-Type-Options** — set to nosniff, preventing MIME type sniffing -- **Referrer-Policy** — strict-origin-when-cross-origin, limiting referrer +- **X-Content-Type-Options:** set to nosniff, preventing MIME type sniffing +- **Referrer-Policy:** strict-origin-when-cross-origin, limiting referrer leakage CORS is configurable per deployment, with explicit origin whitelisting, credential support, and preflight caching. -Request body sizes are limited at the middleware layer — separate limits for -JSON payloads and file uploads — to prevent resource exhaustion. +Request body sizes are limited at the middleware layer (separate limits for +JSON payloads and file uploads) to prevent resource exhaustion. ## Input Validation All API request bodies are validated before reaching handler logic. The -validation layer checks field constraints — length, format, range, pattern — and +validation layer checks field constraints (length, format, range, pattern) and returns structured error responses with field-specific messages. This prevents malformed data from reaching the database or business logic. @@ -104,5 +104,5 @@ ID (UUID v7 for traceability), and the signature. Security-relevant events are logged with structured fields using the tracing framework. Successful and failed authentication attempts, authorization decisions, token validation outcomes, and password operations are all recorded -with account IDs, resource IDs, and failure reasons. Sensitive data — passwords, -tokens, encryption keys — is never included in log output. +with account IDs, resource IDs, and failure reasons. Sensitive data (passwords, +tokens, encryption keys) is never included in log output. diff --git a/migrations/2026-01-19-045012_pipelines/down.sql b/migrations/2026-01-19-045012_pipelines/down.sql index 2ee9c88f..6f6f6f50 100644 --- a/migrations/2026-01-19-045012_pipelines/down.sql +++ b/migrations/2026-01-19-045012_pipelines/down.sql @@ -18,6 +18,9 @@ DROP TYPE IF EXISTS PIPELINE_TRIGGER_TYPE; DROP TYPE IF EXISTS PIPELINE_RUN_STATUS; DROP TYPE IF EXISTS PIPELINE_STATUS; +-- Workspace contexts +DROP TABLE IF EXISTS workspace_contexts; + -- Workspace connections DROP TABLE IF EXISTS workspace_connections; diff --git a/migrations/2026-01-19-045012_pipelines/up.sql b/migrations/2026-01-19-045012_pipelines/up.sql index 21d64bc7..f2439859 100644 --- a/migrations/2026-01-19-045012_pipelines/up.sql +++ b/migrations/2026-01-19-045012_pipelines/up.sql @@ -91,6 +91,84 @@ COMMENT ON COLUMN workspace_connections.created_at IS 'Creation timestamp'; COMMENT ON COLUMN workspace_connections.updated_at IS 'Last modification timestamp'; COMMENT ON COLUMN workspace_connections.deleted_at IS 'Soft deletion timestamp'; +-- Workspace context files table (metadata for encrypted context stored in NATS) +CREATE TABLE workspace_contexts ( + -- Primary identifier + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- References + workspace_id UUID NOT NULL REFERENCES workspaces (id) ON DELETE CASCADE, + account_id UUID NOT NULL REFERENCES accounts (id) ON DELETE CASCADE, + + -- Core attributes + name TEXT NOT NULL, + description TEXT DEFAULT NULL, + mime_type TEXT NOT NULL DEFAULT 'application/json', + + CONSTRAINT workspace_contexts_name_length CHECK (length(trim(name)) BETWEEN 1 AND 255), + CONSTRAINT workspace_contexts_description_length CHECK (description IS NULL OR length(description) <= 4096), + CONSTRAINT workspace_contexts_mime_type_length CHECK (length(trim(mime_type)) BETWEEN 1 AND 128), + + -- Storage reference (NATS object store key) + storage_key TEXT NOT NULL, + + CONSTRAINT workspace_contexts_storage_key_length CHECK (length(trim(storage_key)) BETWEEN 1 AND 512), + + -- Content metadata + content_size BIGINT NOT NULL, + content_hash BYTEA NOT NULL, + + CONSTRAINT workspace_contexts_content_size_positive CHECK (content_size > 0), + CONSTRAINT workspace_contexts_content_hash_length CHECK (length(content_hash) = 32), + + -- Metadata (non-encrypted, for filtering/display) + metadata JSONB NOT NULL DEFAULT '{}', + + CONSTRAINT workspace_contexts_metadata_size CHECK (length(metadata::TEXT) BETWEEN 2 AND 65536), + + -- Lifecycle timestamps + created_at TIMESTAMPTZ NOT NULL DEFAULT current_timestamp, + updated_at TIMESTAMPTZ NOT NULL DEFAULT current_timestamp, + deleted_at TIMESTAMPTZ DEFAULT NULL, + + CONSTRAINT workspace_contexts_updated_after_created CHECK (updated_at >= created_at), + CONSTRAINT workspace_contexts_deleted_after_created CHECK (deleted_at IS NULL OR deleted_at >= created_at) +); + +-- Triggers +SELECT setup_updated_at('workspace_contexts'); + +-- Indexes +CREATE INDEX workspace_contexts_workspace_idx + ON workspace_contexts (workspace_id, created_at DESC) + WHERE deleted_at IS NULL; + +CREATE INDEX workspace_contexts_account_idx + ON workspace_contexts (account_id, created_at DESC) + WHERE deleted_at IS NULL; + +CREATE UNIQUE INDEX workspace_contexts_name_unique_idx + ON workspace_contexts (workspace_id, lower(trim(name))) + WHERE deleted_at IS NULL; + +-- Comments +COMMENT ON TABLE workspace_contexts IS + 'Metadata for encrypted context files stored in NATS object storage.'; + +COMMENT ON COLUMN workspace_contexts.id IS 'Unique context identifier'; +COMMENT ON COLUMN workspace_contexts.workspace_id IS 'Parent workspace reference'; +COMMENT ON COLUMN workspace_contexts.account_id IS 'Creator account reference'; +COMMENT ON COLUMN workspace_contexts.name IS 'Human-readable context name (1-255 chars)'; +COMMENT ON COLUMN workspace_contexts.description IS 'Context description (up to 4096 chars)'; +COMMENT ON COLUMN workspace_contexts.mime_type IS 'Content MIME type'; +COMMENT ON COLUMN workspace_contexts.storage_key IS 'NATS object store key for the encrypted content'; +COMMENT ON COLUMN workspace_contexts.content_size IS 'Size of the encrypted content in bytes'; +COMMENT ON COLUMN workspace_contexts.content_hash IS 'SHA-256 hash of the encrypted content'; +COMMENT ON COLUMN workspace_contexts.metadata IS 'Non-encrypted metadata for filtering/display'; +COMMENT ON COLUMN workspace_contexts.created_at IS 'Creation timestamp'; +COMMENT ON COLUMN workspace_contexts.updated_at IS 'Last modification timestamp'; +COMMENT ON COLUMN workspace_contexts.deleted_at IS 'Soft deletion timestamp'; + -- Pipeline status enum CREATE TYPE PIPELINE_STATUS AS ENUM ( 'draft', -- Pipeline is being configured