Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datadog-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tracing-subscriber = { version = "0.3.22" }
spawn_worker = { path = "../spawn_worker" }

[target.'cfg(not(windows))'.dependencies]
nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket"] }
nix = { version = "0.29", features = ["fs", "mman", "process", "poll", "socket", "user"] }
sendfd = { version = "0.4", features = ["tokio"] }
tokio = { version = "1.23", features = ["sync", "io-util", "signal"] }

Expand Down
26 changes: 24 additions & 2 deletions datadog-ipc/src/platform/unix/mem_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ use nix::errno::Errno;
use nix::fcntl::{open, OFlag};
use nix::sys::mman::{self, mmap, munmap, MapFlags, ProtFlags};
use nix::sys::stat::Mode;
use nix::unistd::{ftruncate, mkdir, unlink};
use nix::unistd::{fchown, ftruncate, mkdir, unlink, Uid};
use nix::NixPath;
use std::ffi::{CStr, CString};
use std::fs::File;
use std::io;
use std::num::NonZeroUsize;
use std::os::fd::AsFd;
use std::os::unix::fs::MetadataExt;
use std::sync::atomic::{AtomicI32, Ordering};
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicI32, AtomicU32, Ordering};

// Sentinel value meaning "no owner UID override"
const NO_OWNER_UID: u32 = u32::MAX;

fn fallback_path<P: ?Sized + NixPath>(name: &P) -> nix::Result<CString> {
name.with_nix_path(|cstr| {
Expand Down Expand Up @@ -95,6 +99,21 @@ pub(crate) fn munmap_handle<T: MemoryHandle>(mapped: &mut MappedMem<T>) {

static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0);

static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID);

pub fn set_shm_owner_uid(uid: u32) {
SHM_OWNER_UID.store(uid, Ordering::Relaxed);
}

fn shm_owner_uid() -> Option<u32> {
let uid = SHM_OWNER_UID.load(Ordering::Relaxed);
if uid == NO_OWNER_UID {
None
} else {
Some(uid)
}
}

impl ShmHandle {
#[cfg(target_os = "linux")]
fn open_anon_shm(name: &str) -> anyhow::Result<OwnedFd> {
Expand Down Expand Up @@ -145,6 +164,9 @@ impl NamedShmHandle {
pub fn create_mode(path: CString, size: usize, mode: Mode) -> io::Result<NamedShmHandle> {
let fd = shm_open(path.as_bytes(), OFlag::O_CREAT | OFlag::O_RDWR, mode)?;
ftruncate(&fd, size as off_t)?;
if let Some(uid) = shm_owner_uid() {
let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None);
}
Self::new(fd, Some(path), size)
}

Expand Down
25 changes: 23 additions & 2 deletions datadog-ipc/src/platform/unix/mem_handle_macos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ use nix::errno::Errno;
use nix::fcntl::OFlag;
use nix::sys::mman::{mmap, munmap, shm_open, shm_unlink, MapFlags, ProtFlags};
use nix::sys::stat::Mode;
use nix::unistd::ftruncate;
use nix::unistd::{fchown, ftruncate, Uid};
use std::ffi::{CStr, CString};
use std::io;
use std::num::NonZeroUsize;
use std::os::fd::{AsFd, OwnedFd};
use std::sync::atomic::{AtomicI32, AtomicUsize, Ordering};
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicI32, AtomicU32, AtomicUsize, Ordering};

const MAPPING_MAX_SIZE: usize = 1 << 17; // 128 MiB ought to be enough for everybody?
const NOT_COMMITTED: usize = 1 << (usize::BITS - 1);
Expand Down Expand Up @@ -69,6 +70,23 @@ pub(crate) fn munmap_handle<T: MemoryHandle>(mapped: &MappedMem<T>) {

static ANON_SHM_ID: AtomicI32 = AtomicI32::new(0);

const NO_OWNER_UID: u32 = u32::MAX;

static SHM_OWNER_UID: AtomicU32 = AtomicU32::new(NO_OWNER_UID);

pub fn set_shm_owner_uid(uid: u32) {
SHM_OWNER_UID.store(uid, Ordering::Relaxed);
}

fn shm_owner_uid() -> Option<u32> {
let uid = SHM_OWNER_UID.load(Ordering::Relaxed);
if uid == NO_OWNER_UID {
None
} else {
Some(uid)
}
}

impl ShmHandle {
pub fn new(size: usize) -> anyhow::Result<ShmHandle> {
let path = format!(
Expand Down Expand Up @@ -112,6 +130,9 @@ impl NamedShmHandle {
truncate?;
}
}
if let Some(uid) = shm_owner_uid() {
let _ = fchown(fd.as_raw_fd(), Some(Uid::from_raw(uid)), None);
}
Self::new(fd, Some(path), size)
}

Expand Down
4 changes: 4 additions & 0 deletions datadog-ipc/src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ pub(crate) use mem_handle_macos::*;
#[cfg(not(target_os = "macos"))]
mod mem_handle;
#[cfg(not(target_os = "macos"))]
pub use mem_handle::set_shm_owner_uid;
#[cfg(not(target_os = "macos"))]
pub(crate) use mem_handle::*;
#[cfg(target_os = "macos")]
pub use mem_handle_macos::set_shm_owner_uid;

#[no_mangle]
#[cfg(polyfill_glibc_memfd)]
Expand Down
42 changes: 42 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ use std::slice;
use std::sync::Arc;
use std::time::Duration;

use datadog_sidecar::setup::{connect_to_master, MasterListener};

#[no_mangle]
#[cfg(target_os = "windows")]
pub extern "C" fn ddog_setup_crashtracking(
Expand Down Expand Up @@ -310,6 +312,46 @@ pub extern "C" fn ddog_sidecar_connect(connection: &mut *mut SidecarTransport) -
MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_connect_master(pid: i32) -> MaybeError {
let cfg = datadog_sidecar::config::FromEnv::config();
#[cfg(unix)]
datadog_sidecar::set_sidecar_master_pid(pid as u32);
try_c!(MasterListener::start(pid, cfg));

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_connect_worker(
pid: i32,
connection: &mut *mut SidecarTransport,
) -> MaybeError {
let transport = try_c!(connect_to_master(pid));
*connection = Box::into_raw(transport);

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_shutdown_master_listener() -> MaybeError {
try_c!(MasterListener::shutdown());

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_is_master_listener_active(pid: i32) -> bool {
MasterListener::is_active(pid)
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_clear_inherited_listener() -> MaybeError {
try_c!(MasterListener::clear_inherited_state());

MaybeError::None
}

#[no_mangle]
pub extern "C" fn ddog_sidecar_ping(transport: &mut Box<SidecarTransport>) -> MaybeError {
try_c!(blocking::ping(transport));
Expand Down
107 changes: 81 additions & 26 deletions datadog-sidecar/src/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{
},
time::{Duration, Instant},
};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};

#[cfg(unix)]
use crate::crashtracker::crashtracker_unix_socket_path;
Expand All @@ -32,7 +32,32 @@ use crate::tracer::SHM_LIMITER;
use crate::watchdog::Watchdog;
use crate::{ddog_daemon_entry_point, setup_daemon_process};

async fn main_loop<L, C, Fut>(listener: L, cancel: Arc<C>) -> io::Result<()>
/// Configuration for main_loop behavior
pub struct MainLoopConfig {
pub enable_ctrl_c_handler: bool,
pub enable_crashtracker: bool,
pub external_shutdown_rx: Option<oneshot::Receiver<()>>,
/// Set to false in thread mode so the worker's UID can be obtained on the
/// first connection and used to fchown the SHM.
pub init_shm_eagerly: bool,
}

impl Default for MainLoopConfig {
fn default() -> Self {
Self {
enable_ctrl_c_handler: true,
enable_crashtracker: true,
external_shutdown_rx: None,
init_shm_eagerly: true,
}
}
}

pub async fn main_loop<L, C, Fut>(
listener: L,
cancel: Arc<C>,
loop_config: MainLoopConfig,
) -> io::Result<()>
where
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
Fut: Future<Output = io::Result<()>>,
Expand Down Expand Up @@ -64,32 +89,49 @@ where
}
});

tokio::spawn(async move {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!("Error setting up signal handler {}", err);
}
tracing::info!("Received Ctrl-C Signal, shutting down");
cancel();
});
if let Some(shutdown_rx) = loop_config.external_shutdown_rx {
let cancel = cancel.clone();
tokio::spawn(async move {
let _ = shutdown_rx.await;
tracing::info!("External shutdown signal received");
cancel();
});
}

if loop_config.enable_ctrl_c_handler {
let cancel = cancel.clone();
tokio::spawn(async move {
if let Err(err) = tokio::signal::ctrl_c().await {
tracing::error!("Error setting up signal handler {}", err);
}
tracing::info!("Received Ctrl-C Signal, shutting down");
cancel();
});
}

#[cfg(unix)]
tokio::spawn(async move {
let socket_path = crashtracker_unix_socket_path();
match libdd_crashtracker::get_receiver_unix_socket(socket_path.to_str().unwrap_or_default())
{
Ok(listener) => loop {
if let Err(e) =
libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener).await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
}
});
if loop_config.enable_crashtracker {
tokio::spawn(async move {
let socket_path = crashtracker_unix_socket_path();
match libdd_crashtracker::get_receiver_unix_socket(
socket_path.to_str().unwrap_or_default(),
) {
Ok(listener) => loop {
if let Err(e) =
libdd_crashtracker::async_receiver_entry_point_unix_listener(&listener)
.await
{
tracing::warn!("Got error while receiving crash report: {e}");
}
},
Err(e) => tracing::error!("Failed setting up the crashtracker listener: {e}"),
}
});
}

// Init. Early, before we start listening.
drop(SHM_LIMITER.lock());
if loop_config.init_shm_eagerly {
drop(SHM_LIMITER.lock());
}

let server = SidecarServer::default();

Expand Down Expand Up @@ -143,6 +185,19 @@ where
}

pub fn enter_listener_loop<F, L, Fut, C>(acquire_listener: F) -> anyhow::Result<()>
where
F: FnOnce() -> io::Result<(L, C)>,
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
Fut: Future<Output = io::Result<()>>,
C: Fn() + Sync + Send + 'static,
{
enter_listener_loop_with_config(acquire_listener, MainLoopConfig::default())
}

pub fn enter_listener_loop_with_config<F, L, Fut, C>(
acquire_listener: F,
loop_config: MainLoopConfig,
) -> anyhow::Result<()>
where
F: FnOnce() -> io::Result<(L, C)>,
L: FnOnce(Box<dyn Fn(IpcClient)>) -> Fut,
Expand All @@ -159,7 +214,7 @@ where
let (listener, cancel) = acquire_listener()?;

runtime
.block_on(main_loop(listener, Arc::new(cancel)))
.block_on(main_loop(listener, Arc::new(cancel), loop_config))
.map_err(|e| e.into())
}

Expand Down
12 changes: 12 additions & 0 deletions datadog-sidecar/src/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,18 @@ mod windows;
#[cfg(windows)]
pub use self::windows::*;

// Thread-based listener module (Unix)
#[cfg(unix)]
pub mod thread_listener;
#[cfg(unix)]
pub use thread_listener::{connect_to_master, MasterListener};

// Thread-based listener module (Windows)
#[cfg(windows)]
pub mod thread_listener_windows;
#[cfg(windows)]
pub use thread_listener_windows::{connect_to_master, MasterListener};

use datadog_ipc::platform::Channel;
use std::io;

Expand Down
Loading
Loading