Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0e6fc77
feat(worker): Add trigger to worker trait
VianneyRuhlmann Feb 11, 2026
305b853
feat(data_pipeline): add SharedRuntime
VianneyRuhlmann Feb 11, 2026
66e9e06
feat(worker): add initial trigger
VianneyRuhlmann Feb 13, 2026
b672262
feat(agent_info): use initial trigger
VianneyRuhlmann Feb 13, 2026
23022a6
feat(stats): implement stats worker
VianneyRuhlmann Feb 13, 2026
6c9c7f3
fix(shared_runtime): fix compile error
VianneyRuhlmann Feb 13, 2026
acc69ae
data-pipeline: pause all workers before joining in before_fork
VianneyRuhlmann Feb 13, 2026
17a9bb8
feat(exporter): use shared runtime in trace exporter
VianneyRuhlmann Feb 16, 2026
238a430
feat(shared_runtime): add worker handle
VianneyRuhlmann Feb 18, 2026
592c24c
refactor(worker): remove stopped status
VianneyRuhlmann Feb 18, 2026
ff6448c
chore(telemetry): move telemetry shutdown to worker
VianneyRuhlmann Feb 20, 2026
0dbce86
chore(shared-runtime): update error types
VianneyRuhlmann Feb 23, 2026
58bff21
chore(shared-runtime): return detailed errors in before_fork
VianneyRuhlmann Feb 26, 2026
16ab63b
chore(runtime): doc
VianneyRuhlmann Feb 27, 2026
3625c0a
test(telemetry): use shared runtime in tests
VianneyRuhlmann Mar 2, 2026
f86890a
test(telemetry): use client builder
VianneyRuhlmann Mar 2, 2026
3d285ae
chore(runtime): fix nit
VianneyRuhlmann Mar 4, 2026
1f8c0cf
refactor(runtime): move shutdown to runtime
VianneyRuhlmann Mar 5, 2026
62a63d6
test(telemetry): Add sleep after send
VianneyRuhlmann Mar 5, 2026
c1270f1
test(telemetry): fix deadlocks in telemetry tests
VianneyRuhlmann Mar 6, 2026
608556e
refactor(runtime): skip shutdown when runtime is None
VianneyRuhlmann Mar 9, 2026
34ec7d9
feat(runtime): add runtime to builder
VianneyRuhlmann Mar 9, 2026
a02eec6
chore(telemetry): remove macro
VianneyRuhlmann Mar 9, 2026
de8bce5
Merge branch 'main' into vianney/implement-shared-runtime
VianneyRuhlmann Mar 9, 2026
3119d5f
docs(runtime): add warnings
VianneyRuhlmann Mar 9, 2026
74425ba
feat(shared_runtime): add shared runtime ffi
VianneyRuhlmann Mar 10, 2026
2869d39
feat(trace_exporter): add shutdown method
VianneyRuhlmann Mar 10, 2026
4b1d0b6
fix(shared_runtime): add on_pause hook to release waker in info fetcher
VianneyRuhlmann Mar 10, 2026
0e74cad
fix(telemetry): add reset hook to telemetry
VianneyRuhlmann Mar 10, 2026
544b840
fix(telemetry): fix spawn and run loop for telemetry
VianneyRuhlmann Mar 10, 2026
a3e2c38
docs(info_fetcher): update doc for running the fetcher
VianneyRuhlmann Mar 10, 2026
67fcf2a
feat(runtime-ffi): remove redundant allocation of Box
VianneyRuhlmann Mar 11, 2026
018028f
Merge branch 'main' into vianney/implement-shared-runtime
VianneyRuhlmann Mar 11, 2026
6420f28
format
VianneyRuhlmann Mar 11, 2026
72f61f6
feat(runtime-ffi): use new handle in trace exporter builder
VianneyRuhlmann Mar 11, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libdd-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ bench = false

[dependencies]
anyhow = "1.0"
async-trait = "0.1"
futures = "0.3"
futures-core = { version = "0.3.0", default-features = false }
futures-util = { version = "0.3.0", default-features = false }
Expand Down
65 changes: 59 additions & 6 deletions libdd-common/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,65 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;

/// Trait representing a generic worker.
///
/// The worker runs an async looping function running periodic tasks.
///
/// This trait can be used to provide wrapper around a worker.
pub trait Worker {
/// Main worker loop
fn run(&mut self) -> impl std::future::Future<Output = ()> + Send;
/// # Lifecycle
/// The worker's `Self::run` method should be executed everytime the `Self::trigger` method returns.
/// On startup `Self::initial_trigger` should be called before `Self::run`.
#[async_trait]
pub trait Worker: std::fmt::Debug {
/// Main worker function
///
/// Code in this function should always use timeout on long-running await calls to avoid
/// blocking forks if an await call takes too long to complete.
async fn run(&mut self);

/// Function called between each `run` to wait for the next run
async fn trigger(&mut self);

/// Alternative trigger called on start to provide custom behavior
/// Defaults to `trigger` behavior.
async fn initial_trigger(&mut self) {
self.trigger().await
}

/// Reset the worker in the child after a fork
fn reset(&mut self) {}

/// Hook called after the worker has been paused (e.g. before a fork).
/// Default is a no-op.
async fn on_pause(&mut self) {}

/// Hook called when the app is shutting down. Can be used to flush remaining data.
async fn shutdown(&mut self) {}
}

// Blanket implementation for boxed trait objects
#[async_trait]
impl Worker for Box<dyn Worker + Send + Sync> {
async fn run(&mut self) {
(**self).run().await
}

async fn trigger(&mut self) {
(**self).trigger().await
}

async fn initial_trigger(&mut self) {
(**self).initial_trigger().await
}

fn reset(&mut self) {
(**self).reset()
}

async fn on_pause(&mut self) {
(**self).on_pause().await
}

async fn shutdown(&mut self) {
(**self).shutdown().await
}
}
1 change: 1 addition & 0 deletions libdd-data-pipeline-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

mod error;
mod response;
mod shared_runtime;
mod trace_exporter;

#[cfg(all(feature = "catch_panic", panic = "unwind"))]
Expand Down
273 changes: 273 additions & 0 deletions libdd-data-pipeline-ffi/src/shared_runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
// Copyright 2025-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use libdd_data_pipeline::shared_runtime::{SharedRuntime, SharedRuntimeError};
use std::ffi::{c_char, CString};
use std::ptr::NonNull;
use std::sync::Arc;

/// Error codes for SharedRuntime FFI operations.
#[repr(C)]
#[derive(Copy, Clone, Debug, PartialEq)]
pub enum SharedRuntimeErrorCode {
/// Invalid argument provided (e.g. null handle).
InvalidArgument,
/// The runtime is not available or in an invalid state.
RuntimeUnavailable,
/// Failed to acquire a lock on internal state.
LockFailed,
/// A worker operation failed.
WorkerError,
/// Failed to create the tokio runtime.
RuntimeCreation,
/// Shutdown timed out.
ShutdownTimedOut,
}

/// Error returned by SharedRuntime FFI functions.
#[repr(C)]
pub struct SharedRuntimeFFIError {
pub code: SharedRuntimeErrorCode,
pub msg: *mut c_char,
}

impl SharedRuntimeFFIError {
fn new(code: SharedRuntimeErrorCode, msg: &str) -> Self {
Self {
code,
msg: CString::new(msg).unwrap_or_default().into_raw(),
}
}
}

impl From<SharedRuntimeError> for SharedRuntimeFFIError {
fn from(err: SharedRuntimeError) -> Self {
let code = match &err {
SharedRuntimeError::RuntimeUnavailable => SharedRuntimeErrorCode::RuntimeUnavailable,
SharedRuntimeError::LockFailed(_) => SharedRuntimeErrorCode::LockFailed,
SharedRuntimeError::WorkerError(_) => SharedRuntimeErrorCode::WorkerError,
SharedRuntimeError::RuntimeCreation(_) => SharedRuntimeErrorCode::RuntimeCreation,
SharedRuntimeError::ShutdownTimedOut(_) => SharedRuntimeErrorCode::ShutdownTimedOut,
};
SharedRuntimeFFIError::new(code, &err.to_string())
}
}

impl Drop for SharedRuntimeFFIError {
fn drop(&mut self) {
if !self.msg.is_null() {
// SAFETY: `msg` is always produced by `CString::into_raw` in `new`.
unsafe {
drop(CString::from_raw(self.msg));
self.msg = std::ptr::null_mut();
}
}
}
}

/// Frees a `SharedRuntimeFFIError`. After this call the pointer is invalid.
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_error_free(error: Option<Box<SharedRuntimeFFIError>>) {
drop(error);
}

/// Create a new `SharedRuntime`.
///
/// On success writes a raw handle into `*out_handle` and returns `None`.
/// On failure leaves `*out_handle` unchanged and returns an error.
///
/// The caller owns the handle and must eventually pass it to
/// [`ddog_shared_runtime_free`] (or another consumer that takes ownership).
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_new(
out_handle: NonNull<*const SharedRuntime>,
) -> Option<Box<SharedRuntimeFFIError>> {
match SharedRuntime::new() {
Ok(runtime) => {
out_handle.as_ptr().write(Arc::into_raw(Arc::new(runtime)));
None
}
Err(err) => Some(Box::new(SharedRuntimeFFIError::from(err))),
}
}

/// Free a handle, decrementing the `Arc` strong count.
///
/// The underlying runtime may not be dropped if other components are still using it.
/// Use [`ddog_shared_runtime_shutdown`] to cleanly stop workers.
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_free(handle: *const SharedRuntime) {
if !handle.is_null() {
// SAFETY: handle was produced by Arc::into_raw; this call takes ownership.
drop(Arc::from_raw(handle));
}
}

/// Must be called in the parent process before `fork()`.
///
/// Pauses all workers so that no background threads are running during the
/// fork, preventing deadlocks in the child process.
///
/// Returns an error if `handle` is null.
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_before_fork(
handle: *const SharedRuntime,
) -> Option<Box<SharedRuntimeFFIError>> {
if handle.is_null() {
return Some(Box::new(SharedRuntimeFFIError::new(
SharedRuntimeErrorCode::InvalidArgument,
"handle is null",
)));
}
// SAFETY: handle was produced by Arc::into_raw and the Arc is still alive.
(*handle).before_fork();
None
}

/// Must be called in the parent process after `fork()`.
///
/// Restarts all workers that were paused by [`ddog_shared_runtime_before_fork`].
///
/// Returns `None` on success, or an error if workers could not be restarted.
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_after_fork_parent(
handle: *const SharedRuntime,
) -> Option<Box<SharedRuntimeFFIError>> {
if handle.is_null() {
return Some(Box::new(SharedRuntimeFFIError::new(
SharedRuntimeErrorCode::InvalidArgument,
"handle is null",
)));
}
// SAFETY: handle was produced by Arc::into_raw and the Arc is still alive.
match (*handle).after_fork_parent() {
Ok(()) => None,
Err(err) => Some(Box::new(SharedRuntimeFFIError::from(err))),
}
}

/// Must be called in the child process after `fork()`.
///
/// Creates a fresh tokio runtime and restarts all workers. The original
/// runtime cannot be safely reused after a fork.
///
/// Returns `None` on success, or an error if the runtime could not be
/// reinitialized.
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_after_fork_child(
handle: *const SharedRuntime,
) -> Option<Box<SharedRuntimeFFIError>> {
if handle.is_null() {
return Some(Box::new(SharedRuntimeFFIError::new(
SharedRuntimeErrorCode::InvalidArgument,
"handle is null",
)));
}
// SAFETY: handle was produced by Arc::into_raw and the Arc is still alive.
match (*handle).after_fork_child() {
Ok(()) => None,
Err(err) => Some(Box::new(SharedRuntimeFFIError::from(err))),
}
}

/// Shut down the `SharedRuntime`, stopping all workers.
///
/// `timeout_ms` is the maximum time to wait for workers to stop, in
/// milliseconds. Pass `0` for no timeout.
///
/// Returns `None` on success, or `SharedRuntimeErrorCode::ShutdownTimedOut`
/// if the timeout was reached.
#[no_mangle]
pub unsafe extern "C" fn ddog_shared_runtime_shutdown(
handle: *const SharedRuntime,
timeout_ms: u64,
) -> Option<Box<SharedRuntimeFFIError>> {
if handle.is_null() {
return Some(Box::new(SharedRuntimeFFIError::new(
SharedRuntimeErrorCode::InvalidArgument,
"handle is null",
)));
}

let timeout = if timeout_ms > 0 {
Some(std::time::Duration::from_millis(timeout_ms))
} else {
None
};

// SAFETY: handle was produced by Arc::into_raw and the Arc is still alive.
match (*handle).shutdown(timeout) {
Ok(()) => None,
Err(err) => Some(Box::new(SharedRuntimeFFIError::from(err))),
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::mem::MaybeUninit;

#[test]
fn test_new_and_free() {
unsafe {
let mut handle: MaybeUninit<*const SharedRuntime> = MaybeUninit::uninit();
let err = ddog_shared_runtime_new(NonNull::new_unchecked(handle.as_mut_ptr()));
assert!(err.is_none());
ddog_shared_runtime_free(handle.assume_init());
}
}

#[test]
fn test_before_after_fork_null() {
unsafe {
let err = ddog_shared_runtime_before_fork(std::ptr::null());
assert_eq!(err.unwrap().code, SharedRuntimeErrorCode::InvalidArgument);

let err = ddog_shared_runtime_after_fork_parent(std::ptr::null());
assert_eq!(err.unwrap().code, SharedRuntimeErrorCode::InvalidArgument);

let err = ddog_shared_runtime_after_fork_child(std::ptr::null());
assert_eq!(err.unwrap().code, SharedRuntimeErrorCode::InvalidArgument);
}
}

#[test]
fn test_fork_lifecycle() {
unsafe {
let mut handle: MaybeUninit<*const SharedRuntime> = MaybeUninit::uninit();
ddog_shared_runtime_new(NonNull::new_unchecked(handle.as_mut_ptr()));
let handle = handle.assume_init();

let err = ddog_shared_runtime_before_fork(handle);
assert!(err.is_none(), "{:?}", err.map(|e| e.code));

let err = ddog_shared_runtime_after_fork_parent(handle);
assert!(err.is_none(), "{:?}", err.map(|e| e.code));

ddog_shared_runtime_free(handle);
}
}

#[test]
fn test_shutdown() {
unsafe {
let mut handle: MaybeUninit<*const SharedRuntime> = MaybeUninit::uninit();
ddog_shared_runtime_new(NonNull::new_unchecked(handle.as_mut_ptr()));
let handle = handle.assume_init();

let err = ddog_shared_runtime_shutdown(handle, 0);
assert!(err.is_none());

ddog_shared_runtime_free(handle);
}
}

#[test]
fn test_error_free() {
let error = Box::new(SharedRuntimeFFIError::new(
SharedRuntimeErrorCode::InvalidArgument,
"test error",
));
unsafe { ddog_shared_runtime_error_free(Some(error)) };
}
}
Loading
Loading