From dcf0675bae654924b2319f1f792721df3cb415a0 Mon Sep 17 00:00:00 2001 From: Chad Retz Date: Fri, 6 Feb 2026 17:39:18 -0600 Subject: [PATCH] Fix leak issues --- temporalio/Cargo.lock | 74 ++++++ temporalio/ext/Cargo.toml | 4 + temporalio/ext/src/lib.rs | 49 ++++ temporalio/ext/src/util.rs | 14 +- temporalio/extra/memory_bench.rb | 213 ++++++++++++++++++ .../worker/workflow_executor/thread_pool.rb | 2 +- 6 files changed, 352 insertions(+), 4 deletions(-) create mode 100644 temporalio/extra/memory_bench.rb diff --git a/temporalio/Cargo.lock b/temporalio/Cargo.lock index fdffe877..15af77c6 100644 --- a/temporalio/Cargo.lock +++ b/temporalio/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "addr2line" +version = "0.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b5d307320b3181d6d7954e663bd7c774a838b8220fe0593c86d9fb09f498b4b" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -262,6 +271,21 @@ dependencies = [ "rand 0.8.5", ] +[[package]] +name = "backtrace" +version = "0.3.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb531853791a215d7c62a30daf0dde835f381ab5de4589cfe7c649d2cbe92bd6" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-link 0.2.1", +] + [[package]] name = "base64" version = "0.21.7" @@ -730,6 +754,22 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "dhat" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827" +dependencies = [ + "backtrace", + "lazy_static", + "mintex", + "parking_lot", + "rustc-hash 1.1.0", + "serde", + "serde_json", + "thousands", +] + [[package]] name = "dirs" version = "6.0.0" @@ -1056,6 +1096,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "gimli" +version = "0.32.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" + [[package]] name = "glob" version = "0.3.3" @@ -1694,6 +1740,12 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "mintex" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c505b3e17ed6b70a7ed2e67fbb2c560ee327353556120d6e72f5232b6880d536" + [[package]] name = "mio" version = "1.1.1" @@ -1799,6 +1851,15 @@ dependencies = [ "objc2-core-foundation", ] +[[package]] +name = "object" +version = "0.37.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff76201f031d8863c38aa7f905eca4f53abbfa15f609db4277d44cd8938f33fe" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.21.3" @@ -2656,6 +2717,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rustc-demangle" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b50b8869d9fc858ce7266cce0194bd74df58b9d0e3f6df3a9fc8eb470d95c09d" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -3238,6 +3305,7 @@ name = "temporalio_bridge" version = "0.1.0" dependencies = [ "async-trait", + "dhat", "futures", "log", "magnus", @@ -3310,6 +3378,12 @@ dependencies = [ "syn", ] +[[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + [[package]] name = "thread_local" version = "1.1.9" diff --git a/temporalio/ext/Cargo.toml b/temporalio/ext/Cargo.toml index 7331c809..33d8d302 100644 --- a/temporalio/ext/Cargo.toml +++ b/temporalio/ext/Cargo.toml @@ -26,3 +26,7 @@ tokio-util = "0.7" tonic = { workspace = true } tracing = "0.1" url = "2.5" +dhat = { version = "0.3", optional = true } + +[features] +dhat-heap = ["dhat"] diff --git a/temporalio/ext/src/lib.rs b/temporalio/ext/src/lib.rs index 623c7c7f..460dfdd7 100644 --- a/temporalio/ext/src/lib.rs +++ b/temporalio/ext/src/lib.rs @@ -1,5 +1,12 @@ use magnus::{Error, ExceptionClass, RModule, Ruby, prelude::*, value::Lazy}; +#[cfg(feature = "dhat-heap")] +#[global_allocator] +static ALLOC: dhat::Alloc = dhat::Alloc; + +#[cfg(feature = "dhat-heap")] +static DHAT_PROFILER: std::sync::Mutex> = std::sync::Mutex::new(None); + mod client; mod client_rpc_generated; mod envconfig; @@ -58,6 +65,15 @@ macro_rules! lazy_id { fn init(ruby: &Ruby) -> Result<(), Error> { Lazy::force(&ROOT_ERR, ruby); + #[cfg(feature = "dhat-heap")] + { + let profiler = dhat::Profiler::builder() + .file_name("dhat-heap.json") + .build(); + *DHAT_PROFILER.lock().unwrap() = Some(profiler); + eprintln!("[dhat] Heap profiling enabled."); + } + client::init(ruby)?; envconfig::init(ruby)?; metric::init(ruby)?; @@ -65,5 +81,38 @@ fn init(ruby: &Ruby) -> Result<(), Error> { testing::init(ruby)?; worker::init(ruby)?; + #[cfg(feature = "dhat-heap")] + { + let bridge_mod = ruby.get_inner(&ROOT_MOD); + bridge_mod + .define_module_function("dhat_heap_stats", magnus::function!(dhat_heap_stats, 0))?; + bridge_mod.define_module_function( + "dhat_dump_and_stop", + magnus::function!(dhat_dump_and_stop, 0), + )?; + } + Ok(()) } + +/// Print current dhat heap stats to stderr +#[cfg(feature = "dhat-heap")] +fn dhat_heap_stats() { + let stats = dhat::HeapStats::get(); + eprintln!( + "[dhat] curr_bytes={} curr_blocks={} total_bytes={} total_blocks={}", + stats.curr_bytes, stats.curr_blocks, stats.total_bytes, stats.total_blocks + ); +} + +/// Drop the profiler to force writing the JSON profile +#[cfg(feature = "dhat-heap")] +fn dhat_dump_and_stop() { + let mut guard = DHAT_PROFILER.lock().unwrap(); + if let Some(profiler) = guard.take() { + drop(profiler); // This triggers the profile write to dhat-heap.json + eprintln!("[dhat] Profile written to dhat-heap.json"); + } else { + eprintln!("[dhat] Profiler already stopped or not initialized"); + } +} diff --git a/temporalio/ext/src/util.rs b/temporalio/ext/src/util.rs index c7b1f237..fad968aa 100644 --- a/temporalio/ext/src/util.rs +++ b/temporalio/ext/src/util.rs @@ -88,22 +88,30 @@ where where U: FnMut(), { - let mut func: U = unsafe { *Box::from_raw(data as _) }; - + // Borrow rather than take ownership — the caller frees after + // rb_thread_call_without_gvl returns. This avoids leaking the + // unblock closure when Ruby never invokes it (the common case). + let func: &mut U = unsafe { &mut *(data as *mut U) }; func(); } let boxed_func = Box::new(func); let boxed_unblock = Box::new(unblock); + let unblock_ptr = Box::into_raw(boxed_unblock); unsafe { let result = rb_sys::rb_thread_call_without_gvl( Some(anon_func::), Box::into_raw(boxed_func) as *mut _, Some(anon_unblock::), - Box::into_raw(boxed_unblock) as *mut _, + unblock_ptr as *mut _, ); + // Free the unblock closure. By the time rb_thread_call_without_gvl + // returns, anon_unblock (if called at all) has already completed, + // so this is safe. + drop(Box::from_raw(unblock_ptr)); + *Box::from_raw(result as _) } } diff --git a/temporalio/extra/memory_bench.rb b/temporalio/extra/memory_bench.rb new file mode 100644 index 00000000..9eed3b52 --- /dev/null +++ b/temporalio/extra/memory_bench.rb @@ -0,0 +1,213 @@ +# frozen_string_literal: true + +# rubocop:disable Style/Documentation, Style/DocumentationMethod + +# Memory profiling harness for investigating memory leaks. +# Runs trivial workflows in batches and samples RSS between batches. +# +# Usage: +# ruby extra/memory_bench.rb --workflow-count 5000 --batch-size 500 +# +# With an external Temporal server: +# ruby extra/memory_bench.rb --workflow-count 5000 --batch-size 500 \ +# --target-host localhost:7233 --namespace default + +require_relative '../lib/temporalio/activity' +require_relative '../lib/temporalio/client' +require_relative '../lib/temporalio/testing' +require_relative '../lib/temporalio/worker' +require_relative '../lib/temporalio/workflow' + +require 'logger' +require 'objspace' +require 'optparse' +require 'securerandom' + +module MemoryBench + class NoopWorkflow < Temporalio::Workflow::Definition + def execute(input) + "done-#{input}" + end + end + + class SimpleActivity < Temporalio::Activity::Definition + def execute(input) + "activity-#{input}" + end + end + + class WithActivityWorkflow < Temporalio::Workflow::Definition + def execute(input) + Temporalio::Workflow.execute_activity(SimpleActivity, input, start_to_close_timeout: 30) + end + end +end + +def rss_mib + `ps -o rss= -p #{Process.pid}`.to_i / 1024.0 +end + +def objspace_mib + ObjectSpace.memsize_of_all / (1024.0 * 1024.0) +end + +# Call malloc_trim(0) to return freed pages to OS (Linux only, requires fiddle gem) +MALLOC_TRIM = begin + require 'fiddle' + libc = Fiddle.dlopen(nil) + Fiddle::Function.new(libc['malloc_trim'], [Fiddle::TYPE_INT], Fiddle::TYPE_INT) +rescue LoadError, Fiddle::DLError + nil +end + +def trim_native_memory + MALLOC_TRIM&.call(0) +end + +def run_memory_bench( + workflow_count:, batch_size:, max_cached_workflows:, max_concurrent:, + target_host:, namespace:, use_activity: +) + logger = Logger.new($stdout, level: Logger::INFO) + task_queue = "mem-bench-#{SecureRandom.uuid}" + workflows = use_activity ? [MemoryBench::WithActivityWorkflow] : [MemoryBench::NoopWorkflow] + activities = use_activity ? [MemoryBench::SimpleActivity] : [] + workflow_class = use_activity ? MemoryBench::WithActivityWorkflow : MemoryBench::NoopWorkflow + + samples = [] + samples << { workflows_completed: 0, rss_mib: rss_mib, objspace_mib: objspace_mib, phase: 'pre_connect' } + + connect_and_run = proc do |client| + samples << { workflows_completed: 0, rss_mib: rss_mib, objspace_mib: objspace_mib, phase: 'connected' } + + worker = Temporalio::Worker.new( + client:, + task_queue:, + activities:, + workflows:, + tuner: Temporalio::Worker::Tuner.create_fixed( + workflow_slots: max_concurrent, + activity_slots: max_concurrent, + local_activity_slots: max_concurrent + ), + max_cached_workflows: + ) + + samples << { workflows_completed: 0, rss_mib: rss_mib, objspace_mib: objspace_mib, phase: 'worker_created' } + + # Run workflows on a separate thread while worker processes them + completed = 0 + batches = (workflow_count.to_f / batch_size).ceil + + worker.run do + samples << { workflows_completed: 0, rss_mib: rss_mib, objspace_mib: objspace_mib, phase: 'worker_running' } + + batches.times do |batch_idx| + count = [batch_size, workflow_count - completed].min + + # Start and wait for each workflow sequentially + count.times do |i| + wf_num = completed + i + handle = client.start_workflow( + workflow_class, wf_num, + id: "mem-#{SecureRandom.uuid}", + task_queue: + ) + handle.result + end + completed += count + + # Force GC, trim native allocator, and sample + GC.start + trim_native_memory + mem = rss_mib + obj = objspace_mib + samples << { workflows_completed: completed, rss_mib: mem, objspace_mib: obj, phase: 'batch' } + logger.info("Batch #{batch_idx + 1}/#{batches}: #{completed}/#{workflow_count} workflows, " \ + "RSS=#{mem.round(1)} MiB, ObjSpace=#{obj.round(1)} MiB") + end + end + + # Post-worker sample + GC.start + trim_native_memory + samples << { + workflows_completed: workflow_count, rss_mib: rss_mib, objspace_mib: objspace_mib, phase: 'worker_stopped' + } + end + + # Dump dhat profile if available (compiled with dhat-heap feature) + dhat_dump = proc do + Temporalio::Internal::Bridge.dhat_heap_stats + Temporalio::Internal::Bridge.dhat_dump_and_stop + rescue NoMethodError + # dhat not compiled in, ignore + end + + if target_host + logger.info("Connecting to #{target_host} namespace=#{namespace}") + client = Temporalio::Client.connect(target_host, namespace:, logger:) + connect_and_run.call(client) + else + logger.info('Starting local test environment') + Temporalio::Testing::WorkflowEnvironment.start_local(logger:) do |env| + connect_and_run.call(env.client) + end + end + + # Print results + puts "\n=== Memory Bench Results ===" + puts "Config: workflow_count=#{workflow_count} batch_size=#{batch_size} " \ + "max_cached_workflows=#{max_cached_workflows} max_concurrent=#{max_concurrent} " \ + "use_activity=#{use_activity}" + + batch_samples = samples.select { |s| s[:phase] == 'batch' } + if batch_samples.size >= 2 + first = batch_samples.first + last = batch_samples.last + wf_delta = last[:workflows_completed] - first[:workflows_completed] + rss_growth = last[:rss_mib] - first[:rss_mib] + obj_growth = last[:objspace_mib] - first[:objspace_mib] + rss_rate = wf_delta.positive? ? (rss_growth / wf_delta * 1000).round(2) : 0 + obj_rate = wf_delta.positive? ? (obj_growth / wf_delta * 1000).round(2) : 0 + puts "RSS growth: #{rss_growth.round(2)} MiB over #{wf_delta} workflows (#{rss_rate} MiB/1000 wf)" + puts "ObjSpace growth: #{obj_growth.round(2)} MiB over #{wf_delta} workflows (#{obj_rate} MiB/1000 wf)" + puts "Native growth: #{(rss_growth - obj_growth).round(2)} MiB (RSS minus ObjSpace)" + end + + puts "\nPhase/Batch, Workflows, RSS_MiB, ObjSpace_MiB" + samples.each do |s| + puts "#{s[:phase]}, #{s[:workflows_completed]}, #{s[:rss_mib].round(2)}, #{s[:objspace_mib].round(2)}" + end + + dhat_dump.call +ensure + logger&.close +end + +# Parse options +parser = OptionParser.new +workflow_count = 5000 +batch_size = 500 +max_cached_workflows = 1000 +max_concurrent = 100 +target_host = nil +namespace = 'default' +use_activity = false + +parser.on('--workflow-count N', Integer, "Number of workflows (default: #{workflow_count})") { |v| workflow_count = v } +parser.on('--batch-size N', Integer, "Batch size for sampling (default: #{batch_size})") { |v| batch_size = v } +parser.on('--max-cached-workflows N', Integer, + "Max cached workflows (default: #{max_cached_workflows})") { |v| max_cached_workflows = v } +parser.on('--max-concurrent N', Integer, "Max concurrent slots (default: #{max_concurrent})") { |v| max_concurrent = v } +parser.on('--target-host HOST', 'External Temporal host (default: start local)') { |v| target_host = v } +parser.on('--namespace NS', "Namespace (default: #{namespace})") { |v| namespace = v } +parser.on('--with-activity', 'Use workflow that calls an activity') { use_activity = true } +parser.parse! + +run_memory_bench( + workflow_count:, batch_size:, max_cached_workflows:, max_concurrent:, + target_host:, namespace:, use_activity: +) + +# rubocop:enable Style/Documentation, Style/DocumentationMethod diff --git a/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb b/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb index e175d383..9773578f 100644 --- a/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb +++ b/temporalio/lib/temporalio/worker/workflow_executor/thread_pool.rb @@ -49,7 +49,7 @@ def _activate(activation, worker_state, &) # If not found, get a new one either by creating if not enough or find the one with the fewest. new_worker = if @workers.size < @max_threads created_worker = Worker.new(self) - @workers << Worker.new(self) + @workers << created_worker created_worker else @workers.min_by(&:workflow_count)