diff --git a/datadog-ipc/Cargo.toml b/datadog-ipc/Cargo.toml index f02fd7162e..e4c40f2c65 100644 --- a/datadog-ipc/Cargo.toml +++ b/datadog-ipc/Cargo.toml @@ -43,7 +43,7 @@ tracing-subscriber = { version = "0.3.22" } spawn_worker = { path = "../spawn_worker" } [target.'cfg(not(windows))'.dependencies] -nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket"] } +nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket", "user"] } sendfd = { version = "0.4", features = ["tokio"] } tokio = { version = "1.23", features = ["sync", "io-util", "signal"] } diff --git a/datadog-ipc/src/platform/unix/mem_handle.rs b/datadog-ipc/src/platform/unix/mem_handle.rs index f88c1dc9db..cb94fb058a 100644 --- a/datadog-ipc/src/platform/unix/mem_handle.rs +++ b/datadog-ipc/src/platform/unix/mem_handle.rs @@ -10,7 +10,7 @@ use nix::errno::Errno; use nix::fcntl::{open, OFlag}; use nix::sys::mman::{self, mmap, munmap, MapFlags, ProtFlags}; use nix::sys::stat::Mode; -use nix::unistd::{ftruncate, mkdir, unlink}; +use nix::unistd::{fchown, ftruncate, mkdir, unlink, Uid}; use nix::NixPath; use std::ffi::{CStr, CString}; use std::fs::File; @@ -18,7 +18,11 @@ use std::io; use std::num::NonZeroUsize; use std::os::fd::AsFd; use std::os::unix::fs::MetadataExt; -use std::sync::atomic::{AtomicI32, Ordering}; +use std::os::unix::io::AsRawFd; +use std::sync::atomic::{AtomicI32, AtomicU32, Ordering}; + +// Sentinel value meaning "no owner UID override" +const NO_OWNER_UID: u32 = u32::MAX; fn fallback_path(name: &P) -> nix::Result { name.with_nix_path(|cstr| { @@ -95,6 +99,21 @@ pub(crate) fn munmap_handle(mapped: &mut MappedMem) { static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0); +static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID); + +pub fn set_shm_owner_uid(uid: u32) { + SHM_OWNER_UID.store(uid, Ordering::Relaxed); +} + +fn shm_owner_uid() -> Option { + let uid = SHM_OWNER_UID.load(Ordering::Relaxed); + if uid == NO_OWNER_UID { + None + } else { + Some(uid) + } +} + impl ShmHandle { #[cfg(target_os = "linux")] fn open_anon_shm(name: &str) -> anyhow::Result { @@ -145,6 +164,9 @@ impl NamedShmHandle { pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result { let fd = shm_open(path.as_bytes(), OFlag::O_CREAT | OFlag::O_RDWR, mode)?; ftruncate(&fd, size as off_t)?; + if let Some(uid) = shm_owner_uid() { + let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None); + } Self::new(fd, Some(path), size) } diff --git a/datadog-ipc/src/platform/unix/mem_handle_macos.rs b/datadog-ipc/src/platform/unix/mem_handle_macos.rs index 27fe0006ff..baf444a0b8 100644 --- a/datadog-ipc/src/platform/unix/mem_handle_macos.rs +++ b/datadog-ipc/src/platform/unix/mem_handle_macos.rs @@ -9,12 +9,13 @@ use nix::errno::Errno; use nix::fcntl::OFlag; use nix::sys::mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags}; use nix::sys::stat::Mode; -use nix::unistd::ftruncate; +use nix::unistd::{fchown, ftruncate, Uid}; use std::ffi::{CStr, CString}; use std::io; use std::num::NonZeroUsize; use std::os::fd::{AsFd, OwnedFd}; -use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering}; +use std::os::unix::io::AsRawFd; +use std::sync::atomic::{AtomicI32, AtomicU32, AtomicUsize, Ordering}; const MAPPING_MAX_SIZE: usize = 1 << 17; // 128 MiB ought to be enough for everybody? const NOT_COMMITTED: usize = 1 << (usize::BITS - 1); @@ -69,6 +70,23 @@ pub(crate) fn munmap_handle(mapped: &MappedMem) { static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0); +const NO_OWNER_UID: u32 = u32::MAX; + +static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID); + +pub fn set_shm_owner_uid(uid: u32) { + SHM_OWNER_UID.store(uid, Ordering::Relaxed); +} + +fn shm_owner_uid() -> Option { + let uid = SHM_OWNER_UID.load(Ordering::Relaxed); + if uid == NO_OWNER_UID { + None + } else { + Some(uid) + } +} + impl ShmHandle { pub fn new(size: usize) -> anyhow::Result { let path = format!( @@ -112,6 +130,9 @@ impl NamedShmHandle { truncate?; } } + if let Some(uid) = shm_owner_uid() { + let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None); + } Self::new(fd, Some(path), size) } diff --git a/datadog-ipc/src/platform/unix/mod.rs b/datadog-ipc/src/platform/unix/mod.rs index eef432d5cc..5d219adfed 100644 --- a/datadog-ipc/src/platform/unix/mod.rs +++ b/datadog-ipc/src/platform/unix/mod.rs @@ -19,7 +19,11 @@ pub(crate) use mem_handle_macos::*; #[cfg(not(target_os = "macos"))] mod mem_handle; #[cfg(not(target_os = "macos"))] +pub use mem_handle::set_shm_owner_uid; +#[cfg(not(target_os = "macos"))] pub(crate) use mem_handle::*; +#[cfg(target_os = "macos")] +pub use mem_handle_macos::set_shm_owner_uid; #[no_mangle] #[cfg(polyfill_glibc_memfd)] diff --git a/datadog-sidecar-ffi/src/lib.rs b/datadog-sidecar-ffi/src/lib.rs index 78bab1d2e6..8a27159773 100644 --- a/datadog-sidecar-ffi/src/lib.rs +++ b/datadog-sidecar-ffi/src/lib.rs @@ -60,6 +60,8 @@ use std::slice; use std::sync::Arc; use std::time::Duration; +use datadog_sidecar::setup::{connect_to_master, MasterListener}; + #[no_mangle] #[cfg(target_os = "windows")] pub extern "C" fn ddog_setup_crashtracking( @@ -310,6 +312,46 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) - MaybeError::None } +#[no_mangle] +pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError { + let cfg = datadog_sidecar::config::FromEnv::config(); + #[cfg(unix)] + datadog_sidecar::set_sidecar_master_pid(pid as u32); + try_c!(MasterListener::start(pid, cfg)); + + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_connect_worker( + pid: i32, + connection: &mut *mut SidecarTransport, +) -> MaybeError { + let transport = try_c!(connect_to_master(pid)); + *connection = Box::into_raw(transport); + + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError { + try_c!(MasterListener::shutdown()); + + MaybeError::None +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool { + MasterListener::is_active(pid) +} + +#[no_mangle] +pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError { + try_c!(MasterListener::clear_inherited_state()); + + MaybeError::None +} + #[no_mangle] pub extern "C" fn ddog_sidecar_ping(transport: &mut Box) -> MaybeError { try_c!(blocking::ping(transport)); diff --git a/datadog-sidecar/src/entry.rs b/datadog-sidecar/src/entry.rs index 218ec23252..0090727def 100644 --- a/datadog-sidecar/src/entry.rs +++ b/datadog-sidecar/src/entry.rs @@ -15,7 +15,7 @@ use std::{ }, time::{Duration, Instant}, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; #[cfg(unix)] use crate::crashtracker::crashtracker_unix_socket_path; @@ -32,7 +32,32 @@ use crate::tracer::SHM_LIMITER; use crate::watchdog::Watchdog; use crate::{ddog_daemon_entry_point, setup_daemon_process}; -async fn main_loop(listener: L, cancel: Arc) -> io::Result<()> +/// Configuration for main_loop behavior +pub struct MainLoopConfig { + pub enable_ctrl_c_handler: bool, + pub enable_crashtracker: bool, + pub external_shutdown_rx: Option>, + /// Set to false in thread mode so the worker's UID can be obtained on the + /// first connection and used to fchown the SHM. + pub init_shm_eagerly: bool, +} + +impl Default for MainLoopConfig { + fn default() -> Self { + Self { + enable_ctrl_c_handler: true, + enable_crashtracker: true, + external_shutdown_rx: None, + init_shm_eagerly: true, + } + } +} + +pub async fn main_loop( + listener: L, + cancel: Arc, + loop_config: MainLoopConfig, +) -> io::Result<()> where L: FnOnce(Box) -> Fut, Fut: Future>, @@ -64,32 +89,49 @@ where } }); - tokio::spawn(async move { - if let Err(err) = tokio::signal::ctrl_c().await { - tracing::error!("Error setting up signal handler {}", err); - } - tracing::info!("Received Ctrl-C Signal, shutting down"); - cancel(); - }); + if let Some(shutdown_rx) = loop_config.external_shutdown_rx { + let cancel = cancel.clone(); + tokio::spawn(async move { + let _ = shutdown_rx.await; + tracing::info!("External shutdown signal received"); + cancel(); + }); + } + + if loop_config.enable_ctrl_c_handler { + let cancel = cancel.clone(); + tokio::spawn(async move { + if let Err(err) = tokio::signal::ctrl_c().await { + tracing::error!("Error setting up signal handler {}", err); + } + tracing::info!("Received Ctrl-C Signal, shutting down"); + cancel(); + }); + } #[cfg(unix)] - tokio::spawn(async move { - let socket_path = crashtracker_unix_socket_path(); - match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default()) - { - Ok(listener) => loop { - if let Err(e) = - libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await - { - tracing::warn!("Got error while receiving crash report: {e}"); - } - }, - Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), - } - }); + if loop_config.enable_crashtracker { + tokio::spawn(async move { + let socket_path = crashtracker_unix_socket_path(); + match libdd_crashtracker::get_receiver_unix_socket( + socket_path.to_str().unwrap_or_default(), + ) { + Ok(listener) => loop { + if let Err(e) = + libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener) + .await + { + tracing::warn!("Got error while receiving crash report: {e}"); + } + }, + Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"), + } + }); + } - // Init. Early, before we start listening. - drop(SHM_LIMITER.lock()); + if loop_config.init_shm_eagerly { + drop(SHM_LIMITER.lock()); + } let server = SidecarServer::default(); @@ -143,6 +185,19 @@ where } pub fn enter_listener_loop(acquire_listener: F) -> anyhow::Result<()> +where + F: FnOnce() -> io::Result<(L, C)>, + L: FnOnce(Box) -> Fut, + Fut: Future>, + C: Fn() + Sync + Send + 'static, +{ + enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default()) +} + +pub fn enter_listener_loop_with_config( + acquire_listener: F, + loop_config: MainLoopConfig, +) -> anyhow::Result<()> where F: FnOnce() -> io::Result<(L, C)>, L: FnOnce(Box) -> Fut, @@ -159,7 +214,7 @@ where let (listener, cancel) = acquire_listener()?; runtime - .block_on(main_loop(listener, Arc::new(cancel))) + .block_on(main_loop(listener, Arc::new(cancel), loop_config)) .map_err(|e| e.into()) } diff --git a/datadog-sidecar/src/setup/mod.rs b/datadog-sidecar/src/setup/mod.rs index 07c837aab0..b82587b217 100644 --- a/datadog-sidecar/src/setup/mod.rs +++ b/datadog-sidecar/src/setup/mod.rs @@ -12,6 +12,18 @@ mod windows; #[cfg(windows)] pub use self::windows::*; +// Thread-based listener module (Unix) +#[cfg(unix)] +pub mod thread_listener; +#[cfg(unix)] +pub use thread_listener::{connect_to_master, MasterListener}; + +// Thread-based listener module (Windows) +#[cfg(windows)] +pub mod thread_listener_windows; +#[cfg(windows)] +pub use thread_listener_windows::{connect_to_master, MasterListener}; + use datadog_ipc::platform::Channel; use std::io; diff --git a/datadog-sidecar/src/setup/thread_listener.rs b/datadog-sidecar/src/setup/thread_listener.rs new file mode 100644 index 0000000000..2bed4e417e --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener.rs @@ -0,0 +1,247 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io; +use std::sync::{Arc, Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; +use tokio::net::UnixListener; +use tokio::sync::oneshot; +use tracing::{error, info}; + +use crate::config::Config; +use crate::entry::MainLoopConfig; +use crate::service::blocking::SidecarTransport; +#[cfg(target_os = "linux")] +use crate::setup::AbstractUnixSocketLiaison; +use crate::setup::Liaison; +#[cfg(not(target_os = "linux"))] +use crate::setup::SharedDirLiaison; +use crate::tracer::SHM_LIMITER; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +/// Ensures first-connection SHM initialization runs exactly once across all threads. +static FIRST_CONNECTION_INIT: OnceLock<()> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: Option>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread. + /// + /// This spawns a new OS thread that calls enter_listener_loop_with_config + /// to create a Tokio runtime and listen for worker connections. + /// Only one listener can be active per process. + pub fn start(pid: i32, config: Config) -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + return Err(io::Error::other("Master listener is already running")); + } + + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let thread_handle = thread::Builder::new() + .name(format!("ddtrace-sidecar-listener-{}", pid)) + .spawn(move || { + if let Err(e) = run_listener(pid as u32, config, shutdown_rx) { + error!("Listener thread error: {}", e); + } + }) + .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; + + *listener_guard = Some(MasterListener { + shutdown_tx: Some(shutdown_tx), + thread_handle: Some(thread_handle), + pid, + }); + + Ok(()) + } + + /// Shutdown the master listener thread. + /// + /// Sends shutdown signal and joins the listener thread. This is blocking + /// and will wait for the thread to exit cleanly. + pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + // Signal shutdown by sending to or dropping the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } + + if let Some(handle) = master.thread_handle.take() { + handle + .join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + /// + /// Used for fork detection: child processes inherit the listener state + /// but don't own the actual thread. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state after fork. + /// + /// Child processes must call this to prevent attempting to use the + /// parent's listener thread, which doesn't exist in the child. + pub fn clear_inherited_state() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + info!("Clearing inherited master listener state in child process"); + *listener_guard = None; + } + + Ok(()) + } +} + +/// Accept connections in a loop for thread mode. +async fn accept_socket_loop_thread( + listener: UnixListener, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in thread listener"); + break; + } + accept = listener.accept() => { + match accept { + Ok((socket, _)) => { + info!("Accepted new worker connection"); + // On the first connection, get the worker's UID and + // fchown the SHM to that UID so cross-user access works when + // the master runs as root and workers run as a different user. + FIRST_CONNECTION_INIT.get_or_init(|| { + if let Ok(cred) = socket.peer_cred() { + datadog_ipc::platform::set_shm_owner_uid(cred.uid()); + } + drop(SHM_LIMITER.lock()); + }); + handler(socket); + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + break; + } + } + } + } + } + Ok(()) +} + +/// Entry point for thread listener. +/// +/// Uses a single-threaded Tokio runtime (current_thread) to avoid spawning extra OS +/// threads. A multi-thread runtime would leave worker threads visible to LSan/ASAN at +/// process exit, causing "Running thread was not suspended" warnings. With +/// current_thread all async work runs on this OS thread; no other threads are created. +fn run_listener(pid: u32, _config: Config, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating IPC server"); + + #[cfg(target_os = "linux")] + let liaison = AbstractUnixSocketLiaison::ipc_for_pid(pid); + #[cfg(not(target_os = "linux"))] + let liaison = SharedDirLiaison::ipc_for_pid(pid); + + let std_listener = liaison + .attempt_listen()? + .ok_or_else(|| io::Error::other("Failed to create IPC listener"))?; + + std_listener.set_nonblocking(true)?; + + info!("IPC server listening for worker connections"); + + let cancel = || {}; + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + // Defer SHM init to first connection so we can fchown using the worker's UID. + init_shm_eagerly: false, + }; + + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .map_err(|e| io::Error::other(format!("Failed building tokio runtime: {}", e)))?; + + // UnixListener::from_std() requires a Tokio reactor context, so it must be + // called inside block_on rather than before the runtime is built. + runtime + .block_on(async { + let listener = UnixListener::from_std(std_listener)?; + crate::entry::main_loop( + move |handler| accept_socket_loop_thread(listener, handler, shutdown_rx), + Arc::new(cancel), + loop_config, + ) + .await + }) + .map_err(|e| io::Error::other(format!("Thread listener failed: {}", e)))?; + + info!("Listener thread exiting"); + Ok(()) +} + +/// Connect to the master listener as a worker. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener (PID {})", pid); + + // Use the same pid-specific socket path as the master listener. + #[cfg(target_os = "linux")] + let liaison = AbstractUnixSocketLiaison::ipc_for_pid(pid as u32); + #[cfg(not(target_os = "linux"))] + let liaison = SharedDirLiaison::ipc_for_pid(pid as u32); + + let channel = liaison + .connect_to_server() + .map_err(|e| io::Error::other(format!("Failed to connect to master listener: {}", e)))?; + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +} diff --git a/datadog-sidecar/src/setup/thread_listener_windows.rs b/datadog-sidecar/src/setup/thread_listener_windows.rs new file mode 100644 index 0000000000..40bf7e4430 --- /dev/null +++ b/datadog-sidecar/src/setup/thread_listener_windows.rs @@ -0,0 +1,212 @@ +// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::io; +use std::os::windows::io::{AsRawHandle, FromRawHandle, OwnedHandle}; +use std::sync::{Mutex, OnceLock}; +use std::thread::{self, JoinHandle}; +use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions}; +use tokio::sync::oneshot; +use tracing::{error, info}; + +use crate::config::Config; +use crate::entry::MainLoopConfig; +use crate::service::blocking::SidecarTransport; +use datadog_ipc::platform::metadata::ProcessHandle; +use datadog_ipc::platform::Channel; +use datadog_ipc::transport::blocking::BlockingTransport; + +static MASTER_LISTENER: OnceLock>> = OnceLock::new(); + +pub struct MasterListener { + shutdown_tx: Option>, + thread_handle: Option>, + pid: i32, +} + +impl MasterListener { + /// Start the master listener thread using Windows Named Pipes. + /// + /// This spawns a new OS thread that creates a named pipe server + /// to listen for worker connections. Only one listener can be active per process. + pub fn start(pid: i32, _config: Config) -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if listener_guard.is_some() { + return Err(io::Error::other("Master listener is already running")); + } + + let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + + let thread_handle = thread::Builder::new() + .name(format!("ddtrace-sidecar-listener-{}", pid)) + .spawn(move || { + if let Err(e) = run_listener_windows(pipe_name, shutdown_rx) { + error!("Listener thread error: {}", e); + } + }) + .map_err(|e| io::Error::other(format!("Failed to spawn listener thread: {}", e)))?; + + *listener_guard = Some(MasterListener { + shutdown_tx: Some(shutdown_tx), + thread_handle: Some(thread_handle), + pid, + }); + + info!("Started Windows named pipe listener (PID {})", pid); + Ok(()) + } + + /// Shutdown the master listener thread. + /// + /// Sends shutdown signal and joins the listener thread. This is blocking + /// and will wait for the thread to exit cleanly. + pub fn shutdown() -> io::Result<()> { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + let mut listener_guard = listener_mutex + .lock() + .map_err(|e| io::Error::other(format!("Failed to acquire listener lock: {}", e)))?; + + if let Some(mut master) = listener_guard.take() { + // Signal shutdown by sending to the oneshot sender + if let Some(tx) = master.shutdown_tx.take() { + let _ = tx.send(()); + } + + if let Some(handle) = master.thread_handle.take() { + handle + .join() + .map_err(|_| io::Error::other("Failed to join listener thread"))?; + } + + info!("Master listener thread shut down successfully"); + Ok(()) + } else { + Err(io::Error::other("No master listener is running")) + } + } + + /// Check if the master listener is active for the given PID. + pub fn is_active(pid: i32) -> bool { + let listener_mutex = MASTER_LISTENER.get_or_init(|| Mutex::new(None)); + if let Ok(listener_guard) = listener_mutex.lock() { + listener_guard.as_ref().is_some_and(|l| l.pid == pid) + } else { + false + } + } + + /// Clear inherited listener state. + /// Kept for API compatibility with Unix version. + pub fn clear_inherited_state() -> io::Result<()> { + Ok(()) + } +} + +/// Accept connections in a loop for Windows named pipes. +async fn accept_pipe_loop_windows( + pipe_name: String, + handler: Box, + mut shutdown_rx: oneshot::Receiver<()>, +) -> io::Result<()> { + let mut server = ServerOptions::new() + .first_pipe_instance(true) + .max_instances(254) // Windows allows up to 255 instances + .create(&pipe_name)?; + + info!("Named pipe server created at: {}", pipe_name); + + loop { + tokio::select! { + _ = &mut shutdown_rx => { + info!("Shutdown signal received in Windows pipe listener"); + break; + } + result = server.connect() => { + match result { + Ok(_) => { + info!("Accepted new worker connection on named pipe"); + handler(server); + + server = ServerOptions::new() + .create(&pipe_name)?; + } + Err(e) => { + error!("Failed to accept worker connection: {}", e); + match ServerOptions::new().create(&pipe_name) { + Ok(new_server) => server = new_server, + Err(e2) => { + error!("Failed to recover named pipe: {}", e2); + break; + } + } + } + } + } + } + } + Ok(()) +} + +/// Entry point for Windows named pipe listener +fn run_listener_windows(pipe_name: String, shutdown_rx: oneshot::Receiver<()>) -> io::Result<()> { + info!("Listener thread running, creating Windows named pipe server"); + + let acquire_listener = move || { + let cancel = || {}; + let pipe_name_clone = pipe_name.clone(); + Ok(( + move |handler| accept_pipe_loop_windows(pipe_name_clone, handler, shutdown_rx), + cancel, + )) + }; + + let loop_config = MainLoopConfig { + enable_ctrl_c_handler: false, + enable_crashtracker: false, + external_shutdown_rx: None, + init_shm_eagerly: true, + }; + + crate::entry::enter_listener_loop_with_config(acquire_listener, loop_config) + .map_err(|e| io::Error::other(format!("Windows thread listener failed: {}", e)))?; + + info!("Listener thread exiting"); + Ok(()) +} + +/// Connect to the master listener as a worker using Windows Named Pipes. +/// +/// Establishes a connection to the master listener thread for the given PID. +pub fn connect_to_master(pid: i32) -> io::Result> { + info!("Connecting to master listener via named pipe (PID {})", pid); + + let pipe_name = format!(r"\\.\pipe\ddtrace_sidecar_{}", pid); + + let client = ClientOptions::new().open(&pipe_name)?; + + info!("Connected to named pipe: {}", pipe_name); + + let raw_handle = client.as_raw_handle(); + let owned_handle = unsafe { OwnedHandle::from_raw_handle(raw_handle) }; + + std::mem::forget(client); + + let process_handle = + ProcessHandle::Getter(Box::new(move || Ok(ProcessHandle::Pid(pid as u32)))); + let channel = Channel::from_client_handle_and_pid(owned_handle, process_handle); + + let transport = BlockingTransport::from(channel); + + let sidecar_transport = Box::new(SidecarTransport { + inner: Mutex::new(transport), + reconnect_fn: None, + }); + + info!("Successfully connected to master listener"); + Ok(sidecar_transport) +} diff --git a/datadog-sidecar/src/setup/unix.rs b/datadog-sidecar/src/setup/unix.rs index 589602b9b4..1849164beb 100644 --- a/datadog-sidecar/src/setup/unix.rs +++ b/datadog-sidecar/src/setup/unix.rs @@ -30,13 +30,10 @@ fn ensure_dir_world_writable>(path: P) -> io::Result<()> { } fn ensure_dir_exists>(path: P) -> io::Result<()> { - if path.as_ref().exists() { - return Ok(()); + if !path.as_ref().exists() { + fs::create_dir_all(&path)?; } - - fs::create_dir_all(&path)?; - ensure_dir_world_writable(&path)?; - + let _ = ensure_dir_world_writable(&path); Ok(()) } @@ -99,12 +96,10 @@ impl SharedDirLiaison { primary_sidecar_identifier() ); let base_dir = base_dir.as_ref(); - let socket_path = base_dir - .join(&versioned_socket_basename) - .with_extension(".sock"); + let socket_path = base_dir.join(&versioned_socket_basename); let lock_path = base_dir .join(&versioned_socket_basename) - .with_extension(".sock.lock"); + .with_extension("sock.lock"); Self { socket_path, @@ -115,6 +110,19 @@ impl SharedDirLiaison { pub fn new_default_location() -> Self { Self::new(env::temp_dir().join("libdatadog")) } + + pub fn ipc_for_pid(pid: u32) -> Self { + let base_dir = env::temp_dir().join("libdatadog"); + let versioned_socket_basename = format!("libdd.{}@{}.sock", crate::sidecar_version!(), pid); + let socket_path = base_dir.join(&versioned_socket_basename); + let lock_path = base_dir + .join(&versioned_socket_basename) + .with_extension("sock.lock"); + Self { + socket_path, + lock_path, + } + } } impl Default for SharedDirLiaison { @@ -175,6 +183,16 @@ mod linux { } } + impl AbstractUnixSocketLiaison { + pub fn ipc_for_pid(pid: u32) -> Self { + let path = PathBuf::from(format!( + concat!("libdatadog/", crate::sidecar_version!(), "@{}.sock"), + pid + )); + Self { path } + } + } + impl Default for AbstractUnixSocketLiaison { fn default() -> Self { Self::ipc_shared() diff --git a/datadog-sidecar/src/unix.rs b/datadog-sidecar/src/unix.rs index 61ce695a7f..b728e0ee72 100644 --- a/datadog-sidecar/src/unix.rs +++ b/datadog-sidecar/src/unix.rs @@ -5,6 +5,7 @@ use spawn_worker::{getpid, SpawnWorker, Stdio, TrampolineData}; use std::ffi::CString; use std::os::unix::net::UnixListener as StdUnixListener; +use std::sync::atomic::{AtomicU32, Ordering}; use crate::config::Config; use crate::enter_listener_loop; @@ -132,8 +133,19 @@ pub fn setup_daemon_process( Ok(()) } +static SIDECAR_MASTER_PID: AtomicU32 = AtomicU32::new(0); + +pub fn set_sidecar_master_pid(pid: u32) { + SIDECAR_MASTER_PID.store(pid, Ordering::Relaxed); +} + pub fn primary_sidecar_identifier() -> u32 { - unsafe { libc::geteuid() } + let pid = SIDECAR_MASTER_PID.load(Ordering::Relaxed); + if pid != 0 { + pid + } else { + unsafe { libc::geteuid() } + } } fn maybe_start_appsec() -> bool {