Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions engine/access/rpc/connection/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,35 +159,49 @@ func TestConcurrentConnectionsAndDisconnects(t *testing.T) {
assert.Equal(t, int32(1), callCount.Load())
})

// Test that connections and invalidations work correctly under concurrent load.
// Invalidation is done between batches (not concurrently with AddRequest) to avoid
// a known WaitGroup race between AddRequest and Close in CachedClient.
// The production code fix is tracked in https://github.com/onflow/flow-go/pull/7859
t.Run("test rapid connections and invalidations", func(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(connectionCount)
callCount := atomic.NewInt32(0)
for i := 0; i < connectionCount; i++ {
go func() {
defer wg.Done()
cachedConn, err := cache.GetConnected("foo", cfg, nil, func(string, Config, crypto.PublicKey, *CachedClient) (*grpc.ClientConn, error) {
callCount.Inc()
return conn, nil
})
require.NoError(t, err)
connectFn := func(string, Config, crypto.PublicKey, *CachedClient) (*grpc.ClientConn, error) {
callCount.Inc()
return conn, nil
}

done := cachedConn.AddRequest()
time.Sleep(1 * time.Millisecond)
cachedConn.Invalidate()
done()
}()
batchSize := 1000
numBatches := 100

for batch := 0; batch < numBatches; batch++ {
wg := sync.WaitGroup{}
wg.Add(batchSize)
for i := 0; i < batchSize; i++ {
go func() {
defer wg.Done()
cachedConn, err := cache.GetConnected("foo", cfg, nil, connectFn)
require.NoError(t, err)

done := cachedConn.AddRequest()
time.Sleep(1 * time.Millisecond)
done()
}()
Comment on lines +180 to +188
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

# First, locate and examine the test file
find . -name "cache_test.go" -path "*/engine/access/rpc/connection/*"

Repository: onflow/flow-go

Length of output: 102


🏁 Script executed:

# Examine the specific lines mentioned (180-188 and surrounding context)
sed -n '170,195p' engine/access/rpc/connection/cache_test.go

Repository: onflow/flow-go

Length of output: 664


🏁 Script executed:

# Also check line 149 mentioned in the note
sed -n '140,160p' engine/access/rpc/connection/cache_test.go

Repository: onflow/flow-go

Length of output: 736


🏁 Script executed:

# Search for all goroutines + require patterns in this file
rg -n "go func" -A 15 engine/access/rpc/connection/cache_test.go | head -100

Repository: onflow/flow-go

Length of output: 2885


🏁 Script executed:

# Check imports to see if both require and assert are available
head -30 engine/access/rpc/connection/cache_test.go

Repository: onflow/flow-go

Length of output: 785


require.NoError inside a goroutine can cause undefined test behavior.

require.NoError calls t.FailNow(), which invokes runtime.Goexit(). The testing package explicitly states this must only be called from the goroutine running the test function. In a spawned goroutine, it can cause a panic, a hang, or a silently passing test.

Use assert.NoError and return early instead, or collect the error via a channel/atomic.

Note: the same pattern exists at line 149 in the unchanged first subtest.

Proposed fix
 				go func() {
 					defer wg.Done()
 					cachedConn, err := cache.GetConnected("foo", cfg, nil, connectFn)
-					require.NoError(t, err)
+					if !assert.NoError(t, err) {
+						return
+					}
 
 					done := cachedConn.AddRequest()
 					time.Sleep(1 * time.Millisecond)
 					done()
 				}()
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
go func() {
defer wg.Done()
cachedConn, err := cache.GetConnected("foo", cfg, nil, connectFn)
require.NoError(t, err)
done := cachedConn.AddRequest()
time.Sleep(1 * time.Millisecond)
done()
}()
go func() {
defer wg.Done()
cachedConn, err := cache.GetConnected("foo", cfg, nil, connectFn)
if !assert.NoError(t, err) {
return
}
done := cachedConn.AddRequest()
time.Sleep(1 * time.Millisecond)
done()
}()
🤖 Prompt for AI Agents
In `@engine/access/rpc/connection/cache_test.go` around lines 180 - 188, The test
spawns goroutines that call require.NoError which calls t.FailNow and must only
run in the main test goroutine; replace the in-goroutine require.NoError usage
in the goroutine that calls cache.GetConnected (and the similar case in the
first subtest near cachedConn.AddRequest/connectFn) by either using
assert.NoError and returning from the goroutine on error or sending the error
back on a channel/atomic and having the main test goroutine fail; specifically
update the anonymous goroutine that calls cache.GetConnected("foo", cfg, nil,
connectFn) to check the error without t.FailNow (use assert.NoError from testify
or propagate the err to the main goroutine and handle fail there) and bail the
goroutine early if an error occurs.

}
wg.Wait()

// Invalidate after all requests in this batch complete.
// Safe: no concurrent AddRequest on this client at this point.
cache.invalidate("foo")
}
wg.Wait()

// since all connections are invalidated, the cache should be empty at the end
require.Eventually(t, func() bool {
return cache.Len() == 0
}, time.Second, 20*time.Millisecond, "cache should be empty")

// Many connections should be created, but some will be shared
// Multiple connections should be created due to invalidation between batches
assert.Greater(t, callCount.Load(), int32(1))
assert.LessOrEqual(t, callCount.Load(), int32(connectionCount))
assert.LessOrEqual(t, callCount.Load(), int32(numBatches*batchSize))
})
}

Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ func TestEvictingCacheClients(t *testing.T) {

// Invalidate marks the connection for closure asynchronously, so give it some time to run
require.Eventually(t, func() bool {
return cachedClient.closeRequested.Load()
return cachedClient.CloseRequested()
}, 100*time.Millisecond, 10*time.Millisecond, "client timed out closing connection")

// Call a gRPC method on the client, requests should be blocked since the connection is invalidated
Expand Down
9 changes: 8 additions & 1 deletion ledger/complete/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,14 @@ Loop:
}
c.logger.Info().Msg("Finished draining trie update channel in compactor on shutdown")

// Don't wait for checkpointing to finish because it might take too long.
// Don't wait more than 10ms for checkpointing to finish, because it might take too long.
// Waiting briefly makes the tests much more stable.
if !checkpointSem.TryAcquire(1) {
select {
case <-checkpointResultCh:
case <-time.After(10 * time.Millisecond):
}
}
}

// checkpoint creates checkpoint of tries snapshot,
Expand Down
6 changes: 5 additions & 1 deletion network/test/cohort2/unicast_authorization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,11 @@ func (u *UnicastAuthorizationTestSuite) TestUnicastAuthorization_ReceiverHasNoSu
err = senderCon.Unicast(&libp2pmessage.TestMessage{
Text: string("hello"),
}, u.receiverID.NodeID)
require.NoError(u.T(), err)
if err != nil {
// It can happen that the receiver resets before the sender closes,
// in which case the error will be "stream reset"
require.ErrorContains(u.T(), err, "stream reset", "expected stream-related error when receiver has no subscription")
}

// wait for slashing violations consumer mock to invoke run func and close ch if expected method call happens
unittest.RequireCloseBefore(u.T(), u.waitCh, u.channelCloseDuration, "could close ch on time")
Expand Down
Loading