From 00c2b267eb2e48fd9df2c61dba8d98cf4f250cc6 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Tue, 13 Jan 2026 11:34:00 +0100 Subject: [PATCH 01/13] feat(sidecar): implement thread listener module Signed-off-by: Alexandre Rulleau --- datadog-sidecar-ffi/src/lib.rs | 53 +++++ datadog-sidecar/src/setup/mod.rs | 6 + datadog-sidecar/src/setup/thread_listener.rs | 222 +++++++++++++++++++ 3 files changed, 281 insertions(+) create mode 100644 datadog-sidecar/src/setup/thread_listener.rs diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 78bab1d2e6..4659b70ae6 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -310,6 +310,59 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - MaybeError::None } +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { + use datadog_sidecar::setup::MasterListener; + + let cfg = datadog_sidecar::config::FromEnv::config(); + try_c!(MasterListener::start(pid, cfg)); + + MaybeError::None +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_connect_worker( + pid: i32, + connection: &mut *mut SidecarTransport, +) -> MaybeError { + use datadog_sidecar::setup::connect_to_master; + + let transport = try_c!(connect_to_master(pid)); + *connection = Box::into_raw(transport); + + MaybeError::None +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { + use datadog_sidecar::setup::MasterListener; + + try_c!(MasterListener::shutdown()); + + MaybeError::None +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool { + use datadog_sidecar::setup::MasterListener; + + MasterListener::is_active(pid) +} + +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { + use datadog_sidecar::setup::MasterListener; + + try_c!(MasterListener::clear_inherited_state()); + + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 07c837aab0..fc0ed95ff5 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -12,6 +12,12 @@ mod windows; #[cfg(windows)] pub use self::windows::*; +// Thread-based listener module (Unix only) +#[cfg(unix)] +pub mod thread_listener; +#[cfg(unix)] +pub use thread_listener::{MasterListener, connect_to_master}; + use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs new file mode 100644 index 0000000000..6783e2cde2 --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -0,0 +1,222 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::sync::{Arc, Mutex, OnceLock, mpsc}; +use std::thread::{self, JoinHandle}; +use std::time::Duration; +use std::io; +use tokio::net::UnixListener; +use tokio::runtime::Runtime; +use tracing::{info, error}; + +use crate::config::Config; +use crate::config::IpcMode::{InstancePerProcess, Shared}; +use crate::service::blocking::SidecarTransport; +use crate::service::SidecarServer; +use crate::setup::{Liaison, SharedDirLiaison}; +use datadog_ipc::platform::AsyncChannel; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: mpsc::Sender<()>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread. + /// + /// This spawns a new OS thread with a Tokio runtime that listens for + /// worker connections. Only one listener can be active per process. + pub fn start(pid: i32, config: Config) -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex.lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + return Err(io::Error::other("Master listener is already running")); + } + + let (shutdown_tx, shutdown_rx) = mpsc::channel(); + + // Wrap shutdown receiver in Arc> for sharing with async function + let shutdown_rx = Arc::new(Mutex::new(shutdown_rx)); + + let runtime = Runtime::new() + .map_err(|e| io::Error::other(format!("Failed to create Tokio runtime: {}", e)))?; + + let thread_handle = thread::Builder::new() + .name(format!("ddtrace-sidecar-listener-{}", pid)) + .spawn(move || { + runtime.block_on(async { + if let Err(e) = run_listener(config, shutdown_rx).await { + error!("Listener thread error: {}", e); + } + }); + }) + .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; + + *listener_guard = Some(MasterListener { + shutdown_tx, + thread_handle: Some(thread_handle), + pid, + }); + + Ok(()) + } + + /// Shutdown the master listener thread. + /// + /// Sends shutdown signal and joins the listener thread. This is blocking + /// and will wait for the thread to exit cleanly. + pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex.lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + let _ = master.shutdown_tx.send(()); + + // Give the runtime a moment to process shutdown + std::thread::sleep(Duration::from_millis(100)); + + if let Some(handle) = master.thread_handle.take() { + handle.join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + /// + /// Used for fork detection: child processes inherit the listener state + /// but don't own the actual thread. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state after fork. + /// + /// Child processes must call this to prevent attempting to use the + /// parent's listener thread, which doesn't exist in the child. + pub fn clear_inherited_state() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex.lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + info!("Clearing inherited master listener state in child process"); + *listener_guard = None; + } + + Ok(()) + } +} + +/// Async listener loop that accepts worker connections. +/// +/// This runs in the listener thread's Tokio runtime and handles: +/// - Accepting new worker connections +/// - Spawning handlers for each connection +/// - Graceful shutdown on signal +async fn run_listener(config: Config, shutdown_rx: Arc>>) -> io::Result<()> { + info!("Listener thread running, creating IPC server"); + + // Create IPC server using the platform-specific Liaison + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; + + let std_listener = liaison.attempt_listen()? + .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + + std_listener.set_nonblocking(true)?; + let ipc_server = UnixListener::from_std(std_listener)?; + + info!("IPC server listening for worker connections"); + + let server = SidecarServer::default(); + + loop { + if let Ok(rx) = shutdown_rx.lock() { + if rx.try_recv().is_ok() || matches!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)) { + info!("Shutdown signal received, exiting listener loop"); + break; + } + } + + match tokio::time::timeout(Duration::from_millis(100), ipc_server.accept()).await { + Ok(Ok((client, _addr))) => { + info!("Accepted new worker connection"); + let server_clone = server.clone(); + + tokio::spawn(async move { + handle_worker_connection(client, server_clone).await; + }); + } + Ok(Err(e)) => { + error!("Failed to accept worker connection: {}", e); + } + Err(_) => { + // Timeout - continue loop to check shutdown signal + continue; + } + } + } + + info!("Listener thread shutting down"); + Ok(()) +} + +/// Handle a single worker connection. +/// +/// Processes requests from the worker and sends responses until the +/// connection is closed. +async fn handle_worker_connection( + client: tokio::net::UnixStream, + server: SidecarServer, +) { + info!("Handling worker connection"); + server.accept_connection(AsyncChannel::from(client)).await; + info!("Worker connection handler exiting"); +} + +/// Connect to the master listener as a worker. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener (PID {})", pid); + + let config = Config::get(); + + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; + + let channel = liaison.connect_to_server() + .map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?; + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, // Reconnection handled by caller + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +} From 87054b92c88597ac8a5e1c56dea9acfe6c644a7f Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 23 Jan 2026 11:29:30 +0100 Subject: [PATCH 02/13] feat(sidecar): apply feedbacks Signed-off-by: Alexandre Rulleau --- datadog-sidecar/src/entry.rs | 98 ++++++++--- datadog-sidecar/src/setup/mod.rs | 2 +- datadog-sidecar/src/setup/thread_listener.rs | 173 +++++++++---------- 3 files changed, 159 insertions(+), 114 deletions(-) diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index 218ec23252..14c5594fe4 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -15,7 +15,7 @@ use std::{ }, time::{Duration, Instant}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; #[cfg(unix)] use crate::crashtracker::crashtracker_unix_socket_path; @@ -32,7 +32,28 @@ use crate::tracer::SHM_LIMITER; use crate::watchdog::Watchdog; use crate::{ddog_daemon_entry_point, setup_daemon_process}; -async fn main_loop(listener: L, cancel: Arc) -> io::Result<()> +/// Configuration for main_loop behavior +pub struct MainLoopConfig { + pub enable_ctrl_c_handler: bool, + pub enable_crashtracker: bool, + pub external_shutdown_rx: Option>, +} + +impl Default for MainLoopConfig { + fn default() -> Self { + Self { + enable_ctrl_c_handler: true, + enable_crashtracker: true, + external_shutdown_rx: None, + } + } +} + +pub async fn main_loop( + listener: L, + cancel: Arc, + loop_config: MainLoopConfig, +) -> io::Result<()> where L: FnOnce(Box) -> Fut, Fut: Future>, @@ -64,29 +85,45 @@ where } }); - tokio::spawn(async move { - if let Err(err) = tokio::signal::ctrl_c().await { - tracing::error!("Error setting up signal handler {}", err); - } - tracing::info!("Received Ctrl-C Signal, shutting down"); - cancel(); - }); + if let Some(shutdown_rx) = loop_config.external_shutdown_rx { + let cancel = cancel.clone(); + tokio::spawn(async move { + let _ = shutdown_rx.await; + tracing::info!("External shutdown signal received"); + cancel(); + }); + } + + if loop_config.enable_ctrl_c_handler { + let cancel = cancel.clone(); + tokio::spawn(async move { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::error!("Error setting up signal handler {}", err); + } + tracing::info!("Received Ctrl-C Signal, shutting down"); + cancel(); + }); + } #[cfg(unix)] - tokio::spawn(async move { - let socket_path = crashtracker_unix_socket_path(); - match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default()) - { - Ok(listener) => loop { - if let Err(e) = - libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await - { - tracing::warn!("Got error while receiving crash report: {e}"); - } - }, - Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), - } - }); + if loop_config.enable_crashtracker { + tokio::spawn(async move { + let socket_path = crashtracker_unix_socket_path(); + match libdd_crashtracker::get_receiver_unix_socket( + socket_path.to_str().unwrap_or_default(), + ) { + Ok(listener) => loop { + if let Err(e) = + libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener) + .await + { + tracing::warn!("Got error while receiving crash report: {e}"); + } + }, + Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), + } + }); + } // Init. Early, before we start listening. drop(SHM_LIMITER.lock()); @@ -143,6 +180,19 @@ where } pub fn enter_listener_loop(acquire_listener: F) -> anyhow::Result<()> +where + F: FnOnce() -> io::Result<(L, C)>, + L: FnOnce(Box) -> Fut, + Fut: Future>, + C: Fn() + Sync + Send + 'static, +{ + enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default()) +} + +pub fn enter_listener_loop_with_config( + acquire_listener: F, + loop_config: MainLoopConfig, +) -> anyhow::Result<()> where F: FnOnce() -> io::Result<(L, C)>, L: FnOnce(Box) -> Fut, @@ -159,7 +209,7 @@ where let (listener, cancel) = acquire_listener()?; runtime - .block_on(main_loop(listener, Arc::new(cancel))) + .block_on(main_loop(listener, Arc::new(cancel), loop_config)) .map_err(|e| e.into()) } diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index fc0ed95ff5..1417c8bde1 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -16,7 +16,7 @@ pub use self::windows::*; #[cfg(unix)] pub mod thread_listener; #[cfg(unix)] -pub use thread_listener::{MasterListener, connect_to_master}; +pub use thread_listener::{connect_to_master, MasterListener}; use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index 6783e2cde2..dd41d0f7d7 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -1,26 +1,24 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -use std::sync::{Arc, Mutex, OnceLock, mpsc}; -use std::thread::{self, JoinHandle}; -use std::time::Duration; use std::io; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; use tokio::net::UnixListener; -use tokio::runtime::Runtime; -use tracing::{info, error}; +use tokio::sync::oneshot; +use tracing::{error, info}; use crate::config::Config; use crate::config::IpcMode::{InstancePerProcess, Shared}; +use crate::entry::MainLoopConfig; use crate::service::blocking::SidecarTransport; -use crate::service::SidecarServer; use crate::setup::{Liaison, SharedDirLiaison}; -use datadog_ipc::platform::AsyncChannel; use datadog_ipc::transport::blocking::BlockingTransport; static MASTER_LISTENER: OnceLock>> = OnceLock::new(); pub struct MasterListener { - shutdown_tx: mpsc::Sender<()>, + shutdown_tx: Option>, thread_handle: Option>, pid: i32, } @@ -28,38 +26,32 @@ pub struct MasterListener { impl MasterListener { /// Start the master listener thread. /// - /// This spawns a new OS thread with a Tokio runtime that listens for - /// worker connections. Only one listener can be active per process. + /// This spawns a new OS thread that calls enter_listener_loop_with_config + /// to create a Tokio runtime and listen for worker connections. + /// Only one listener can be active per process. pub fn start(pid: i32, config: Config) -> io::Result<()> { let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); - let mut listener_guard = listener_mutex.lock() + let mut listener_guard = listener_mutex + .lock() .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; if listener_guard.is_some() { return Err(io::Error::other("Master listener is already running")); } - let (shutdown_tx, shutdown_rx) = mpsc::channel(); - - // Wrap shutdown receiver in Arc> for sharing with async function - let shutdown_rx = Arc::new(Mutex::new(shutdown_rx)); - - let runtime = Runtime::new() - .map_err(|e| io::Error::other(format!("Failed to create Tokio runtime: {}", e)))?; + let (shutdown_tx, shutdown_rx) = oneshot::channel(); let thread_handle = thread::Builder::new() .name(format!("ddtrace-sidecar-listener-{}", pid)) .spawn(move || { - runtime.block_on(async { - if let Err(e) = run_listener(config, shutdown_rx).await { - error!("Listener thread error: {}", e); - } - }); + if let Err(e) = run_listener(config, shutdown_rx) { + error!("Listener thread error: {}", e); + } }) .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; *listener_guard = Some(MasterListener { - shutdown_tx, + shutdown_tx: Some(shutdown_tx), thread_handle: Some(thread_handle), pid, }); @@ -73,17 +65,19 @@ impl MasterListener { /// and will wait for the thread to exit cleanly. pub fn shutdown() -> io::Result<()> { let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); - let mut listener_guard = listener_mutex.lock() + let mut listener_guard = listener_mutex + .lock() .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; if let Some(mut master) = listener_guard.take() { - let _ = master.shutdown_tx.send(()); - - // Give the runtime a moment to process shutdown - std::thread::sleep(Duration::from_millis(100)); + // Signal shutdown by sending to or dropping the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } if let Some(handle) = master.thread_handle.take() { - handle.join() + handle + .join() .map_err(|_| io::Error::other("Failed to join listener thread"))?; } @@ -113,7 +107,8 @@ impl MasterListener { /// parent's listener thread, which doesn't exist in the child. pub fn clear_inherited_state() -> io::Result<()> { let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); - let mut listener_guard = listener_mutex.lock() + let mut listener_guard = listener_mutex + .lock() .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; if listener_guard.is_some() { @@ -125,75 +120,74 @@ impl MasterListener { } } -/// Async listener loop that accepts worker connections. -/// -/// This runs in the listener thread's Tokio runtime and handles: -/// - Accepting new worker connections -/// - Spawning handlers for each connection -/// - Graceful shutdown on signal -async fn run_listener(config: Config, shutdown_rx: Arc>>) -> io::Result<()> { - info!("Listener thread running, creating IPC server"); +/// Accept connections in a loop for thread mode. +async fn accept_socket_loop_thread( + listener: UnixListener, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in thread listener"); + break; + } + accept = listener.accept() => { + match accept { + Ok((socket, _)) => { + info!("Accepted new worker connection"); + handler(socket); + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + break; + } + } + } + } + } + Ok(()) +} - // Create IPC server using the platform-specific Liaison - let liaison: SharedDirLiaison = match config.ipc_mode { - Shared => Liaison::ipc_shared(), - InstancePerProcess => Liaison::ipc_per_process(), - }; +/// Entry point for thread listener - calls enter_listener_loop_with_config +fn run_listener(config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating IPC server"); - let std_listener = liaison.attempt_listen()? - .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + let acquire_listener = move || { + let liaison: SharedDirLiaison = match config.ipc_mode { + Shared => Liaison::ipc_shared(), + InstancePerProcess => Liaison::ipc_per_process(), + }; - std_listener.set_nonblocking(true)?; - let ipc_server = UnixListener::from_std(std_listener)?; + let std_listener = liaison + .attempt_listen()? + .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; - info!("IPC server listening for worker connections"); + std_listener.set_nonblocking(true)?; + let listener = UnixListener::from_std(std_listener)?; - let server = SidecarServer::default(); + info!("IPC server listening for worker connections"); - loop { - if let Ok(rx) = shutdown_rx.lock() { - if rx.try_recv().is_ok() || matches!(rx.try_recv(), Err(mpsc::TryRecvError::Disconnected)) { - info!("Shutdown signal received, exiting listener loop"); - break; - } - } + let cancel = || {}; + Ok(( + move |handler| accept_socket_loop_thread(listener, handler, shutdown_rx), + cancel, + )) + }; - match tokio::time::timeout(Duration::from_millis(100), ipc_server.accept()).await { - Ok(Ok((client, _addr))) => { - info!("Accepted new worker connection"); - let server_clone = server.clone(); + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + }; - tokio::spawn(async move { - handle_worker_connection(client, server_clone).await; - }); - } - Ok(Err(e)) => { - error!("Failed to accept worker connection: {}", e); - } - Err(_) => { - // Timeout - continue loop to check shutdown signal - continue; - } - } - } + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Thread listener failed: {}", e)))?; - info!("Listener thread shutting down"); + info!("Listener thread exiting"); Ok(()) } -/// Handle a single worker connection. -/// -/// Processes requests from the worker and sends responses until the -/// connection is closed. -async fn handle_worker_connection( - client: tokio::net::UnixStream, - server: SidecarServer, -) { - info!("Handling worker connection"); - server.accept_connection(AsyncChannel::from(client)).await; - info!("Worker connection handler exiting"); -} - /// Connect to the master listener as a worker. /// /// Establishes a connection to the master listener thread for the given PID. @@ -207,7 +201,8 @@ pub fn connect_to_master(pid: i32) -> io::Result> { InstancePerProcess => Liaison::ipc_per_process(), }; - let channel = liaison.connect_to_server() + let channel = liaison + .connect_to_server() .map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?; let transport = BlockingTransport::from(channel); From 8beb392adb6f5246dd7d1404d0a2c00341a741cb Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 23 Jan 2026 13:27:43 +0100 Subject: [PATCH 03/13] feat(sidecar): support threaded connection for windows Signed-off-by: Alexandre Rulleau --- datadog-sidecar-ffi/src/lib.rs | 17 +- datadog-sidecar/src/setup/mod.rs | 8 +- datadog-sidecar/src/setup/thread_listener.rs | 2 +- .../src/setup/thread_listener_windows.rs | 211 ++++++++++++++++++ 4 files changed, 221 insertions(+), 17 deletions(-) create mode 100644 datadog-sidecar/src/setup/thread_listener_windows.rs diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 4659b70ae6..4370c41c23 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -60,6 +60,8 @@ use std::slice; use std::sync::Arc; use std::time::Duration; +use datadog_sidecar::setup::{connect_to_master, MasterListener}; + #[no_mangle] #[cfg(target_os = "windows")] pub extern "C" fn ddog_setup_crashtracking( @@ -311,10 +313,7 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { - use datadog_sidecar::setup::MasterListener; - let cfg = datadog_sidecar::config::FromEnv::config(); try_c!(MasterListener::start(pid, cfg)); @@ -322,13 +321,10 @@ pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_connect_worker( pid: i32, connection: &mut *mut SidecarTransport, ) -> MaybeError { - use datadog_sidecar::setup::connect_to_master; - let transport = try_c!(connect_to_master(pid)); *connection = Box::into_raw(transport); @@ -336,28 +332,19 @@ pub extern "C" fn ddog_sidecar_connect_worker( } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { - use datadog_sidecar::setup::MasterListener; - try_c!(MasterListener::shutdown()); MaybeError::None } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool { - use datadog_sidecar::setup::MasterListener; - MasterListener::is_active(pid) } #[no_mangle] -#[cfg(unix)] pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { - use datadog_sidecar::setup::MasterListener; - try_c!(MasterListener::clear_inherited_state()); MaybeError::None diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 1417c8bde1..b82587b217 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -12,12 +12,18 @@ mod windows; #[cfg(windows)] pub use self::windows::*; -// Thread-based listener module (Unix only) +// Thread-based listener module (Unix) #[cfg(unix)] pub mod thread_listener; #[cfg(unix)] pub use thread_listener::{connect_to_master, MasterListener}; +// Thread-based listener module (Windows) +#[cfg(windows)] +pub mod thread_listener_windows; +#[cfg(windows)] +pub use thread_listener_windows::{connect_to_master, MasterListener}; + use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index dd41d0f7d7..815fe1466d 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -209,7 +209,7 @@ pub fn connect_to_master(pid: i32) -> io::Result> { let sidecar_transport = Box::new(SidecarTransport { inner: Mutex::new(transport), - reconnect_fn: None, // Reconnection handled by caller + reconnect_fn: None, }); info!("Successfully connected to master listener"); diff --git a/datadog-sidecar/src/setup/thread_listener_windows.rs b/datadog-sidecar/src/setup/thread_listener_windows.rs new file mode 100644 index 0000000000..2414ff0a14 --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener_windows.rs @@ -0,0 +1,211 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io; +use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle}; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; +use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; +use tokio::sync::oneshot; +use tracing::{error, info}; + +use crate::config::Config; +use crate::entry::MainLoopConfig; +use crate::service::blocking::SidecarTransport; +use datadog_ipc::platform::metadata::ProcessHandle; +use datadog_ipc::platform::Channel; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: Option>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread using Windows Named Pipes. + /// + /// This spawns a new OS thread that creates a named pipe server + /// to listen for worker connections. Only one listener can be active per process. + pub fn start(pid: i32, _config: Config) -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + return Err(io::Error::other("Master listener is already running")); + } + + let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let thread_handle = thread::Builder::new() + .name(format!("ddtrace-sidecar-listener-{}", pid)) + .spawn(move || { + if let Err(e) = run_listener_windows(pipe_name, shutdown_rx) { + error!("Listener thread error: {}", e); + } + }) + .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; + + *listener_guard = Some(MasterListener { + shutdown_tx: Some(shutdown_tx), + thread_handle: Some(thread_handle), + pid, + }); + + info!("Started Windows named pipe listener (PID {})", pid); + Ok(()) + } + + /// Shutdown the master listener thread. + /// + /// Sends shutdown signal and joins the listener thread. This is blocking + /// and will wait for the thread to exit cleanly. + pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + // Signal shutdown by sending to the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } + + if let Some(handle) = master.thread_handle.take() { + handle + .join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state. + /// Kept for API compatibility with Unix version. + pub fn clear_inherited_state() -> io::Result<()> { + Ok(()) + } +} + +/// Accept connections in a loop for Windows named pipes. +async fn accept_pipe_loop_windows( + pipe_name: String, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + let mut server = ServerOptions::new() + .first_pipe_instance(true) + .max_instances(254) // Windows allows up to 255 instances + .create(&pipe_name)?; + + info!("Named pipe server created at: {}", pipe_name); + + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in Windows pipe listener"); + break; + } + result = server.connect() => { + match result { + Ok(_) => { + info!("Accepted new worker connection on named pipe"); + handler(server); + + server = ServerOptions::new() + .create(&pipe_name)?; + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + match ServerOptions::new().create(&pipe_name) { + Ok(new_server) => server = new_server, + Err(e2) => { + error!("Failed to recover named pipe: {}", e2); + break; + } + } + } + } + } + } + } + Ok(()) +} + +/// Entry point for Windows named pipe listener +fn run_listener_windows(pipe_name: String, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating Windows named pipe server"); + + let acquire_listener = move || { + let cancel = || {}; + let pipe_name_clone = pipe_name.clone(); + Ok(( + move |handler| accept_pipe_loop_windows(pipe_name_clone, handler, shutdown_rx), + cancel, + )) + }; + + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + }; + + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Windows thread listener failed: {}", e)))?; + + info!("Listener thread exiting"); + Ok(()) +} + +/// Connect to the master listener as a worker using Windows Named Pipes. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener via named pipe (PID {})", pid); + + let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid); + + let client = ClientOptions::new().open(&pipe_name)?; + + info!("Connected to named pipe: {}", pipe_name); + + let raw_handle = client.as_raw_handle(); + let owned_handle = unsafe { OwnedHandle::from_raw_handle(raw_handle) }; + + std::mem::forget(client); + + let process_handle = + ProcessHandle::Getter(Box::new(move || Ok(ProcessHandle::Pid(pid as u32)))); + let channel = Channel::from_client_handle_and_pid(owned_handle, process_handle); + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +} From fff033d121db00893f33fb5d939aee9b0ffc2726 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Tue, 3 Mar 2026 17:03:26 +0100 Subject: [PATCH 04/13] fix(sidecar): scope thread mode IPC to master PID to prevent process interference In thread mode on Linux, primary_sidecar_identifier() returned geteuid(), causing all PHP processes with the same euid to share the same Unix socket and sidecar resources (SHM, rate limiters, crash tracker, etc.). - Add set_sidecar_master_pid(pid) and update primary_sidecar_identifier() to return the stored PID instead of euid when in thread mode - Add SharedDirLiaison::ipc_for_pid(pid) to build per-master socket paths - Use ipc_for_pid(pid) in both run_listener and connect_to_master so the socket path is always pid-specific, matching Windows named-pipe behavior - Call set_sidecar_master_pid from ddog_sidecar_connect_master so all shared resources use the master PID as their identifier key --- datadog-sidecar-ffi/src/lib.rs | 1 + datadog-sidecar/src/setup/thread_listener.rs | 20 +++++++------------- datadog-sidecar/src/setup/unix.rs | 19 +++++++++++++++++++ datadog-sidecar/src/unix.rs | 14 +++++++++++++- 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 4370c41c23..dbda3f8ca4 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -315,6 +315,7 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - #[no_mangle] pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { let cfg = datadog_sidecar::config::FromEnv::config(); + datadog_sidecar::set_sidecar_master_pid(pid as u32); try_c!(MasterListener::start(pid, cfg)); MaybeError::None diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index 815fe1466d..0199e8cebb 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -9,7 +9,6 @@ use tokio::sync::oneshot; use tracing::{error, info}; use crate::config::Config; -use crate::config::IpcMode::{InstancePerProcess, Shared}; use crate::entry::MainLoopConfig; use crate::service::blocking::SidecarTransport; use crate::setup::{Liaison, SharedDirLiaison}; @@ -44,7 +43,7 @@ impl MasterListener { let thread_handle = thread::Builder::new() .name(format!("ddtrace-sidecar-listener-{}", pid)) .spawn(move || { - if let Err(e) = run_listener(config, shutdown_rx) { + if let Err(e) = run_listener(pid as u32, config, shutdown_rx) { error!("Listener thread error: {}", e); } }) @@ -150,14 +149,13 @@ async fn accept_socket_loop_thread( } /// Entry point for thread listener - calls enter_listener_loop_with_config -fn run_listener(config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { +fn run_listener(pid: u32, _config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { info!("Listener thread running, creating IPC server"); let acquire_listener = move || { - let liaison: SharedDirLiaison = match config.ipc_mode { - Shared => Liaison::ipc_shared(), - InstancePerProcess => Liaison::ipc_per_process(), - }; + // In thread mode, always use a pid-specific socket so that multiple PHP processes + // with the same euid do not share a listener. + let liaison = SharedDirLiaison::ipc_for_pid(pid); let std_listener = liaison .attempt_listen()? @@ -194,12 +192,8 @@ fn run_listener(config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Resul pub fn connect_to_master(pid: i32) -> io::Result> { info!("Connecting to master listener (PID {})", pid); - let config = Config::get(); - - let liaison: SharedDirLiaison = match config.ipc_mode { - Shared => Liaison::ipc_shared(), - InstancePerProcess => Liaison::ipc_per_process(), - }; + // Use the same pid-specific socket path as the master listener. + let liaison = SharedDirLiaison::ipc_for_pid(pid as u32); let channel = liaison .connect_to_server() diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index 589602b9b4..80534e846d 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -115,6 +115,25 @@ impl SharedDirLiaison { pub fn new_default_location() -> Self { Self::new(env::temp_dir().join("libdatadog")) } + + pub fn ipc_for_pid(pid: u32) -> Self { + let base_dir = env::temp_dir().join("libdatadog"); + let versioned_socket_basename = format!( + "libdd.{}@{}.sock", + crate::sidecar_version!(), + pid + ); + let socket_path = base_dir + .join(&versioned_socket_basename) + .with_extension(".sock"); + let lock_path = base_dir + .join(&versioned_socket_basename) + .with_extension(".sock.lock"); + Self { + socket_path, + lock_path, + } + } } impl Default for SharedDirLiaison { diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index 61ce695a7f..b728e0ee72 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -5,6 +5,7 @@ use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData}; use std::ffi::CString; use std::os::unix::net::UnixListener as StdUnixListener; +use std::sync::atomic::{AtomicU32, Ordering}; use crate::config::Config; use crate::enter_listener_loop; @@ -132,8 +133,19 @@ pub fn setup_daemon_process( Ok(()) } +static SIDECAR_MASTER_PID: AtomicU32 = AtomicU32::new(0); + +pub fn set_sidecar_master_pid(pid: u32) { + SIDECAR_MASTER_PID.store(pid, Ordering::Relaxed); +} + pub fn primary_sidecar_identifier() -> u32 { - unsafe { libc::geteuid() } + let pid = SIDECAR_MASTER_PID.load(Ordering::Relaxed); + if pid != 0 { + pid + } else { + unsafe { libc::geteuid() } + } } fn maybe_start_appsec() -> bool { From 2df618f504c2b72c8c2027610783b1fc9059589e Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Wed, 4 Mar 2026 14:07:46 +0100 Subject: [PATCH 05/13] fix(sidecar): encode master uid in thread-mode socket name for setuid compatibility Thread-mode sockets are placed in /tmp/libdatadog/ with the master's effective uid encoded in the socket name (libdd.@-.sock). This ensures that a worker process that has since dropped privileges via setuid() (e.g. www-data under PHP-FPM) still computes the same socket path as the master listener. Also fix ensure_dir_exists to always attempt chmod 0o777 (best-effort, ignoring EPERM) so that even pre-existing directories get the world-writable bit set when the process has sufficient permissions. Also fix socket/lock path construction: remove spurious .with_extension(".sock") (which produced a double-dot suffix on Rust >=1.87) and use "sock.lock" without a leading dot for the lock file. --- datadog-sidecar-ffi/src/lib.rs | 1 + datadog-sidecar/src/setup/unix.rs | 29 ++++++++++------------------- datadog-sidecar/src/unix.rs | 15 +++++++++++++++ 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index dbda3f8ca4..6cbcd0eff0 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -316,6 +316,7 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { let cfg = datadog_sidecar::config::FromEnv::config(); datadog_sidecar::set_sidecar_master_pid(pid as u32); + datadog_sidecar::set_sidecar_master_uid(unsafe { libc::geteuid() }); try_c!(MasterListener::start(pid, cfg)); MaybeError::None diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index 80534e846d..d77c729964 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -30,13 +30,10 @@ fn ensure_dir_world_writable>(path: P) -> io::Result<()> { } fn ensure_dir_exists>(path: P) -> io::Result<()> { - if path.as_ref().exists() { - return Ok(()); + if !path.as_ref().exists() { + fs::create_dir_all(&path)?; } - - fs::create_dir_all(&path)?; - ensure_dir_world_writable(&path)?; - + let _ = ensure_dir_world_writable(&path); Ok(()) } @@ -99,12 +96,10 @@ impl SharedDirLiaison { primary_sidecar_identifier() ); let base_dir = base_dir.as_ref(); - let socket_path = base_dir - .join(&versioned_socket_basename) - .with_extension(".sock"); + let socket_path = base_dir.join(&versioned_socket_basename); let lock_path = base_dir .join(&versioned_socket_basename) - .with_extension(".sock.lock"); + .with_extension("sock.lock"); Self { socket_path, @@ -117,18 +112,14 @@ impl SharedDirLiaison { } pub fn ipc_for_pid(pid: u32) -> Self { + let uid = crate::sidecar_master_uid(); let base_dir = env::temp_dir().join("libdatadog"); - let versioned_socket_basename = format!( - "libdd.{}@{}.sock", - crate::sidecar_version!(), - pid - ); - let socket_path = base_dir - .join(&versioned_socket_basename) - .with_extension(".sock"); + let versioned_socket_basename = + format!("libdd.{}@{}-{}.sock", crate::sidecar_version!(), uid, pid); + let socket_path = base_dir.join(&versioned_socket_basename); let lock_path = base_dir .join(&versioned_socket_basename) - .with_extension(".sock.lock"); + .with_extension("sock.lock"); Self { socket_path, lock_path, diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index b728e0ee72..2dbf3ae613 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -135,10 +135,25 @@ pub fn setup_daemon_process( static SIDECAR_MASTER_PID: AtomicU32 = AtomicU32::new(0); +static SIDECAR_MASTER_UID: AtomicU32 = AtomicU32::new(u32::MAX); + pub fn set_sidecar_master_pid(pid: u32) { SIDECAR_MASTER_PID.store(pid, Ordering::Relaxed); } +pub fn set_sidecar_master_uid(uid: u32) { + SIDECAR_MASTER_UID.store(uid, Ordering::Relaxed); +} + +pub fn sidecar_master_uid() -> u32 { + let uid = SIDECAR_MASTER_UID.load(Ordering::Relaxed); + if uid != u32::MAX { + uid + } else { + unsafe { libc::geteuid() } + } +} + pub fn primary_sidecar_identifier() -> u32 { let pid = SIDECAR_MASTER_PID.load(Ordering::Relaxed); if pid != 0 { From 47f38843b864f061034c286a1ad87fe1397b522f Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Wed, 4 Mar 2026 14:41:22 +0100 Subject: [PATCH 06/13] fix(sidecar): guard set_sidecar_master_pid/uid with #[cfg(unix)] --- datadog-sidecar-ffi/src/lib.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 6cbcd0eff0..d82a1c6074 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -315,8 +315,11 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - #[no_mangle] pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { let cfg = datadog_sidecar::config::FromEnv::config(); - datadog_sidecar::set_sidecar_master_pid(pid as u32); - datadog_sidecar::set_sidecar_master_uid(unsafe { libc::geteuid() }); + #[cfg(unix)] + { + datadog_sidecar::set_sidecar_master_pid(pid as u32); + datadog_sidecar::set_sidecar_master_uid(unsafe { libc::geteuid() }); + } try_c!(MasterListener::start(pid, cfg)); MaybeError::None From 0ce3cd60e280e5bedeefa6377173ed396f106833 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Wed, 4 Mar 2026 16:35:49 +0100 Subject: [PATCH 07/13] fix(thread-mode): use abstract Unix socket and current_thread Tokio runtime - Switch thread mode IPC to AbstractUnixSocketLiaison on Linux: abstract sockets have no filesystem permissions, so PHP-FPM workers running as a different user (e.g. www-data) can connect to a sidecar thread running in the master process without any chmod/chown workarounds - Use new_current_thread() Tokio runtime in run_listener to avoid spawning extra OS worker threads that trigger LSan "Running thread was not suspended" warnings at process exit under ASAN - Move UnixListener::from_std() inside block_on so it runs with an active Tokio reactor (required by the tokio::net API) - Remove unused SIDECAR_MASTER_UID tracking (replaced by abstract sockets) - Keep SharedDirLiaison::attempt_listen chmod 0o777 for non-Linux platforms --- datadog-sidecar-ffi/src/lib.rs | 5 +- datadog-sidecar/src/setup/thread_listener.rs | 63 +++++++++++++------- datadog-sidecar/src/setup/unix.rs | 20 +++++-- datadog-sidecar/src/unix.rs | 15 ----- 4 files changed, 59 insertions(+), 44 deletions(-) diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index d82a1c6074..8a27159773 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -316,10 +316,7 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { let cfg = datadog_sidecar::config::FromEnv::config(); #[cfg(unix)] - { - datadog_sidecar::set_sidecar_master_pid(pid as u32); - datadog_sidecar::set_sidecar_master_uid(unsafe { libc::geteuid() }); - } + datadog_sidecar::set_sidecar_master_pid(pid as u32); try_c!(MasterListener::start(pid, cfg)); MaybeError::None diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index 0199e8cebb..62d132a44d 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::io; -use std::sync::{Mutex, OnceLock}; +use std::sync::{Arc, Mutex, OnceLock}; use std::thread::{self, JoinHandle}; use tokio::net::UnixListener; use tokio::sync::oneshot; @@ -11,7 +11,11 @@ use tracing::{error, info}; use crate::config::Config; use crate::entry::MainLoopConfig; use crate::service::blocking::SidecarTransport; -use crate::setup::{Liaison, SharedDirLiaison}; +#[cfg(target_os = "linux")] +use crate::setup::AbstractUnixSocketLiaison; +use crate::setup::Liaison; +#[cfg(not(target_os = "linux"))] +use crate::setup::SharedDirLiaison; use datadog_ipc::transport::blocking::BlockingTransport; static MASTER_LISTENER: OnceLock>> = OnceLock::new(); @@ -148,38 +152,52 @@ async fn accept_socket_loop_thread( Ok(()) } -/// Entry point for thread listener - calls enter_listener_loop_with_config +/// Entry point for thread listener. +/// +/// Uses a single-threaded Tokio runtime (current_thread) to avoid spawning extra OS +/// threads. A multi-thread runtime would leave worker threads visible to LSan/ASAN at +/// process exit, causing "Running thread was not suspended" warnings. With +/// current_thread all async work runs on this OS thread; no other threads are created. fn run_listener(pid: u32, _config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { info!("Listener thread running, creating IPC server"); - let acquire_listener = move || { - // In thread mode, always use a pid-specific socket so that multiple PHP processes - // with the same euid do not share a listener. - let liaison = SharedDirLiaison::ipc_for_pid(pid); - - let std_listener = liaison - .attempt_listen()? - .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + #[cfg(target_os = "linux")] + let liaison = AbstractUnixSocketLiaison::ipc_for_pid(pid); + #[cfg(not(target_os = "linux"))] + let liaison = SharedDirLiaison::ipc_for_pid(pid); - std_listener.set_nonblocking(true)?; - let listener = UnixListener::from_std(std_listener)?; + let std_listener = liaison + .attempt_listen()? + .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; - info!("IPC server listening for worker connections"); + std_listener.set_nonblocking(true)?; - let cancel = || {}; - Ok(( - move |handler| accept_socket_loop_thread(listener, handler, shutdown_rx), - cancel, - )) - }; + info!("IPC server listening for worker connections"); + let cancel = || {}; let loop_config = MainLoopConfig { enable_ctrl_c_handler: false, enable_crashtracker: false, external_shutdown_rx: None, }; - crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| io::Error::other(format!("Failed building tokio runtime: {}", e)))?; + + // UnixListener::from_std() requires a Tokio reactor context, so it must be + // called inside block_on rather than before the runtime is built. + runtime + .block_on(async { + let listener = UnixListener::from_std(std_listener)?; + crate::entry::main_loop( + move |handler| accept_socket_loop_thread(listener, handler, shutdown_rx), + Arc::new(cancel), + loop_config, + ) + .await + }) .map_err(|e| io::Error::other(format!("Thread listener failed: {}", e)))?; info!("Listener thread exiting"); @@ -193,6 +211,9 @@ pub fn connect_to_master(pid: i32) -> io::Result> { info!("Connecting to master listener (PID {})", pid); // Use the same pid-specific socket path as the master listener. + #[cfg(target_os = "linux")] + let liaison = AbstractUnixSocketLiaison::ipc_for_pid(pid as u32); + #[cfg(not(target_os = "linux"))] let liaison = SharedDirLiaison::ipc_for_pid(pid as u32); let channel = liaison diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index d77c729964..3d6fb43f97 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -72,7 +72,11 @@ impl Liaison for SharedDirLiaison { } fs::remove_file(&self.socket_path)?; } - Ok(Some(UnixListener::bind(&self.socket_path)?)) + let listener = UnixListener::bind(&self.socket_path)?; + // Make the socket world-accessible so PHP workers running as a different user + // (e.g. www-data under PHP-FPM when the master started as root) can connect. + let _ = fs::set_permissions(&self.socket_path, fs::Permissions::from_mode(0o777)); + Ok(Some(listener)) } fn ipc_shared() -> Self { @@ -112,10 +116,8 @@ impl SharedDirLiaison { } pub fn ipc_for_pid(pid: u32) -> Self { - let uid = crate::sidecar_master_uid(); let base_dir = env::temp_dir().join("libdatadog"); - let versioned_socket_basename = - format!("libdd.{}@{}-{}.sock", crate::sidecar_version!(), uid, pid); + let versioned_socket_basename = format!("libdd.{}@{}.sock", crate::sidecar_version!(), pid); let socket_path = base_dir.join(&versioned_socket_basename); let lock_path = base_dir .join(&versioned_socket_basename) @@ -185,6 +187,16 @@ mod linux { } } + impl AbstractUnixSocketLiaison { + pub fn ipc_for_pid(pid: u32) -> Self { + let path = PathBuf::from(format!( + concat!("libdatadog/", crate::sidecar_version!(), "@{}.sock"), + pid + )); + Self { path } + } + } + impl Default for AbstractUnixSocketLiaison { fn default() -> Self { Self::ipc_shared() diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index 2dbf3ae613..b728e0ee72 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -135,25 +135,10 @@ pub fn setup_daemon_process( static SIDECAR_MASTER_PID: AtomicU32 = AtomicU32::new(0); -static SIDECAR_MASTER_UID: AtomicU32 = AtomicU32::new(u32::MAX); - pub fn set_sidecar_master_pid(pid: u32) { SIDECAR_MASTER_PID.store(pid, Ordering::Relaxed); } -pub fn set_sidecar_master_uid(uid: u32) { - SIDECAR_MASTER_UID.store(uid, Ordering::Relaxed); -} - -pub fn sidecar_master_uid() -> u32 { - let uid = SIDECAR_MASTER_UID.load(Ordering::Relaxed); - if uid != u32::MAX { - uid - } else { - unsafe { libc::geteuid() } - } -} - pub fn primary_sidecar_identifier() -> u32 { let pid = SIDECAR_MASTER_PID.load(Ordering::Relaxed); if pid != 0 { From 53b209731cc67fad3167eec9cd185a851e0c9fe1 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Thu, 5 Mar 2026 14:51:03 +0100 Subject: [PATCH 08/13] chore: remove redundant chmod 0o777 on socket (unused in thread mode on Linux) --- datadog-sidecar/src/setup/unix.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index 3d6fb43f97..1849164beb 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -72,11 +72,7 @@ impl Liaison for SharedDirLiaison { } fs::remove_file(&self.socket_path)?; } - let listener = UnixListener::bind(&self.socket_path)?; - // Make the socket world-accessible so PHP workers running as a different user - // (e.g. www-data under PHP-FPM when the master started as root) can connect. - let _ = fs::set_permissions(&self.socket_path, fs::Permissions::from_mode(0o777)); - Ok(Some(listener)) + Ok(Some(UnixListener::bind(&self.socket_path)?)) } fn ipc_shared() -> Self { From ccb5c11fd2d32bc8711a6e7b5a1116dd84e7168f Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Thu, 5 Mar 2026 15:41:33 +0100 Subject: [PATCH 09/13] fix(sidecar): make named SHM open mode configurable via global hook In thread mode under PHP-FPM, the sidecar runs as a thread inside the master process (possibly root). Named SHM objects (/ddlimiters-*, /ddrc*, /ddcfg*) were created with 0600 (owner-only), preventing PHP worker processes running as a different user (e.g. www-data) from opening them. Add a global SHM_OPEN_MODE atomic in datadog-ipc that defaults to 0600. NamedShmHandle::create() now reads this global instead of hardcoding the mode. Expose ddog_sidecar_set_shm_open_mode() via FFI so the PHP extension can set 0644 before starting the sidecar master listener when running as root. --- datadog-ipc/src/platform/unix/mem_handle.rs | 14 ++++++++++++-- datadog-ipc/src/platform/unix/mod.rs | 2 ++ datadog-sidecar-ffi/src/lib.rs | 6 ++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/datadog-ipc/src/platform/unix/mem_handle.rs b/datadog-ipc/src/platform/unix/mem_handle.rs index f88c1dc9db..cef7c1f7d4 100644 --- a/datadog-ipc/src/platform/unix/mem_handle.rs +++ b/datadog-ipc/src/platform/unix/mem_handle.rs @@ -18,7 +18,7 @@ use std::io; use std::num::NonZeroUsize; use std::os::fd::AsFd; use std::os::unix::fs::MetadataExt; -use std::sync::atomic::{AtomicI32, Ordering}; +use std::sync::atomic::{AtomicI32, AtomicU32, Ordering}; fn fallback_path(name: &P) -> nix::Result { name.with_nix_path(|cstr| { @@ -95,6 +95,16 @@ pub(crate) fn munmap_handle(mapped: &mut MappedMem) { static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0); +static SHM_OPEN_MODE: AtomicU32 = AtomicU32::new(0o600); + +pub fn set_shm_open_mode(mode: u32) { + SHM_OPEN_MODE.store(mode, Ordering::Relaxed); +} + +fn shm_open_mode() -> Mode { + Mode::from_bits_truncate(SHM_OPEN_MODE.load(Ordering::Relaxed)) +} + impl ShmHandle { #[cfg(target_os = "linux")] fn open_anon_shm(name: &str) -> anyhow::Result { @@ -139,7 +149,7 @@ impl ShmHandle { impl NamedShmHandle { pub fn create(path: CString, size: usize) -> io::Result { - Self::create_mode(path, size, Mode::S_IWUSR | Mode::S_IRUSR) + Self::create_mode(path, size, shm_open_mode()) } pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result { diff --git a/datadog-ipc/src/platform/unix/mod.rs b/datadog-ipc/src/platform/unix/mod.rs index eef432d5cc..80bc27e181 100644 --- a/datadog-ipc/src/platform/unix/mod.rs +++ b/datadog-ipc/src/platform/unix/mod.rs @@ -19,6 +19,8 @@ pub(crate) use mem_handle_macos::*; #[cfg(not(target_os = "macos"))] mod mem_handle; #[cfg(not(target_os = "macos"))] +pub use mem_handle::set_shm_open_mode; +#[cfg(not(target_os = "macos"))] pub(crate) use mem_handle::*; #[no_mangle] diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 8a27159773..464cdcaa70 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -322,6 +322,12 @@ pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { MaybeError::None } +#[no_mangle] +#[cfg(unix)] +pub extern "C" fn ddog_sidecar_set_shm_open_mode(mode: u32) { + datadog_ipc::platform::set_shm_open_mode(mode); +} + #[no_mangle] pub extern "C" fn ddog_sidecar_connect_worker( pid: i32, From d7d6fcd806ee0470e587aee927ff1b75a098c41d Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 6 Mar 2026 14:18:16 +0100 Subject: [PATCH 10/13] fix(sidecar): add set_shm_open_mode for macOS (mem_handle_macos.rs) --- datadog-ipc/src/platform/unix/mem_handle_macos.rs | 14 ++++++++++++-- datadog-ipc/src/platform/unix/mod.rs | 2 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/datadog-ipc/src/platform/unix/mem_handle_macos.rs b/datadog-ipc/src/platform/unix/mem_handle_macos.rs index 27fe0006ff..cf9a4b0d7a 100644 --- a/datadog-ipc/src/platform/unix/mem_handle_macos.rs +++ b/datadog-ipc/src/platform/unix/mem_handle_macos.rs @@ -14,7 +14,7 @@ use std::ffi::{CStr, CString}; use std::io; use std::num::NonZeroUsize; use std::os::fd::{AsFd, OwnedFd}; -use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicI32, AtomicU32, AtomicUsize, Ordering}; const MAPPING_MAX_SIZE: usize = 1 << 17; // 128 MiB ought to be enough for everybody? const NOT_COMMITTED: usize = 1 << (usize::BITS - 1); @@ -69,6 +69,16 @@ pub(crate) fn munmap_handle(mapped: &MappedMem) { static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0); +static SHM_OPEN_MODE: AtomicU32 = AtomicU32::new(0o600); + +pub fn set_shm_open_mode(mode: u32) { + SHM_OPEN_MODE.store(mode, Ordering::Relaxed); +} + +fn shm_open_mode() -> Mode { + Mode::from_bits_truncate(SHM_OPEN_MODE.load(Ordering::Relaxed)) +} + impl ShmHandle { pub fn new(size: usize) -> anyhow::Result { let path = format!( @@ -100,7 +110,7 @@ fn path_slice(path: &CStr) -> &[u8] { impl NamedShmHandle { pub fn create(path: CString, size: usize) -> io::Result { - Self::create_mode(path, size, Mode::S_IWUSR | Mode::S_IRUSR) + Self::create_mode(path, size, shm_open_mode()) } pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result { diff --git a/datadog-ipc/src/platform/unix/mod.rs b/datadog-ipc/src/platform/unix/mod.rs index 80bc27e181..93beafd17a 100644 --- a/datadog-ipc/src/platform/unix/mod.rs +++ b/datadog-ipc/src/platform/unix/mod.rs @@ -20,6 +20,8 @@ pub(crate) use mem_handle_macos::*; mod mem_handle; #[cfg(not(target_os = "macos"))] pub use mem_handle::set_shm_open_mode; +#[cfg(target_os = "macos")] +pub use mem_handle_macos::set_shm_open_mode; #[cfg(not(target_os = "macos"))] pub(crate) use mem_handle::*; From 69a98fbd1a665f781fbba5e830b8bd4e233f8906 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Fri, 6 Mar 2026 14:34:48 +0100 Subject: [PATCH 11/13] fix(sidecar): cast mode_t to u16 on macOS in shm_open_mode --- datadog-ipc/src/platform/unix/mem_handle_macos.rs | 2 +- datadog-ipc/src/platform/unix/mod.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/datadog-ipc/src/platform/unix/mem_handle_macos.rs b/datadog-ipc/src/platform/unix/mem_handle_macos.rs index cf9a4b0d7a..fbe2f05bbe 100644 --- a/datadog-ipc/src/platform/unix/mem_handle_macos.rs +++ b/datadog-ipc/src/platform/unix/mem_handle_macos.rs @@ -76,7 +76,7 @@ pub fn set_shm_open_mode(mode: u32) { } fn shm_open_mode() -> Mode { - Mode::from_bits_truncate(SHM_OPEN_MODE.load(Ordering::Relaxed)) + Mode::from_bits_truncate(SHM_OPEN_MODE.load(Ordering::Relaxed) as u16) } impl ShmHandle { diff --git a/datadog-ipc/src/platform/unix/mod.rs b/datadog-ipc/src/platform/unix/mod.rs index 93beafd17a..f1350d8abe 100644 --- a/datadog-ipc/src/platform/unix/mod.rs +++ b/datadog-ipc/src/platform/unix/mod.rs @@ -20,10 +20,10 @@ pub(crate) use mem_handle_macos::*; mod mem_handle; #[cfg(not(target_os = "macos"))] pub use mem_handle::set_shm_open_mode; -#[cfg(target_os = "macos")] -pub use mem_handle_macos::set_shm_open_mode; #[cfg(not(target_os = "macos"))] pub(crate) use mem_handle::*; +#[cfg(target_os = "macos")] +pub use mem_handle_macos::set_shm_open_mode; #[no_mangle] #[cfg(polyfill_glibc_memfd)] From 7a13166dccde82c3aec08afdf89c93c8cb0f56c0 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Mon, 9 Mar 2026 16:01:40 +0100 Subject: [PATCH 12/13] fix(sidecar): use fchown via SO_PEERCRED to grant cross-user SHM access In PHP-FPM thread mode the master process runs as root and spawns worker processes as www-data. Named POSIX SHM objects were created with 0600 (owner-only), so workers could not open them for writing. The correct fix is to fchown() the SHM to the worker's UID after creation. The worker UID is obtained via SO_PEERCRED (peer_cred()) on the first accepted Unix socket connection in the thread listener, before the SHM lazy-lock is initialized. Changes: - Replace set_shm_open_mode/SHM_OPEN_MODE with set_shm_owner_uid/SHM_OWNER_UID in both mem_handle.rs and mem_handle_macos.rs - Call fchown(fd, worker_uid, None) in NamedShmHandle::create_mode() when SHM_OWNER_UID is set; restore default mode to S_IWUSR|S_IRUSR (0600) - Add nix "user" feature to datadog-ipc for fchown/Uid support - Add init_shm_eagerly field to MainLoopConfig (default true); thread mode sets it false to defer SHM initialization to first connection - In accept_socket_loop_thread: use FIRST_CONNECTION_INIT OnceLock to call set_shm_owner_uid(peer_uid) then init SHM_LIMITER exactly once on first worker connection - Remove ddog_sidecar_set_shm_open_mode FFI function (no longer needed) --- datadog-ipc/Cargo.toml | 2 +- datadog-ipc/src/platform/unix/mem_handle.rs | 26 ++++++++++++++----- .../src/platform/unix/mem_handle_macos.rs | 25 +++++++++++++----- datadog-ipc/src/platform/unix/mod.rs | 4 +-- datadog-sidecar-ffi/src/lib.rs | 6 ----- datadog-sidecar/src/entry.rs | 9 +++++-- datadog-sidecar/src/setup/thread_listener.rs | 15 +++++++++++ 7 files changed, 62 insertions(+), 25 deletions(-) diff --git a/datadog-ipc/Cargo.toml b/datadog-ipc/Cargo.toml index f02fd7162e..e4c40f2c65 100644 --- a/datadog-ipc/Cargo.toml +++ b/datadog-ipc/Cargo.toml @@ -43,7 +43,7 @@ tracing-subscriber = { version = "0.3.22" } spawn_worker = { path = "../spawn_worker" } [target.'cfg(not(windows))'.dependencies] -nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket"] } +nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket", "user"] } sendfd = { version = "0.4", features = ["tokio"] } tokio = { version = "1.23", features = ["sync", "io-util", "signal"] } diff --git a/datadog-ipc/src/platform/unix/mem_handle.rs b/datadog-ipc/src/platform/unix/mem_handle.rs index cef7c1f7d4..cb94fb058a 100644 --- a/datadog-ipc/src/platform/unix/mem_handle.rs +++ b/datadog-ipc/src/platform/unix/mem_handle.rs @@ -10,7 +10,7 @@ use nix::errno::Errno; use nix::fcntl::{open, OFlag}; use nix::sys::mman::{self, mmap, munmap, MapFlags, ProtFlags}; use nix::sys::stat::Mode; -use nix::unistd::{ftruncate, mkdir, unlink}; +use nix::unistd::{fchown, ftruncate, mkdir, unlink, Uid}; use nix::NixPath; use std::ffi::{CStr, CString}; use std::fs::File; @@ -18,8 +18,12 @@ use std::io; use std::num::NonZeroUsize; use std::os::fd::AsFd; use std::os::unix::fs::MetadataExt; +use std::os::unix::io::AsRawFd; use std::sync::atomic::{AtomicI32, AtomicU32, Ordering}; +// Sentinel value meaning "no owner UID override" +const NO_OWNER_UID: u32 = u32::MAX; + fn fallback_path(name: &P) -> nix::Result { name.with_nix_path(|cstr| { let mut path = "/tmp/libdatadog".to_string().into_bytes(); @@ -95,14 +99,19 @@ pub(crate) fn munmap_handle(mapped: &mut MappedMem) { static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0); -static SHM_OPEN_MODE: AtomicU32 = AtomicU32::new(0o600); +static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID); -pub fn set_shm_open_mode(mode: u32) { - SHM_OPEN_MODE.store(mode, Ordering::Relaxed); +pub fn set_shm_owner_uid(uid: u32) { + SHM_OWNER_UID.store(uid, Ordering::Relaxed); } -fn shm_open_mode() -> Mode { - Mode::from_bits_truncate(SHM_OPEN_MODE.load(Ordering::Relaxed)) +fn shm_owner_uid() -> Option { + let uid = SHM_OWNER_UID.load(Ordering::Relaxed); + if uid == NO_OWNER_UID { + None + } else { + Some(uid) + } } impl ShmHandle { @@ -149,12 +158,15 @@ impl ShmHandle { impl NamedShmHandle { pub fn create(path: CString, size: usize) -> io::Result { - Self::create_mode(path, size, shm_open_mode()) + Self::create_mode(path, size, Mode::S_IWUSR | Mode::S_IRUSR) } pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result { let fd = shm_open(path.as_bytes(), OFlag::O_CREAT | OFlag::O_RDWR, mode)?; ftruncate(&fd, size as off_t)?; + if let Some(uid) = shm_owner_uid() { + let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None); + } Self::new(fd, Some(path), size) } diff --git a/datadog-ipc/src/platform/unix/mem_handle_macos.rs b/datadog-ipc/src/platform/unix/mem_handle_macos.rs index fbe2f05bbe..baf444a0b8 100644 --- a/datadog-ipc/src/platform/unix/mem_handle_macos.rs +++ b/datadog-ipc/src/platform/unix/mem_handle_macos.rs @@ -9,11 +9,12 @@ use nix::errno::Errno; use nix::fcntl::OFlag; use nix::sys::mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags}; use nix::sys::stat::Mode; -use nix::unistd::ftruncate; +use nix::unistd::{fchown, ftruncate, Uid}; use std::ffi::{CStr, CString}; use std::io; use std::num::NonZeroUsize; use std::os::fd::{AsFd, OwnedFd}; +use std::os::unix::io::AsRawFd; use std::sync::atomic::{AtomicI32, AtomicU32, AtomicUsize, Ordering}; const MAPPING_MAX_SIZE: usize = 1 << 17; // 128 MiB ought to be enough for everybody? @@ -69,14 +70,21 @@ pub(crate) fn munmap_handle(mapped: &MappedMem) { static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0); -static SHM_OPEN_MODE: AtomicU32 = AtomicU32::new(0o600); +const NO_OWNER_UID: u32 = u32::MAX; -pub fn set_shm_open_mode(mode: u32) { - SHM_OPEN_MODE.store(mode, Ordering::Relaxed); +static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID); + +pub fn set_shm_owner_uid(uid: u32) { + SHM_OWNER_UID.store(uid, Ordering::Relaxed); } -fn shm_open_mode() -> Mode { - Mode::from_bits_truncate(SHM_OPEN_MODE.load(Ordering::Relaxed) as u16) +fn shm_owner_uid() -> Option { + let uid = SHM_OWNER_UID.load(Ordering::Relaxed); + if uid == NO_OWNER_UID { + None + } else { + Some(uid) + } } impl ShmHandle { @@ -110,7 +118,7 @@ fn path_slice(path: &CStr) -> &[u8] { impl NamedShmHandle { pub fn create(path: CString, size: usize) -> io::Result { - Self::create_mode(path, size, shm_open_mode()) + Self::create_mode(path, size, Mode::S_IWUSR | Mode::S_IRUSR) } pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result { @@ -122,6 +130,9 @@ impl NamedShmHandle { truncate?; } } + if let Some(uid) = shm_owner_uid() { + let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None); + } Self::new(fd, Some(path), size) } diff --git a/datadog-ipc/src/platform/unix/mod.rs b/datadog-ipc/src/platform/unix/mod.rs index f1350d8abe..5d219adfed 100644 --- a/datadog-ipc/src/platform/unix/mod.rs +++ b/datadog-ipc/src/platform/unix/mod.rs @@ -19,11 +19,11 @@ pub(crate) use mem_handle_macos::*; #[cfg(not(target_os = "macos"))] mod mem_handle; #[cfg(not(target_os = "macos"))] -pub use mem_handle::set_shm_open_mode; +pub use mem_handle::set_shm_owner_uid; #[cfg(not(target_os = "macos"))] pub(crate) use mem_handle::*; #[cfg(target_os = "macos")] -pub use mem_handle_macos::set_shm_open_mode; +pub use mem_handle_macos::set_shm_owner_uid; #[no_mangle] #[cfg(polyfill_glibc_memfd)] diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 464cdcaa70..8a27159773 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -322,12 +322,6 @@ pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { MaybeError::None } -#[no_mangle] -#[cfg(unix)] -pub extern "C" fn ddog_sidecar_set_shm_open_mode(mode: u32) { - datadog_ipc::platform::set_shm_open_mode(mode); -} - #[no_mangle] pub extern "C" fn ddog_sidecar_connect_worker( pid: i32, diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index 14c5594fe4..0090727def 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -37,6 +37,9 @@ pub struct MainLoopConfig { pub enable_ctrl_c_handler: bool, pub enable_crashtracker: bool, pub external_shutdown_rx: Option>, + /// Set to false in thread mode so the worker's UID can be obtained on the + /// first connection and used to fchown the SHM. + pub init_shm_eagerly: bool, } impl Default for MainLoopConfig { @@ -45,6 +48,7 @@ impl Default for MainLoopConfig { enable_ctrl_c_handler: true, enable_crashtracker: true, external_shutdown_rx: None, + init_shm_eagerly: true, } } } @@ -125,8 +129,9 @@ where }); } - // Init. Early, before we start listening. - drop(SHM_LIMITER.lock()); + if loop_config.init_shm_eagerly { + drop(SHM_LIMITER.lock()); + } let server = SidecarServer::default(); diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs index 62d132a44d..2bed4e417e 100644 --- a/datadog-sidecar/src/setup/thread_listener.rs +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -16,10 +16,14 @@ use crate::setup::AbstractUnixSocketLiaison; use crate::setup::Liaison; #[cfg(not(target_os = "linux"))] use crate::setup::SharedDirLiaison; +use crate::tracer::SHM_LIMITER; use datadog_ipc::transport::blocking::BlockingTransport; static MASTER_LISTENER: OnceLock>> = OnceLock::new(); +/// Ensures first-connection SHM initialization runs exactly once across all threads. +static FIRST_CONNECTION_INIT: OnceLock<()> = OnceLock::new(); + pub struct MasterListener { shutdown_tx: Option>, thread_handle: Option>, @@ -139,6 +143,15 @@ async fn accept_socket_loop_thread( match accept { Ok((socket, _)) => { info!("Accepted new worker connection"); + // On the first connection, get the worker's UID and + // fchown the SHM to that UID so cross-user access works when + // the master runs as root and workers run as a different user. + FIRST_CONNECTION_INIT.get_or_init(|| { + if let Ok(cred) = socket.peer_cred() { + datadog_ipc::platform::set_shm_owner_uid(cred.uid()); + } + drop(SHM_LIMITER.lock()); + }); handler(socket); } Err(e) => { @@ -179,6 +192,8 @@ fn run_listener(pid: u32, _config: Config, shutdown_rx: oneshot::Receiver<()>) - enable_ctrl_c_handler: false, enable_crashtracker: false, external_shutdown_rx: None, + // Defer SHM init to first connection so we can fchown using the worker's UID. + init_shm_eagerly: false, }; let runtime = tokio::runtime::Builder::new_current_thread() From a66093ebd8afaf96613e75f6366858d10e560796 Mon Sep 17 00:00:00 2001 From: Alexandre Rulleau Date: Mon, 9 Mar 2026 17:28:30 +0100 Subject: [PATCH 13/13] fix(sidecar): add missing init_shm_eagerly field in Windows thread listener --- datadog-sidecar/src/setup/thread_listener_windows.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datadog-sidecar/src/setup/thread_listener_windows.rs b/datadog-sidecar/src/setup/thread_listener_windows.rs index 2414ff0a14..40bf7e4430 100644 --- a/datadog-sidecar/src/setup/thread_listener_windows.rs +++ b/datadog-sidecar/src/setup/thread_listener_windows.rs @@ -169,6 +169,7 @@ fn run_listener_windows(pipe_name: String, shutdown_rx: oneshot::Receiver<()>) - enable_ctrl_c_handler: false, enable_crashtracker: false, external_shutdown_rx: None, + init_shm_eagerly: true, }; crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config)