diff --git a/CHANGELOG.md b/CHANGELOG.md index d2b35288..c1800b52 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added root River CLI flag `--statement-timeout` so Postgres session statement timeout can be set explicitly for commands like migrations. Explicit flag values take priority over database URL query params, and query params still take priority over built-in defaults. [PR #1142](https://github.com/riverqueue/river/pull/1142). +### Fixed + +- Fix connection leak in `Listener.Connect` in case where `afterConnectExec` failed. Thanks Johan Kjölhede ([@GiGurra](https://github.com/GiGurra))! [PR #1147](https://github.com/riverqueue/river/pull/1147). +- Fix missing `ticker.Stop` in producer's `pollForSettingChanges` ([@GiGurra](https://github.com/GiGurra)). [PR #1148](https://github.com/riverqueue/river/pull/1148). +- Fix accidental use of cancelled context for `Notifier.Ping` ([@GiGurra](https://github.com/GiGurra)). [PR #1149](https://github.com/riverqueue/river/pull/1149). +- Add jitter to fetch poll loop to prevent producer stampeding ([@GiGurra](https://github.com/GiGurra)). [PR #1150](https://github.com/riverqueue/river/pull/1150). + ### Changed - Upgrade supported Go versions to 1.25 and 1.26, and update CI accordingly. [PR #1144](https://github.com/riverqueue/river/pull/1144). diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index b3e881b4..8f9618c9 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -79,10 +79,10 @@ type Notifier struct { baseservice.BaseService startstop.BaseStartStop - disableSleep bool // for tests only; disable sleep on exponential backoff - testPingInterval time.Duration // for tests only; override the 5s ping interval listener riverdriver.Listener notificationBuf chan *riverdriver.Notification + testDisableSleep bool // for tests only; disable sleep on exponential backoff + testPingInterval time.Duration // for tests only; override the 5s ping interval testSignals notifierTestSignals waitInterruptChan chan func() @@ -152,7 +152,7 @@ func (n *Notifier) Start(ctx context.Context) error { slog.String("sleep_duration", sleepDuration.String()), ) n.testSignals.BackoffError.Signal(err) - if !n.disableSleep { + if !n.testDisableSleep { serviceutil.CancellableSleep(ctx, sleepDuration) } } diff --git a/internal/notifier/notifier_test.go b/internal/notifier/notifier_test.go index f9b44daa..d98ff40d 100644 --- a/internal/notifier/notifier_test.go +++ b/internal/notifier/notifier_test.go @@ -490,7 +490,7 @@ func TestNotifier(t *testing.T) { notifier, _ := setup(t, nil) - notifier.disableSleep = true + notifier.testDisableSleep = true var errorNum int @@ -582,7 +582,7 @@ func TestNotifier(t *testing.T) { notifier, bundle := setup(t, nil) // Disable the backoff sleep that would occur after the first retry. - notifier.disableSleep = true + notifier.testDisableSleep = true var errorNum int diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go index 3b5fbf42..a58b9669 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver_test.go @@ -94,6 +94,18 @@ func TestListener_Connect(t *testing.T) { ctx := context.Background() + t.Run("SuccessfulConnect", func(t *testing.T) { + t.Parallel() + + pool := testPool(ctx, t, nil) + listener := &Listener{dbPool: pool} + + require.NoError(t, listener.Connect(ctx)) + require.NotNil(t, listener.conn) + + require.NoError(t, listener.Close(ctx)) + }) + t.Run("ReleasesPoolConnOnAfterConnectExecError", func(t *testing.T) { t.Parallel() @@ -118,18 +130,6 @@ func TestListener_Connect(t *testing.T) { require.NoError(t, err, "pool connection was leaked: Acquire timed out because the connection was not released") conn.Release() }) - - t.Run("SuccessfulConnect", func(t *testing.T) { - t.Parallel() - - pool := testPool(ctx, t, nil) - listener := &Listener{dbPool: pool} - - require.NoError(t, listener.Connect(ctx)) - require.NotNil(t, listener.conn) - - require.NoError(t, listener.Close(ctx)) - }) } func TestInterpretError(t *testing.T) {