diff --git a/producer.go b/producer.go index 951c1b9d..a8b0ce08 100644 --- a/producer.go +++ b/producer.go @@ -334,7 +334,6 @@ func (p *producer) StartWorkContext(fetchCtx, workCtx context.Context) error { } p.id.Store(id) - // TODO: fetcher should have some jitter in it to avoid stampeding issues. p.fetchLimiter = chanutil.NewDebouncedChan(fetchCtx, p.config.FetchCooldown, true) var ( @@ -596,7 +595,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context) { func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() - fetchPollTimer := time.NewTimer(p.config.FetchPollInterval) + fetchPollTimer := time.NewTimer(p.jitteredFetchPollInterval()) for { select { case <-ctx.Done(): @@ -607,11 +606,20 @@ func (p *producer) fetchPollLoop(ctx context.Context, wg *sync.WaitGroup) { return case <-fetchPollTimer.C: p.fetchLimiter.Call() - fetchPollTimer.Reset(p.config.FetchPollInterval) + fetchPollTimer.Reset(p.jitteredFetchPollInterval()) } } } +// jitteredFetchPollInterval returns FetchPollInterval plus random jitter drawn +// from [0, J), where J is the larger of 10% of FetchPollInterval and 10ms. This +// keeps multiple producers from synchronizing their fetches after a transient +// event (e.g. GC pause, network blip) and causing periodic DB load spikes. 
+func (p *producer) jitteredFetchPollInterval() time.Duration { + jitterRange := max(p.config.FetchPollInterval/10, 10*time.Millisecond) + return randutil.DurationBetween(p.config.FetchPollInterval, p.config.FetchPollInterval+jitterRange) +} + func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { var limit int if p.paused { diff --git a/producer_test.go b/producer_test.go index 538bf272..b524338a 100644 --- a/producer_test.go +++ b/producer_test.go @@ -696,6 +696,24 @@ func testProducer(t *testing.T, makeProducer func(ctx context.Context, t *testin }) } +func TestProducer_jitteredFetchPollInterval(t *testing.T) { + t.Parallel() + + prod := &producer{} + prod.config = &producerConfig{ + FetchPollInterval: 1 * time.Second, + } + + // Run enough iterations to catch any out-of-bounds values without being + // flaky. With a 1s FetchPollInterval the jitter window is 10% of it (100ms), + // so every returned interval must fall in [1s, 1.1s). + for range 100 { + d := prod.jitteredFetchPollInterval() + require.GreaterOrEqual(t, d, prod.config.FetchPollInterval) + require.Less(t, d, prod.config.FetchPollInterval+prod.config.FetchPollInterval/10) + } +} + func emitQueueNotification(t *testing.T, ctx context.Context, exec riverdriver.Executor, schema, queue, action string, metadata []byte) { t.Helper()