From 46862569f38f2afbf8a882a62f990497bc713465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Kj=C3=B6lhede?= Date: Fri, 13 Feb 2026 21:59:49 +0100 Subject: [PATCH 1/3] Add jitter to fetch poll loop to prevent producer stampeding Co-Authored-By: Claude Opus 4.6 --- producer.go | 13 ++++++++++--- producer_test.go | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/producer.go b/producer.go index 951c1b9d..fab4307f 100644 --- a/producer.go +++ b/producer.go @@ -334,7 +334,6 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { } p.id.Store(id) - // TODO: fetcher should have some jitter in it to avoid stampeding issues. p.fetchLimiter = chanutil.NewDebouncedChan(fetchCtx, p.config.FetchCooldown, true) var ( @@ -596,7 +595,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - fetchPollTimer := time.NewTimer(p.config.FetchPollInterval) + fetchPollTimer := time.NewTimer(p.jitteredFetchPollInterval()) for { select { case <-ctx.Done(): @@ -607,11 +606,19 @@ func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { return case <-fetchPollTimer.C: p.fetchLimiter.Call() - fetchPollTimer.Reset(p.config.FetchPollInterval) + fetchPollTimer.Reset(p.jitteredFetchPollInterval()) } } } +// jitteredFetchPollInterval returns FetchPollInterval with random jitter in +// [0, FetchCooldown) added. This prevents multiple producers from +// synchronizing their fetches after a transient event (e.g. GC pause, network +// blip), which would cause periodic DB load spikes. +func (p *producer) jitteredFetchPollInterval() time.Duration { + return randutil.DurationBetween(p.config.FetchPollInterval, p.config.FetchPollInterval+p.config.FetchCooldown) +} + func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { var limit int if p.paused { diff --git a/producer_test.go b/producer_test.go index 538bf272..370cbef9 100644 --- a/producer_test.go +++ b/producer_test.go @@ -696,6 +696,25 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin }) } +func TestProducer_jitteredFetchPollInterval(t *testing.T) { + t.Parallel() + + p := &producer{} + p.config = &producerConfig{ + FetchCooldown: 100 * time.Millisecond, + FetchPollInterval: 1 * time.Second, + } + + // Run enough iterations to catch any out-of-bounds values without being + // flaky. The jitter range is [FetchPollInterval, FetchPollInterval + + // FetchCooldown), so [1s, 1.1s). + for range 1_000 { + d := p.jitteredFetchPollInterval() + require.GreaterOrEqual(t, d, p.config.FetchPollInterval) + require.Less(t, d, p.config.FetchPollInterval+p.config.FetchCooldown) + } +} + func emitQueueNotification(t *testing.T, ctx context.Context, exec riverdriver.Executor, schema, queue, action string, metadata []byte) { t.Helper() From 651ea8d5b615e4ca526d0f3a9dbd58f247bb771a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Kj=C3=B6lhede?= Date: Sun, 15 Feb 2026 20:15:39 +0100 Subject: [PATCH 2/3] Base jitter range on poll interval instead of cooldown Use 10% of FetchPollInterval (min 10ms) as the jitter range rather than coupling it to the unrelated FetchCooldown config value. Co-Authored-By: Claude Opus 4.6 --- producer.go | 9 +++++---- producer_test.go | 7 +++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/producer.go b/producer.go index fab4307f..a8b0ce08 100644 --- a/producer.go +++ b/producer.go @@ -612,11 +612,12 @@ func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { } // jitteredFetchPollInterval returns FetchPollInterval with random jitter in -// [0, FetchCooldown) added. This prevents multiple producers from -// synchronizing their fetches after a transient event (e.g. GC pause, network -// blip), which would cause periodic DB load spikes. +// [0, 10% of FetchPollInterval) added (minimum 10ms). This prevents multiple +// producers from synchronizing their fetches after a transient event (e.g. GC +// pause, network blip), which would cause periodic DB load spikes. func (p *producer) jitteredFetchPollInterval() time.Duration { - return randutil.DurationBetween(p.config.FetchPollInterval, p.config.FetchPollInterval+p.config.FetchCooldown) + jitterRange := max(p.config.FetchPollInterval/10, 10*time.Millisecond) + return randutil.DurationBetween(p.config.FetchPollInterval, p.config.FetchPollInterval+jitterRange) } func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { diff --git a/producer_test.go b/producer_test.go index 370cbef9..2eef9c2d 100644 --- a/producer_test.go +++ b/producer_test.go @@ -701,17 +701,16 @@ func TestProducer_jitteredFetchPollInterval(t *testing.T) { p := &producer{} p.config = &producerConfig{ - FetchCooldown: 100 * time.Millisecond, FetchPollInterval: 1 * time.Second, } // Run enough iterations to catch any out-of-bounds values without being // flaky. The jitter range is [FetchPollInterval, FetchPollInterval + - // FetchCooldown), so [1s, 1.1s). - for range 1_000 { + // 10% of FetchPollInterval), so [1s, 1.1s). + for range 100 { d := p.jitteredFetchPollInterval() require.GreaterOrEqual(t, d, p.config.FetchPollInterval) - require.Less(t, d, p.config.FetchPollInterval+p.config.FetchCooldown) + require.Less(t, d, p.config.FetchPollInterval+p.config.FetchPollInterval/10) } } From 161a4f66d76aa94cab4b1501f70ec9404a842a0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Kj=C3=B6lhede?= Date: Sun, 15 Feb 2026 20:18:57 +0100 Subject: [PATCH 3/3] Fix varnamelen lint: rename p to prod in jitter test Co-Authored-By: Claude Opus 4.6 --- producer_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/producer_test.go b/producer_test.go index 2eef9c2d..b524338a 100644 --- a/producer_test.go +++ b/producer_test.go @@ -699,8 +699,8 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin func TestProducer_jitteredFetchPollInterval(t *testing.T) { t.Parallel() - p := &producer{} - p.config = &producerConfig{ + prod := &producer{} + prod.config = &producerConfig{ FetchPollInterval: 1 * time.Second, } @@ -708,9 +708,9 @@ func TestProducer_jitteredFetchPollInterval(t *testing.T) { // flaky. The jitter range is [FetchPollInterval, FetchPollInterval + // 10% of FetchPollInterval), so [1s, 1.1s). for range 100 { - d := p.jitteredFetchPollInterval() - require.GreaterOrEqual(t, d, p.config.FetchPollInterval) - require.Less(t, d, p.config.FetchPollInterval+p.config.FetchPollInterval/10) + d := prod.jitteredFetchPollInterval() + require.GreaterOrEqual(t, d, prod.config.FetchPollInterval) + require.Less(t, d, prod.config.FetchPollInterval+prod.config.FetchPollInterval/10) } }