Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Added root River CLI flag `--statement-timeout` so Postgres session statement timeout can be set explicitly for commands like migrations. Explicit flag values take priority over database URL query params, and query params still take priority over built-in defaults. [PR #1142](https://github.com/riverqueue/river/pull/1142).

### Fixed

- Fix connection leak in `Listener.Connect` in case where `afterConnectExec` failed. Thanks Johan Kjölhede ([@GiGurra](https://github.com/GiGurra))! [PR #1147](https://github.com/riverqueue/river/pull/1147).
- Fix missing `ticker.Stop` in producer's `pollForSettingChanges` ([@GiGurra](https://github.com/GiGurra)). [PR #1148](https://github.com/riverqueue/river/pull/1148).
- Fix accidental use of cancelled context for `Notifier.Ping` ([@GiGurra](https://github.com/GiGurra)). [PR #1149](https://github.com/riverqueue/river/pull/1149).
- Add jitter to fetch poll loop to prevent producer stampeding ([@GiGurra](https://github.com/GiGurra)). [PR #1150](https://github.com/riverqueue/river/pull/1150).

### Changed

- Upgrade supported Go versions to 1.25 and 1.26, and update CI accordingly. [PR #1144](https://github.com/riverqueue/river/pull/1144).
Expand Down
6 changes: 3 additions & 3 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ type Notifier struct {
baseservice.BaseService
startstop.BaseStartStop

disableSleep bool // for tests only; disable sleep on exponential backoff
testPingInterval time.Duration // for tests only; override the 5s ping interval
listener riverdriver.Listener
notificationBuf chan *riverdriver.Notification
testDisableSleep bool // for tests only; disable sleep on exponential backoff
testPingInterval time.Duration // for tests only; override the 5s ping interval
testSignals notifierTestSignals
waitInterruptChan chan func()

Expand Down Expand Up @@ -152,7 +152,7 @@ func (n *Notifier) Start(ctx context.Context) error {
slog.String("sleep_duration", sleepDuration.String()),
)
n.testSignals.BackoffError.Signal(err)
if !n.disableSleep {
if !n.testDisableSleep {
serviceutil.CancellableSleep(ctx, sleepDuration)
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestNotifier(t *testing.T) {

notifier, _ := setup(t, nil)

notifier.disableSleep = true
notifier.testDisableSleep = true

var errorNum int

Expand Down Expand Up @@ -582,7 +582,7 @@ func TestNotifier(t *testing.T) {
notifier, bundle := setup(t, nil)

// Disable the backoff sleep that would occur after the first retry.
notifier.disableSleep = true
notifier.testDisableSleep = true

var errorNum int

Expand Down
24 changes: 12 additions & 12 deletions riverdriver/riverpgxv5/river_pgx_v5_driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ func TestListener_Connect(t *testing.T) {

ctx := context.Background()

t.Run("SuccessfulConnect", func(t *testing.T) {
t.Parallel()

pool := testPool(ctx, t, nil)
listener := &Listener{dbPool: pool}

require.NoError(t, listener.Connect(ctx))
require.NotNil(t, listener.conn)

require.NoError(t, listener.Close(ctx))
})

t.Run("ReleasesPoolConnOnAfterConnectExecError", func(t *testing.T) {
t.Parallel()

Expand All @@ -118,18 +130,6 @@ func TestListener_Connect(t *testing.T) {
require.NoError(t, err, "pool connection was leaked: Acquire timed out because the connection was not released")
conn.Release()
})

t.Run("SuccessfulConnect", func(t *testing.T) {
t.Parallel()

pool := testPool(ctx, t, nil)
listener := &Listener{dbPool: pool}

require.NoError(t, listener.Connect(ctx))
require.NotNil(t, listener.conn)

require.NoError(t, listener.Close(ctx))
})
}

func TestInterpretError(t *testing.T) {
Expand Down