From 72463f979b4e838b0394eef5bbd768031e3f884d Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 10 Dec 2025 13:58:32 -0800 Subject: [PATCH 01/10] fix flaky test --- state/cluster/badger/mutator_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index cbf68967be3..37291cab3a5 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -497,6 +497,10 @@ func (suite *MutatorSuite) TestExtend_WithOrphanedReferenceBlock() { // create a block extending genesis (conflicting with previous) which is finalized finalized := unittest.BlockWithParentProtocolState(suite.protoGenesis) finalized.Payload.Guarantees = nil + if finalized.View == orphaned.View { + // ensure we don't have two certified blocks in a single view + finalized.View = orphaned.View + 1 + } err = suite.protoState.ExtendCertified(context.Background(), unittest.NewCertifiedBlock(finalized)) suite.Require().NoError(err) err = suite.protoState.Finalize(context.Background(), finalized.ID()) From cf3c38301d231d6f32fe6cf90d87acb9f590a99e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Wed, 10 Dec 2025 15:11:31 -0800 Subject: [PATCH 02/10] Update SyncResponse to include finalized Header + CertifyingQC --- engine/collection/synchronization/engine.go | 2 +- .../collection/synchronization/engine_test.go | 18 +++++++++++------- .../synchronization/request_handler.go | 3 ++- engine/common/synchronization/engine.go | 2 +- engine/common/synchronization/engine_test.go | 18 +++++++++++------- .../common/synchronization/request_handler.go | 4 +++- model/flow/synchronization.go | 5 +++-- 7 files changed, 32 insertions(+), 20 deletions(-) diff --git a/engine/collection/synchronization/engine.go b/engine/collection/synchronization/engine.go index d87910e2fa7..f2f2cfe001a 100644 --- a/engine/collection/synchronization/engine.go +++ b/engine/collection/synchronization/engine.go @@ -287,7 +287,7 @@ func (e *Engine) onSyncResponse(_ flow.Identifier, res *flow.SyncResponse) { e.log.Error().Err(err).Msg("could not get last finalized header") return } - e.core.HandleHeight(final, res.Height) + e.core.HandleHeight(final, res.Header.Height) } // onBlockResponse processes a slice of requested block proposals. diff --git a/engine/collection/synchronization/engine_test.go b/engine/collection/synchronization/engine_test.go index 80203ef8aed..d7d0b72c611 100644 --- a/engine/collection/synchronization/engine_test.go +++ b/engine/collection/synchronization/engine_test.go @@ -187,7 +187,7 @@ func (ss *SyncSuite) TestOnSyncRequest() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head.Height, res.Height, "response should contain head height") + assert.Equal(ss.T(), ss.head.Height, res.Header.Height, "response should contain head height") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") @@ -204,13 +204,15 @@ func (ss *SyncSuite) TestOnSyncRequest() { func (ss *SyncSuite) TestOnSyncResponse() { // generate origin ID and response message originID := unittest.IdentifierFixture() + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(rand.Uint64())) res := &flow.SyncResponse{ - Nonce: rand.Uint64(), - Height: rand.Uint64(), + Nonce: rand.Uint64(), + Header: header, + CertifyingQC: unittest.CertifyBlock(header), } // the height should be handled - ss.core.On("HandleHeight", ss.head, res.Height) + ss.core.On("HandleHeight", ss.head, res.Header.Height) ss.e.onSyncResponse(originID, res) ss.core.AssertExpectations(ss.T()) } @@ -559,11 +561,13 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { originID := unittest.IdentifierFixture() for i := 0; i < 5; i++ { + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i))) msg := &flow.SyncResponse{ - Nonce: uint64(i), - Height: uint64(1000 + i), + Nonce: uint64(i), + Header: header, + CertifyingQC: unittest.CertifyBlock(header), } - ss.core.On("HandleHeight", mock.Anything, msg.Height).Once() + ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once() ss.metrics.On("MessageSent", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageReceived", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageHandled", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index 428af23aaee..e010bf37076 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -187,8 +187,9 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow // if we're sufficiently ahead of the requester, send a response res := &messages.SyncResponse{ - Height: final.Height, Nonce: req.Nonce, + Header: final, + //CertifyingQC: proof.CertifiedChild.Block.QC, } err = r.con.Unicast(res, originID) if err != nil { diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index cc6ec369064..7309dfb7b20 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -296,7 +296,7 @@ func (e *Engine) processAvailableResponses(ctx context.Context) { func (e *Engine) onSyncResponse(originID flow.Identifier, res *flow.SyncResponse) { e.log.Debug().Str("origin_id", originID.String()).Msg("received sync response") final := e.finalizedHeaderCache.Get() - e.core.HandleHeight(final, res.Height) + e.core.HandleHeight(final, res.Header.Height) } // onBlockResponse processes a structurally validated block proposal containing a specifically requested block response. diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index 2f5c399c6f8..cc3f5aed85e 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -84,7 +84,7 @@ func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_OutsideTolerance() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head.Height, res.Height, "response should contain head height") + assert.Equal(ss.T(), ss.head, res.Header, "response should contain head") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") @@ -104,16 +104,18 @@ func (ss *SyncSuite) TestOnSyncResponse() { height, err := rand.Uint64() require.NoError(ss.T(), err, "should generate height") + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(height)) // generate origin ID and response message originID := unittest.IdentifierFixture() res := &flow.SyncResponse{ - Nonce: nonce, - Height: height, + Nonce: nonce, + Header: header, + CertifyingQC: unittest.CertifyBlock(header), } // the height should be handled - ss.core.On("HandleHeight", ss.head, res.Height) + ss.core.On("HandleHeight", ss.head, res.Header.Height) ss.e.onSyncResponse(originID, res) ss.core.AssertExpectations(ss.T()) } @@ -466,11 +468,13 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { originID := unittest.IdentifierFixture() for i := 0; i < 5; i++ { + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i))) msg := &flow.SyncResponse{ - Nonce: uint64(i), - Height: uint64(1000 + i), + Nonce: uint64(i), + Header: header, + CertifyingQC: unittest.CertifyBlock(header), } - ss.core.On("HandleHeight", mock.Anything, msg.Height).Once() + ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once() ss.metrics.On("MessageSent", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageHandled", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageReceived", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() diff --git a/engine/common/synchronization/request_handler.go b/engine/common/synchronization/request_handler.go index 75367e0d616..e707cf8594c 100644 --- a/engine/common/synchronization/request_handler.go +++ b/engine/common/synchronization/request_handler.go @@ -171,9 +171,11 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR } // if we're sufficiently ahead of the requester, send a response + //protocol.Snapshot().Final() // TODO sync res := &messages.SyncResponse{ - Height: finalizedHeader.Height, Nonce: req.Nonce, + Header: finalizedHeader, + //CertifyingQC: state.QuorumCertificate(), } err := r.responseSender.SendResponse(res, originID) if err != nil { diff --git a/model/flow/synchronization.go b/model/flow/synchronization.go index ff98d2937a5..a7806ae8d36 100644 --- a/model/flow/synchronization.go +++ b/model/flow/synchronization.go @@ -12,8 +12,9 @@ type SyncRequest struct { // to a synchronization request that contains the latest finalized block height // of the responding node. type SyncResponse struct { - Nonce uint64 - Height uint64 + Nonce uint64 + Header *Header + CertifyingQC *QuorumCertificate } // BatchRequest is part of the synchronization protocol and represents an active From 1e1c95c7bf34fa539a0d5d95d1b29ddef01e0901 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 16 Dec 2025 13:30:28 -0800 Subject: [PATCH 03/10] add QuorumCertificate() method to cluster snapshot Used in the cluster sync engine to certify our latest finalized height in a syncResponse. Implementation may still change. --- .../synchronization/request_handler.go | 12 ++++++-- state/cluster/badger/snapshot.go | 22 ++++++++++++++ state/cluster/invalid/snapshot.go | 4 +++ state/cluster/mock/snapshot.go | 30 +++++++++++++++++++ state/cluster/snapshot.go | 6 ++++ 5 files changed, 71 insertions(+), 3 deletions(-) diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index e010bf37076..300eb32ac9a 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -176,6 +176,7 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow return fmt.Errorf("could not get last finalized header: %w", err) } + // TODO(8173) remove this step and only rely on certified sync responses? // queue any missing heights as needed r.core.HandleHeight(final, req.Height) @@ -185,11 +186,16 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow return nil } + qc, err := r.state.Final().QuorumCertificate() + if err != nil { + return fmt.Errorf("could not get QC for last finalized header: %w", err) + } + // if we're sufficiently ahead of the requester, send a response res := &messages.SyncResponse{ - Nonce: req.Nonce, - Header: final, - //CertifyingQC: proof.CertifiedChild.Block.QC, + Nonce: req.Nonce, + Header: final, + CertifyingQC: qc, } err = r.con.Unicast(res, originID) if err != nil { diff --git a/state/cluster/badger/snapshot.go b/state/cluster/badger/snapshot.go index 6dd8dad653e..830009d957c 100644 --- a/state/cluster/badger/snapshot.go +++ b/state/cluster/badger/snapshot.go @@ -68,6 +68,28 @@ func (s *Snapshot) Head() (*flow.Header, error) { return &head, err } +func (s *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + var pendingIDs flow.IdentifierList + err := operation.RetrieveBlockChildren(s.state.db.Reader(), s.blockID, &pendingIDs) + if err != nil { + return nil, fmt.Errorf("could not get QC for finalized head %v: %w", s.blockID, err) + + // The low-level storage returns `storage.ErrNotFound` in two cases: + // 1. the block/collection is unknown + // 2. the block/collection is known but no children have been indexed yet + // By contract of the constructor, the blockID must correspond to a known collection in the database. + // A snapshot with s.err == nil is only created for known blocks. Hence, only case 2 is + // possible here. + } + var child flow.Header + err = operation.RetrieveHeader(s.state.db.Reader(), pendingIDs[0], &child) + if err != nil { + return nil, fmt.Errorf("could not retrieve header for unfinalized child %v: %w", pendingIDs[0], err) + } + return child.ParentQC(), nil + +} + // Pending returns the IDs of all collections descending from the snapshot's head collection. // The result is ordered such that parents are included before their children. While only valid // descendants will be returned, note that the descendants may not be finalized yet. diff --git a/state/cluster/invalid/snapshot.go b/state/cluster/invalid/snapshot.go index 02ccb6503ae..51cf379aa8b 100644 --- a/state/cluster/invalid/snapshot.go +++ b/state/cluster/invalid/snapshot.go @@ -42,6 +42,10 @@ func (u *Snapshot) Head() (*flow.Header, error) { return nil, u.err } +func (u *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + return nil, u.err +} + func (u *Snapshot) Pending() ([]flow.Identifier, error) { return nil, u.err } diff --git a/state/cluster/mock/snapshot.go b/state/cluster/mock/snapshot.go index 08938de8e26..14197fedeb0 100644 --- a/state/cluster/mock/snapshot.go +++ b/state/cluster/mock/snapshot.go @@ -102,6 +102,36 @@ func (_m *Snapshot) Pending() ([]flow.Identifier, error) { return r0, r1 } +// QuorumCertificate provides a mock function with no fields +func (_m *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for QuorumCertificate") + } + + var r0 *flow.QuorumCertificate + var r1 error + if rf, ok := ret.Get(0).(func() (*flow.QuorumCertificate, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() *flow.QuorumCertificate); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*flow.QuorumCertificate) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewSnapshot creates a new instance of Snapshot. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewSnapshot(t interface { diff --git a/state/cluster/snapshot.go b/state/cluster/snapshot.go index 32f5539f44e..44a3b9e80ad 100644 --- a/state/cluster/snapshot.go +++ b/state/cluster/snapshot.go @@ -23,6 +23,12 @@ type Snapshot interface { // - If the snapshot is for an unknown collection [state.ErrUnknownSnapshotReference] Head() (*flow.Header, error) + // QuorumCertificate returns a valid quorum certificate pointing to the header at this snapshot. + // + // Expected error returns during normal operations: + // - If the snapshot is for an unknown collection [state.ErrUnknownSnapshotReference] + QuorumCertificate() (*flow.QuorumCertificate, error) + // Pending returns the IDs of *all* collections descending from the snapshot's head collection. // The result is ordered such that parents are included before their children. While only valid // descendants will be returned, note that the descendants may not be finalized yet. From 1ec161ee58b49b182f54539ef4455aabb20e596e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 9 Jan 2026 13:06:35 -0800 Subject: [PATCH 04/10] Ensure consistent snapshot is used for header/QC in OnSyncRequest Also, the correct QC is now included in both consensus/cluster `SyncResponse`s, and tests now check that SyncResponses contain a QC. --- .../collection/synchronization/engine_test.go | 13 +++++++++- .../synchronization/request_handler.go | 5 ++-- engine/common/synchronization/engine.go | 2 +- .../synchronization/engine_suite_test.go | 9 +++++++ engine/common/synchronization/engine_test.go | 25 ++++++++++++++++++- .../common/synchronization/request_handler.go | 24 +++++++++++++----- .../synchronization/request_handler_engine.go | 1 + state/cluster/badger/snapshot.go | 2 +- 8 files changed, 69 insertions(+), 12 deletions(-) diff --git a/engine/collection/synchronization/engine_test.go b/engine/collection/synchronization/engine_test.go index d7d0b72c611..9d4d85442d4 100644 --- a/engine/collection/synchronization/engine_test.go +++ b/engine/collection/synchronization/engine_test.go @@ -41,6 +41,7 @@ type SyncSuite struct { myID flow.Identifier participants flow.IdentityList head *flow.Header + qc *flow.QuorumCertificate heights map[uint64]*clustermodel.Proposal blockIDs map[flow.Identifier]*clustermodel.Proposal net *mocknetwork.EngineRegistry @@ -64,6 +65,8 @@ func (ss *SyncSuite) SetupTest() { // generate a header for the final state header := unittest.BlockHeaderFixture() ss.head = header + // generate a QC certifying the header + ss.qc = unittest.CertifyBlock(ss.head) // create maps to enable block returns ss.heights = make(map[uint64]*clustermodel.Proposal) @@ -115,6 +118,12 @@ func (ss *SyncSuite) SetupTest() { }, nil, ) + ss.snapshot.On("QuorumCertificate").Return( + func() *flow.QuorumCertificate { + return ss.qc + }, + nil, + ) ss.snapshot.On("Identities", mock.Anything).Return( func(selector flow.IdentityFilter[flow.Identity]) flow.IdentityList { return ss.participants.Filter(selector) @@ -187,8 +196,10 @@ func (ss *SyncSuite) TestOnSyncRequest() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head.Height, res.Header.Height, "response should contain head height") + assert.Equal(ss.T(), ss.head, res.Header, "response should contain header") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") + assert.Equal(ss.T(), ss.qc, res.CertifyingQC, "response should contain QC") + assert.Equal(ss.T(), res.Header.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") }, diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index 300eb32ac9a..ec052ee8dc2 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -171,7 +171,8 @@ func (r *RequestHandlerEngine) setupRequestMessageHandler() { // inform the other node of it, so they can organize their block downloads. If // we have a lower height, we add the difference to our own download queue. func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow.SyncRequest) error { - final, err := r.state.Final().Head() + finalizedState := r.state.Final() + final, err := finalizedState.Head() if err != nil { return fmt.Errorf("could not get last finalized header: %w", err) } @@ -186,7 +187,7 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow return nil } - qc, err := r.state.Final().QuorumCertificate() + qc, err := finalizedState.QuorumCertificate() if err != nil { return fmt.Errorf("could not get QC for last finalized header: %w", err) } diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index 7309dfb7b20..b3bbdd8b167 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -117,7 +117,7 @@ func New( return nil, fmt.Errorf("could not register engine: %w", err) } e.con = con - e.requestHandler = NewRequestHandler(log, metrics, NewResponseSender(con), me, finalizedHeaderCache, blocks, core, true) + e.requestHandler = NewRequestHandler(log, metrics, NewResponseSender(con), me, state, finalizedHeaderCache, blocks, core, true) // set up worker routines builder := component.NewComponentManagerBuilder(). diff --git a/engine/common/synchronization/engine_suite_test.go b/engine/common/synchronization/engine_suite_test.go index 99090f02f2b..8469ffdf111 100644 --- a/engine/common/synchronization/engine_suite_test.go +++ b/engine/common/synchronization/engine_suite_test.go @@ -36,6 +36,7 @@ type SyncSuite struct { myID flow.Identifier participants flow.IdentityList head *flow.Header + qc *flow.QuorumCertificate heights map[uint64]*flow.Proposal blockIDs map[flow.Identifier]*flow.Proposal net *mocknetwork.EngineRegistry @@ -63,6 +64,8 @@ func (ss *SyncSuite) SetupTest() { // generate a header for the final state header := unittest.BlockHeaderFixture() ss.head = header + // generate a QC certifying the header + ss.qc = unittest.CertifyBlock(ss.head) // create maps to enable block returns ss.heights = make(map[uint64]*flow.Proposal) @@ -113,6 +116,12 @@ func (ss *SyncSuite) SetupTest() { }, nil, ) + ss.snapshot.On("QuorumCertificate").Return( + func() *flow.QuorumCertificate { + return ss.qc + }, + nil, + ) ss.snapshot.On("Identities", mock.Anything).Return( func(selector flow.IdentityFilter[flow.Identity]) flow.IdentityList { return ss.participants.Filter(selector) diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index cc3f5aed85e..c9a078cf8a9 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -84,8 +84,10 @@ func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_OutsideTolerance() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head, res.Header, "response should contain head") + assert.Equal(ss.T(), ss.head, res.Header, "response should contain header") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") + assert.Equal(ss.T(), ss.qc, res.CertifyingQC, "response should contain QC") + assert.Equal(ss.T(), ss.head.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") }, @@ -120,6 +122,27 @@ func (ss *SyncSuite) TestOnSyncResponse() { ss.core.AssertExpectations(ss.T()) } +func (ss *SyncSuite) TestInvalidSyncResponse() { + ss.T().Skip() // TODO(8174) - implement this test + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + height, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate height") + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(height)) + + // generate origin ID and response message + originID := unittest.IdentifierFixture() + res := &flow.SyncResponse{ + Nonce: nonce, + Header: header, + CertifyingQC: nil, + } + // TODO(8174): the response should be rejected and/or a violation should be logged + ss.e.onSyncResponse(originID, res) + ss.core.AssertExpectations(ss.T()) +} + func (ss *SyncSuite) TestOnRangeRequest() { nonce, err := rand.Uint64() require.NoError(ss.T(), err, "should generate nonce") diff --git a/engine/common/synchronization/request_handler.go b/engine/common/synchronization/request_handler.go index e707cf8594c..76ff79690af 100644 --- a/engine/common/synchronization/request_handler.go +++ b/engine/common/synchronization/request_handler.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/channels" + "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/utils/logging" ) @@ -50,6 +51,7 @@ type RequestHandler struct { metrics module.EngineMetrics blocks storage.Blocks + state protocol.State finalizedHeaderCache module.FinalizedHeaderCache core module.SyncCore responseSender ResponseSender @@ -67,6 +69,7 @@ func NewRequestHandler( metrics module.EngineMetrics, responseSender ResponseSender, me module.Local, + state protocol.State, finalizedHeaderCache *events.FinalizedHeaderCache, blocks storage.Blocks, core module.SyncCore, @@ -76,6 +79,7 @@ func NewRequestHandler( me: me, log: log.With().Str("engine", "synchronization").Logger(), metrics: metrics, + state: state, finalizedHeaderCache: finalizedHeaderCache, blocks: blocks, core: core, @@ -151,7 +155,11 @@ func (r *RequestHandler) setupRequestMessageHandler() { // we have a lower height, we add the difference to our own download queue. // No errors are expected during normal operation. func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncRequest) error { - finalizedHeader := r.finalizedHeaderCache.Get() + finalizedState := r.state.Final() + finalizedHeader, err := finalizedState.Head() + if err != nil { + return err + } logger := r.log.With().Str("origin_id", originID.String()).Logger() logger.Debug(). @@ -160,6 +168,7 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR Msg("received new sync request") if r.queueMissingHeights { + // TODO(8174) remove this step and only rely on certified sync responses? // queue any missing heights as needed r.core.HandleHeight(finalizedHeader, req.Height) } @@ -171,13 +180,16 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR } // if we're sufficiently ahead of the requester, send a response - //protocol.Snapshot().Final() // TODO sync + qcForFinalizedHeader, err := finalizedState.QuorumCertificate() + if err != nil { + return err + } res := &messages.SyncResponse{ - Nonce: req.Nonce, - Header: finalizedHeader, - //CertifyingQC: state.QuorumCertificate(), + Nonce: req.Nonce, + Header: finalizedHeader, + CertifyingQC: qcForFinalizedHeader, } - err := r.responseSender.SendResponse(res, originID) + err = r.responseSender.SendResponse(res, originID) if err != nil { logger.Warn().Err(err).Msg("sending sync response failed") return nil diff --git a/engine/common/synchronization/request_handler_engine.go b/engine/common/synchronization/request_handler_engine.go index 72321306466..6ae5ce119bd 100644 --- a/engine/common/synchronization/request_handler_engine.go +++ b/engine/common/synchronization/request_handler_engine.go @@ -92,6 +92,7 @@ func NewRequestHandlerEngine( metrics, NewResponseSender(con), me, + state, finalizedHeaderCache, blocks, core, diff --git a/state/cluster/badger/snapshot.go b/state/cluster/badger/snapshot.go index 830009d957c..50e7206d4b1 100644 --- a/state/cluster/badger/snapshot.go +++ b/state/cluster/badger/snapshot.go @@ -81,13 +81,13 @@ func (s *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { // A snapshot with s.err == nil is only created for known blocks. Hence, only case 2 is // possible here. } + // at least one child exists (pendingIDs[0]) var child flow.Header err = operation.RetrieveHeader(s.state.db.Reader(), pendingIDs[0], &child) if err != nil { return nil, fmt.Errorf("could not retrieve header for unfinalized child %v: %w", pendingIDs[0], err) } return child.ParentQC(), nil - } // Pending returns the IDs of all collections descending from the snapshot's head collection. From 05e6fa9a1e0f58597ba8c92a75b4dd558167f3dd Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 9 Jan 2026 15:45:26 -0800 Subject: [PATCH 05/10] convert collection sync engine to MessageProcessor --- engine/collection/synchronization/engine.go | 23 ------------------- .../collection/synchronization/engine_test.go | 9 ++------ .../synchronization/request_handler.go | 23 ------------------- 3 files changed, 2 insertions(+), 53 deletions(-) diff --git a/engine/collection/synchronization/engine.go b/engine/collection/synchronization/engine.go index f2f2cfe001a..a4dabdcb61c 100644 --- a/engine/collection/synchronization/engine.go +++ b/engine/collection/synchronization/engine.go @@ -186,29 +186,6 @@ func (e *Engine) Done() <-chan struct{} { return e.lm.Stopped() } -// SubmitLocal submits an event originating on the local node. -func (e *Engine) SubmitLocal(event interface{}) { - err := e.ProcessLocal(event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := e.Process(channel, originID, event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return e.process(e.me.NodeID(), event) -} - // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { diff --git a/engine/collection/synchronization/engine_test.go b/engine/collection/synchronization/engine_test.go index 9d4d85442d4..4f5e2b577f9 100644 --- a/engine/collection/synchronization/engine_test.go +++ b/engine/collection/synchronization/engine_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/onflow/flow-go/engine" mockcollection "github.com/onflow/flow-go/engine/collection/mock" clustermodel "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" @@ -608,18 +607,14 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { ss.metrics.AssertExpectations(ss.T()) } -// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type +// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type // was submitted from network layer. func (ss *SyncSuite) TestProcessUnsupportedMessageType() { invalidEvent := uint64(42) - engines := []netint.Engine{ss.e, ss.e.requestHandler} + engines := []netint.MessageProcessor{ss.e, ss.e.requestHandler} for _, e := range engines { err := e.Process("ch", unittest.IdentifierFixture(), invalidEvent) // shouldn't result in error since byzantine inputs are expected require.NoError(ss.T(), err) - // in case of local processing error cannot be consumed since all inputs are trusted - err = e.ProcessLocal(invalidEvent) - require.Error(ss.T(), err) - require.True(ss.T(), engine.IsIncompatibleInputTypeError(err)) } } diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index ec052ee8dc2..83a158fe972 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -78,29 +78,6 @@ func NewRequestHandlerEngine( return r } -// SubmitLocal submits an event originating on the local node. -func (r *RequestHandlerEngine) SubmitLocal(event interface{}) { - err := r.ProcessLocal(event) - if err != nil { - r.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (r *RequestHandlerEngine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := r.Process(channel, originID, event) - if err != nil { - r.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (r *RequestHandlerEngine) ProcessLocal(event interface{}) error { - return r.process(r.me.NodeID(), event) -} - // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. func (r *RequestHandlerEngine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { From 4d6fa69938e5b1215cc641939819e8d6c44adb5c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 12 Jan 2026 10:14:14 -0800 Subject: [PATCH 06/10] update QuorumCertificate docs on cluster snapshot --- state/cluster/badger/snapshot.go | 7 +++++++ state/cluster/snapshot.go | 3 ++- storage/operation/qcs.go | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/state/cluster/badger/snapshot.go b/state/cluster/badger/snapshot.go index 50e7206d4b1..c8d76066c27 100644 --- a/state/cluster/badger/snapshot.go +++ b/state/cluster/badger/snapshot.go @@ -68,7 +68,14 @@ func (s *Snapshot) Head() (*flow.Header, error) { return &head, err } +// QuorumCertificate returns a valid quorum certificate for the header at this snapshot, if one exists. +// +// Expected error returns during normal operations: +// - [storage.ErrNotFound] is returned if the QC is unknown. func (s *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + // Implementation detail: QuorumCertificates storage / operation.RetrieveQuorumCertificate only indexes QCs + // for main consensus blocks, not cluster blocks, so we directly check for any children that would have a + // QC for the cluster header. var pendingIDs flow.IdentifierList err := operation.RetrieveBlockChildren(s.state.db.Reader(), s.blockID, &pendingIDs) if err != nil { diff --git a/state/cluster/snapshot.go b/state/cluster/snapshot.go index 44a3b9e80ad..2dc593cefb0 100644 --- a/state/cluster/snapshot.go +++ b/state/cluster/snapshot.go @@ -23,9 +23,10 @@ type Snapshot interface { // - If the snapshot is for an unknown collection [state.ErrUnknownSnapshotReference] Head() (*flow.Header, error) - // QuorumCertificate returns a valid quorum certificate pointing to the header at this snapshot. + // QuorumCertificate returns a valid quorum certificate for the header at this snapshot, if one exists. // // Expected error returns during normal operations: + // - [storage.ErrNotFound] is returned if the QC is unknown. // - If the snapshot is for an unknown collection [state.ErrUnknownSnapshotReference] QuorumCertificate() (*flow.QuorumCertificate, error) diff --git a/storage/operation/qcs.go b/storage/operation/qcs.go index 00f1cabe7b8..2008e1839b8 100644 --- a/storage/operation/qcs.go +++ b/storage/operation/qcs.go @@ -23,7 +23,7 @@ import ( // inserting the QC after checking that no QC is already stored. // // Expected error returns: -// - [storage.ErrAlreadyExists] if any QuorumCertificate certifying the samn block already exists +// - [storage.ErrAlreadyExists] if any QuorumCertificate certifying the same block already exists func InsertQuorumCertificate(lctx lockctx.Proof, rw storage.ReaderBatchWriter, qc *flow.QuorumCertificate) error { if !lctx.HoldsLock(storage.LockInsertBlock) { return fmt.Errorf("cannot insert quorum certificate without holding lock %s", storage.LockInsertBlock) From 5e48913bcb3d778094f6841264108448c3faf8fc Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 12 Jan 2026 10:33:53 -0800 Subject: [PATCH 07/10] update comments --- model/flow/synchronization.go | 4 ++-- state/cluster/badger/snapshot.go | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/model/flow/synchronization.go b/model/flow/synchronization.go index a7806ae8d36..c841571bd65 100644 --- a/model/flow/synchronization.go +++ b/model/flow/synchronization.go @@ -9,8 +9,8 @@ type SyncRequest struct { } // SyncResponse is part of the synchronization protocol and represents the reply -// to a synchronization request that contains the latest finalized block height -// of the responding node. +// to a synchronization request. It contains the latest finalized block height +// of the responding node, via the finalized header and QC certifying that header. type SyncResponse struct { Nonce uint64 Header *Header diff --git a/state/cluster/badger/snapshot.go b/state/cluster/badger/snapshot.go index c8d76066c27..7a3066db622 100644 --- a/state/cluster/badger/snapshot.go +++ b/state/cluster/badger/snapshot.go @@ -79,16 +79,15 @@ func (s *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { var pendingIDs flow.IdentifierList err := operation.RetrieveBlockChildren(s.state.db.Reader(), s.blockID, &pendingIDs) if err != nil { - return nil, fmt.Errorf("could not get QC for finalized head %v: %w", s.blockID, err) - // The low-level storage returns `storage.ErrNotFound` in two cases: // 1. the block/collection is unknown // 2. the block/collection is known but no children have been indexed yet // By contract of the constructor, the blockID must correspond to a known collection in the database. // A snapshot with s.err == nil is only created for known blocks. Hence, only case 2 is - // possible here. + // possible here (no children). + return nil, fmt.Errorf("could not get QC for finalized head %v: %w", s.blockID, err) } - // at least one child exists (pendingIDs[0]) + // at least one child exists (pendingIDs[0]) - guaranteed by RetrieveBlockChildren var child flow.Header err = operation.RetrieveHeader(s.state.db.Reader(), pendingIDs[0], &child) if err != nil { From f85638082b2b1138f4bfb29fc0a0a5c564b79328 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 12 Jan 2026 10:49:18 -0800 Subject: [PATCH 08/10] avoid pointers in SyncResponse --- engine/collection/synchronization/engine_test.go | 8 ++++---- engine/collection/synchronization/request_handler.go | 4 ++-- engine/common/synchronization/engine_test.go | 12 ++++++------ engine/common/synchronization/request_handler.go | 4 ++-- model/flow/synchronization.go | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/engine/collection/synchronization/engine_test.go b/engine/collection/synchronization/engine_test.go index 4f5e2b577f9..caae838dbd8 100644 --- a/engine/collection/synchronization/engine_test.go +++ b/engine/collection/synchronization/engine_test.go @@ -217,8 +217,8 @@ func (ss *SyncSuite) TestOnSyncResponse() { header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(rand.Uint64())) res := &flow.SyncResponse{ Nonce: rand.Uint64(), - Header: header, - CertifyingQC: unittest.CertifyBlock(header), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } // the height should be handled @@ -574,8 +574,8 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i))) msg := &flow.SyncResponse{ Nonce: uint64(i), - Header: header, - CertifyingQC: unittest.CertifyBlock(header), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once() ss.metrics.On("MessageSent", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index 83a158fe972..d05c144f585 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -172,8 +172,8 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow // if we're sufficiently ahead of the requester, send a response res := &messages.SyncResponse{ Nonce: req.Nonce, - Header: final, - CertifyingQC: qc, + Header: *final, + CertifyingQC: *qc, } err = r.con.Unicast(res, originID) if err != nil { diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index c9a078cf8a9..75c379b8310 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -112,8 +112,8 @@ func (ss *SyncSuite) TestOnSyncResponse() { originID := unittest.IdentifierFixture() res := &flow.SyncResponse{ Nonce: nonce, - Header: header, - CertifyingQC: unittest.CertifyBlock(header), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } // the height should be handled @@ -135,8 +135,8 @@ func (ss *SyncSuite) TestInvalidSyncResponse() { originID := unittest.IdentifierFixture() res := &flow.SyncResponse{ Nonce: nonce, - Header: header, - CertifyingQC: nil, + Header: *header, + CertifyingQC: flow.QuorumCertificate{}, } // TODO(8174): the response should be rejected and/or a violation should be logged ss.e.onSyncResponse(originID, res) @@ -494,8 +494,8 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i))) msg := &flow.SyncResponse{ Nonce: uint64(i), - Header: header, - CertifyingQC: unittest.CertifyBlock(header), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once() ss.metrics.On("MessageSent", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() diff --git a/engine/common/synchronization/request_handler.go b/engine/common/synchronization/request_handler.go index 76ff79690af..f8fa7219269 100644 --- a/engine/common/synchronization/request_handler.go +++ b/engine/common/synchronization/request_handler.go @@ -186,8 +186,8 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR } res := &messages.SyncResponse{ Nonce: req.Nonce, - Header: finalizedHeader, - CertifyingQC: qcForFinalizedHeader, + Header: *finalizedHeader, + CertifyingQC: *qcForFinalizedHeader, } err = r.responseSender.SendResponse(res, originID) if err != nil { diff --git a/model/flow/synchronization.go b/model/flow/synchronization.go index c841571bd65..b9cbc9d2f72 100644 --- a/model/flow/synchronization.go +++ b/model/flow/synchronization.go @@ -13,8 +13,8 @@ type SyncRequest struct { // of the responding node, via the finalized header and QC certifying that header. type SyncResponse struct { Nonce uint64 - Header *Header - CertifyingQC *QuorumCertificate + Header Header + CertifyingQC QuorumCertificate } // BatchRequest is part of the synchronization protocol and represents an active From 6d9ca64caa366d310c0603140f1ba7342cfe7323 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 12 Jan 2026 11:13:59 -0800 Subject: [PATCH 09/10] fix tests (removing pointers from SyncResponse) --- engine/collection/synchronization/engine_test.go | 4 ++-- engine/common/synchronization/engine_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/engine/collection/synchronization/engine_test.go b/engine/collection/synchronization/engine_test.go index caae838dbd8..d088a803f17 100644 --- a/engine/collection/synchronization/engine_test.go +++ b/engine/collection/synchronization/engine_test.go @@ -195,9 +195,9 @@ func (ss *SyncSuite) TestOnSyncRequest() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head, res.Header, "response should contain header") + assert.Equal(ss.T(), *ss.head, res.Header, "response should contain header") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") - assert.Equal(ss.T(), ss.qc, res.CertifyingQC, "response should contain QC") + assert.Equal(ss.T(), *ss.qc, res.CertifyingQC, "response should contain QC") assert.Equal(ss.T(), res.Header.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index 75c379b8310..cbce4c2ccf4 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -84,9 +84,9 @@ func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_OutsideTolerance() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head, res.Header, "response should contain header") + assert.Equal(ss.T(), *ss.head, res.Header, "response should contain header") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") - assert.Equal(ss.T(), ss.qc, res.CertifyingQC, "response should contain QC") + assert.Equal(ss.T(), *ss.qc, res.CertifyingQC, "response should contain QC") assert.Equal(ss.T(), ss.head.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") From d14aaee5990c5e6b6b0307b5a25e974c7e7f9e48 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Mon, 12 Jan 2026 14:24:29 -0800 Subject: [PATCH 10/10] backwards compatibility for SyncResponse --- engine/collection/synchronization/engine.go | 7 ++++++- engine/collection/synchronization/request_handler.go | 1 + engine/common/synchronization/engine.go | 7 ++++++- engine/common/synchronization/engine_test.go | 8 ++++++++ engine/common/synchronization/request_handler.go | 1 + model/flow/synchronization.go | 1 + 6 files changed, 23 insertions(+), 2 deletions(-) diff --git a/engine/collection/synchronization/engine.go b/engine/collection/synchronization/engine.go index a4dabdcb61c..5c57cdc3dfc 100644 --- a/engine/collection/synchronization/engine.go +++ b/engine/collection/synchronization/engine.go @@ -264,7 +264,12 @@ func (e *Engine) onSyncResponse(_ flow.Identifier, res *flow.SyncResponse) { e.log.Error().Err(err).Msg("could not get last finalized header") return } - e.core.HandleHeight(final, res.Header.Height) + // backwards compatibility - ignore the Header/QC if they are not present, and use Height field instead + if res.Header.Height == 0 { + e.core.HandleHeight(final, res.Height) + } else { + e.core.HandleHeight(final, res.Header.Height) + } } // onBlockResponse processes a slice of requested block proposals. diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index d05c144f585..84ae410714f 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -172,6 +172,7 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow // if we're sufficiently ahead of the requester, send a response res := &messages.SyncResponse{ Nonce: req.Nonce, + Height: final.Height, Header: *final, CertifyingQC: *qc, } diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index b3bbdd8b167..60c4c151683 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -296,7 +296,12 @@ func (e *Engine) processAvailableResponses(ctx context.Context) { func (e *Engine) onSyncResponse(originID flow.Identifier, res *flow.SyncResponse) { e.log.Debug().Str("origin_id", originID.String()).Msg("received sync response") final := e.finalizedHeaderCache.Get() - e.core.HandleHeight(final, res.Header.Height) + // backwards compatibility - ignore the Header/QC if they are not present, and use Height field instead + if res.Header.Height == 0 { + e.core.HandleHeight(final, res.Height) + } else { + e.core.HandleHeight(final, res.Header.Height) + } } // onBlockResponse processes a structurally validated block proposal containing a specifically requested block response. diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index cbce4c2ccf4..b2c474ec9e4 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -119,6 +119,14 @@ func (ss *SyncSuite) TestOnSyncResponse() { // the height should be handled ss.core.On("HandleHeight", ss.head, res.Header.Height) ss.e.onSyncResponse(originID, res) + + // Backwards Compatibility - message with only Height should also be handled + res = &flow.SyncResponse{ + Nonce: nonce + 1, + Height: header.Height + 1, + } + ss.core.On("HandleHeight", ss.head, res.Height) + ss.e.onSyncResponse(originID, res) ss.core.AssertExpectations(ss.T()) } diff --git a/engine/common/synchronization/request_handler.go b/engine/common/synchronization/request_handler.go index f8fa7219269..6931e9e88d0 100644 --- a/engine/common/synchronization/request_handler.go +++ b/engine/common/synchronization/request_handler.go @@ -186,6 +186,7 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR } res := &messages.SyncResponse{ Nonce: req.Nonce, + Height: finalizedHeader.Height, Header: *finalizedHeader, CertifyingQC: *qcForFinalizedHeader, } diff --git a/model/flow/synchronization.go b/model/flow/synchronization.go index b9cbc9d2f72..e737e070e41 100644 --- a/model/flow/synchronization.go +++ b/model/flow/synchronization.go @@ -13,6 +13,7 @@ type SyncRequest struct { // of the responding node, via the finalized header and QC certifying that header. type SyncResponse struct { Nonce uint64 + Height uint64 // retained for backwards compatibility Header Header CertifyingQC QuorumCertificate }