diff --git a/engine/collection/synchronization/engine.go b/engine/collection/synchronization/engine.go index d87910e2fa7..5c57cdc3dfc 100644 --- a/engine/collection/synchronization/engine.go +++ b/engine/collection/synchronization/engine.go @@ -186,29 +186,6 @@ func (e *Engine) Done() <-chan struct{} { return e.lm.Stopped() } -// SubmitLocal submits an event originating on the local node. -func (e *Engine) SubmitLocal(event interface{}) { - err := e.ProcessLocal(event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := e.Process(channel, originID, event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return e.process(e.me.NodeID(), event) -} - // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { @@ -287,7 +264,12 @@ func (e *Engine) onSyncResponse(_ flow.Identifier, res *flow.SyncResponse) { e.log.Error().Err(err).Msg("could not get last finalized header") return } - e.core.HandleHeight(final, res.Height) + // backwards compatibility - ignore the Header/QC if they are not present, and use Height field instead + if res.Header.Height == 0 { + e.core.HandleHeight(final, res.Height) + } else { + e.core.HandleHeight(final, res.Header.Height) + } } // onBlockResponse processes a slice of requested block proposals. diff --git a/engine/collection/synchronization/engine_test.go b/engine/collection/synchronization/engine_test.go index 80203ef8aed..d088a803f17 100644 --- a/engine/collection/synchronization/engine_test.go +++ b/engine/collection/synchronization/engine_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "github.com/onflow/flow-go/engine" mockcollection "github.com/onflow/flow-go/engine/collection/mock" clustermodel "github.com/onflow/flow-go/model/cluster" "github.com/onflow/flow-go/model/flow" @@ -41,6 +40,7 @@ type SyncSuite struct { myID flow.Identifier participants flow.IdentityList head *flow.Header + qc *flow.QuorumCertificate heights map[uint64]*clustermodel.Proposal blockIDs map[flow.Identifier]*clustermodel.Proposal net *mocknetwork.EngineRegistry @@ -64,6 +64,8 @@ func (ss *SyncSuite) SetupTest() { // generate a header for the final state header := unittest.BlockHeaderFixture() ss.head = header + // generate a QC certifying the header + ss.qc = unittest.CertifyBlock(ss.head) // create maps to enable block returns ss.heights = make(map[uint64]*clustermodel.Proposal) @@ -115,6 +117,12 @@ func (ss *SyncSuite) SetupTest() { }, nil, ) + ss.snapshot.On("QuorumCertificate").Return( + func() *flow.QuorumCertificate { + return ss.qc + }, + nil, + ) ss.snapshot.On("Identities", mock.Anything).Return( func(selector flow.IdentityFilter[flow.Identity]) flow.IdentityList { return ss.participants.Filter(selector) @@ -187,8 +195,10 @@ func (ss *SyncSuite) TestOnSyncRequest() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head.Height, res.Height, "response should contain head height") + assert.Equal(ss.T(), *ss.head, res.Header, "response should contain header") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") + assert.Equal(ss.T(), *ss.qc, res.CertifyingQC, "response should contain QC") + assert.Equal(ss.T(), res.Header.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") }, @@ -204,13 +214,15 @@ func (ss *SyncSuite) TestOnSyncRequest() { func (ss *SyncSuite) TestOnSyncResponse() { // generate origin ID and response message originID := unittest.IdentifierFixture() + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(rand.Uint64())) res := &flow.SyncResponse{ - Nonce: rand.Uint64(), - Height: rand.Uint64(), + Nonce: rand.Uint64(), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } // the height should be handled - ss.core.On("HandleHeight", ss.head, res.Height) + ss.core.On("HandleHeight", ss.head, res.Header.Height) ss.e.onSyncResponse(originID, res) ss.core.AssertExpectations(ss.T()) } @@ -559,11 +571,13 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { originID := unittest.IdentifierFixture() for i := 0; i < 5; i++ { + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i))) msg := &flow.SyncResponse{ - Nonce: uint64(i), - Height: uint64(1000 + i), + Nonce: uint64(i), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } - ss.core.On("HandleHeight", mock.Anything, msg.Height).Once() + ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once() ss.metrics.On("MessageSent", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageReceived", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageHandled", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once() @@ -593,18 +607,14 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { ss.metrics.AssertExpectations(ss.T()) } -// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type +// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type // was submitted from network layer. func (ss *SyncSuite) TestProcessUnsupportedMessageType() { invalidEvent := uint64(42) - engines := []netint.Engine{ss.e, ss.e.requestHandler} + engines := []netint.MessageProcessor{ss.e, ss.e.requestHandler} for _, e := range engines { err := e.Process("ch", unittest.IdentifierFixture(), invalidEvent) // shouldn't result in error since byzantine inputs are expected require.NoError(ss.T(), err) - // in case of local processing error cannot be consumed since all inputs are trusted - err = e.ProcessLocal(invalidEvent) - require.Error(ss.T(), err) - require.True(ss.T(), engine.IsIncompatibleInputTypeError(err)) } } diff --git a/engine/collection/synchronization/request_handler.go b/engine/collection/synchronization/request_handler.go index 428af23aaee..84ae410714f 100644 --- a/engine/collection/synchronization/request_handler.go +++ b/engine/collection/synchronization/request_handler.go @@ -78,29 +78,6 @@ func NewRequestHandlerEngine( return r } -// SubmitLocal submits an event originating on the local node. -func (r *RequestHandlerEngine) SubmitLocal(event interface{}) { - err := r.ProcessLocal(event) - if err != nil { - r.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (r *RequestHandlerEngine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := r.Process(channel, originID, event) - if err != nil { - r.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (r *RequestHandlerEngine) ProcessLocal(event interface{}) error { - return r.process(r.me.NodeID(), event) -} - // Process processes the given event from the node with the given origin ID in // a blocking manner. It returns the potential processing error when done. func (r *RequestHandlerEngine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { @@ -171,11 +148,13 @@ func (r *RequestHandlerEngine) setupRequestMessageHandler() { // inform the other node of it, so they can organize their block downloads. If // we have a lower height, we add the difference to our own download queue. func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow.SyncRequest) error { - final, err := r.state.Final().Head() + finalizedState := r.state.Final() + final, err := finalizedState.Head() if err != nil { return fmt.Errorf("could not get last finalized header: %w", err) } + // TODO(8173) remove this step and only rely on certified sync responses? // queue any missing heights as needed r.core.HandleHeight(final, req.Height) @@ -185,10 +164,17 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow return nil } + qc, err := finalizedState.QuorumCertificate() + if err != nil { + return fmt.Errorf("could not get QC for last finalized header: %w", err) + } + // if we're sufficiently ahead of the requester, send a response res := &messages.SyncResponse{ - Height: final.Height, - Nonce: req.Nonce, + Nonce: req.Nonce, + Height: final.Height, + Header: *final, + CertifyingQC: *qc, } err = r.con.Unicast(res, originID) if err != nil { diff --git a/engine/common/synchronization/engine.go b/engine/common/synchronization/engine.go index cc6ec369064..60c4c151683 100644 --- a/engine/common/synchronization/engine.go +++ b/engine/common/synchronization/engine.go @@ -117,7 +117,7 @@ func New( return nil, fmt.Errorf("could not register engine: %w", err) } e.con = con - e.requestHandler = NewRequestHandler(log, metrics, NewResponseSender(con), me, finalizedHeaderCache, blocks, core, true) + e.requestHandler = NewRequestHandler(log, metrics, NewResponseSender(con), me, state, finalizedHeaderCache, blocks, core, true) // set up worker routines builder := component.NewComponentManagerBuilder(). @@ -296,7 +296,12 @@ func (e *Engine) processAvailableResponses(ctx context.Context) { func (e *Engine) onSyncResponse(originID flow.Identifier, res *flow.SyncResponse) { e.log.Debug().Str("origin_id", originID.String()).Msg("received sync response") final := e.finalizedHeaderCache.Get() - e.core.HandleHeight(final, res.Height) + // backwards compatibility - ignore the Header/QC if they are not present, and use Height field instead + if res.Header.Height == 0 { + e.core.HandleHeight(final, res.Height) + } else { + e.core.HandleHeight(final, res.Header.Height) + } } // onBlockResponse processes a structurally validated block proposal containing a specifically requested block response. diff --git a/engine/common/synchronization/engine_suite_test.go b/engine/common/synchronization/engine_suite_test.go index 99090f02f2b..8469ffdf111 100644 --- a/engine/common/synchronization/engine_suite_test.go +++ b/engine/common/synchronization/engine_suite_test.go @@ -36,6 +36,7 @@ type SyncSuite struct { myID flow.Identifier participants flow.IdentityList head *flow.Header + qc *flow.QuorumCertificate heights map[uint64]*flow.Proposal blockIDs map[flow.Identifier]*flow.Proposal net *mocknetwork.EngineRegistry @@ -63,6 +64,8 @@ func (ss *SyncSuite) SetupTest() { // generate a header for the final state header := unittest.BlockHeaderFixture() ss.head = header + // generate a QC certifying the header + ss.qc = unittest.CertifyBlock(ss.head) // create maps to enable block returns ss.heights = make(map[uint64]*flow.Proposal) @@ -113,6 +116,12 @@ func (ss *SyncSuite) SetupTest() { }, nil, ) + ss.snapshot.On("QuorumCertificate").Return( + func() *flow.QuorumCertificate { + return ss.qc + }, + nil, + ) ss.snapshot.On("Identities", mock.Anything).Return( func(selector flow.IdentityFilter[flow.Identity]) flow.IdentityList { return ss.participants.Filter(selector) diff --git a/engine/common/synchronization/engine_test.go b/engine/common/synchronization/engine_test.go index 2f5c399c6f8..b2c474ec9e4 100644 --- a/engine/common/synchronization/engine_test.go +++ b/engine/common/synchronization/engine_test.go @@ -84,8 +84,10 @@ func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_OutsideTolerance() { ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run( func(args mock.Arguments) { res := args.Get(0).(*messages.SyncResponse) - assert.Equal(ss.T(), ss.head.Height, res.Height, "response should contain head height") + assert.Equal(ss.T(), *ss.head, res.Header, "response should contain header") assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce") + assert.Equal(ss.T(), *ss.qc, res.CertifyingQC, "response should contain QC") + assert.Equal(ss.T(), ss.head.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header") recipientID := args.Get(1).(flow.Identifier) assert.Equal(ss.T(), originID, recipientID, "should send response to original sender") }, @@ -104,20 +106,51 @@ func (ss *SyncSuite) TestOnSyncResponse() { height, err := rand.Uint64() require.NoError(ss.T(), err, "should generate height") + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(height)) // generate origin ID and response message originID := unittest.IdentifierFixture() res := &flow.SyncResponse{ - Nonce: nonce, - Height: height, + Nonce: nonce, + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } // the height should be handled + ss.core.On("HandleHeight", ss.head, res.Header.Height) + ss.e.onSyncResponse(originID, res) + + // Backwards Compatibility - message with only Height should also be handled + res = &flow.SyncResponse{ + Nonce: nonce + 1, + Height: header.Height + 1, + } ss.core.On("HandleHeight", ss.head, res.Height) ss.e.onSyncResponse(originID, res) ss.core.AssertExpectations(ss.T()) } +func (ss *SyncSuite) TestInvalidSyncResponse() { + ss.T().Skip() // TODO(8174) - implement this test + nonce, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate nonce") + + height, err := rand.Uint64() + require.NoError(ss.T(), err, "should generate height") + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(height)) + + // generate origin ID and response message + originID := unittest.IdentifierFixture() + res := &flow.SyncResponse{ + Nonce: nonce, + Header: *header, + CertifyingQC: flow.QuorumCertificate{}, + } + // TODO(8174): the response should be rejected and/or a violation should be logged + ss.e.onSyncResponse(originID, res) + ss.core.AssertExpectations(ss.T()) +} + func (ss *SyncSuite) TestOnRangeRequest() { nonce, err := rand.Uint64() require.NoError(ss.T(), err, "should generate nonce") @@ -466,11 +499,13 @@ func (ss *SyncSuite) TestProcessingMultipleItems() { originID := unittest.IdentifierFixture() for i := 0; i < 5; i++ { + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i))) msg := &flow.SyncResponse{ - Nonce: uint64(i), - Height: uint64(1000 + i), + Nonce: uint64(i), + Header: *header, + CertifyingQC: *unittest.CertifyBlock(header), } - ss.core.On("HandleHeight", mock.Anything, msg.Height).Once() + ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once() ss.metrics.On("MessageSent", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageHandled", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() ss.metrics.On("MessageReceived", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once() diff --git a/engine/common/synchronization/request_handler.go b/engine/common/synchronization/request_handler.go index 75367e0d616..6931e9e88d0 100644 --- a/engine/common/synchronization/request_handler.go +++ b/engine/common/synchronization/request_handler.go @@ -17,6 +17,7 @@ import ( "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network/channels" + "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" "github.com/onflow/flow-go/utils/logging" ) @@ -50,6 +51,7 @@ type RequestHandler struct { metrics module.EngineMetrics blocks storage.Blocks + state protocol.State finalizedHeaderCache module.FinalizedHeaderCache core module.SyncCore responseSender ResponseSender @@ -67,6 +69,7 @@ func NewRequestHandler( metrics module.EngineMetrics, responseSender ResponseSender, me module.Local, + state protocol.State, finalizedHeaderCache *events.FinalizedHeaderCache, blocks storage.Blocks, core module.SyncCore, @@ -76,6 +79,7 @@ func NewRequestHandler( me: me, log: log.With().Str("engine", "synchronization").Logger(), metrics: metrics, + state: state, finalizedHeaderCache: finalizedHeaderCache, blocks: blocks, core: core, @@ -151,7 +155,11 @@ func (r *RequestHandler) setupRequestMessageHandler() { // we have a lower height, we add the difference to our own download queue. // No errors are expected during normal operation. func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncRequest) error { - finalizedHeader := r.finalizedHeaderCache.Get() + finalizedState := r.state.Final() + finalizedHeader, err := finalizedState.Head() + if err != nil { + return err + } logger := r.log.With().Str("origin_id", originID.String()).Logger() logger.Debug(). @@ -160,6 +168,7 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR Msg("received new sync request") if r.queueMissingHeights { + // TODO(8174) remove this step and only rely on certified sync responses? // queue any missing heights as needed r.core.HandleHeight(finalizedHeader, req.Height) } @@ -171,11 +180,17 @@ func (r *RequestHandler) onSyncRequest(originID flow.Identifier, req *flow.SyncR } // if we're sufficiently ahead of the requester, send a response + qcForFinalizedHeader, err := finalizedState.QuorumCertificate() + if err != nil { + return err + } res := &messages.SyncResponse{ - Height: finalizedHeader.Height, - Nonce: req.Nonce, + Nonce: req.Nonce, + Height: finalizedHeader.Height, + Header: *finalizedHeader, + CertifyingQC: *qcForFinalizedHeader, } - err := r.responseSender.SendResponse(res, originID) + err = r.responseSender.SendResponse(res, originID) if err != nil { logger.Warn().Err(err).Msg("sending sync response failed") return nil diff --git a/engine/common/synchronization/request_handler_engine.go b/engine/common/synchronization/request_handler_engine.go index 72321306466..6ae5ce119bd 100644 --- a/engine/common/synchronization/request_handler_engine.go +++ b/engine/common/synchronization/request_handler_engine.go @@ -92,6 +92,7 @@ func NewRequestHandlerEngine( metrics, NewResponseSender(con), me, + state, finalizedHeaderCache, blocks, core, diff --git a/model/flow/synchronization.go b/model/flow/synchronization.go index ff98d2937a5..e737e070e41 100644 --- a/model/flow/synchronization.go +++ b/model/flow/synchronization.go @@ -9,11 +9,13 @@ type SyncRequest struct { } // SyncResponse is part of the synchronization protocol and represents the reply -// to a synchronization request that contains the latest finalized block height -// of the responding node. +// to a synchronization request. It contains the latest finalized block height +// of the responding node, via the finalized header and QC certifying that header. type SyncResponse struct { - Nonce uint64 - Height uint64 + Nonce uint64 + Height uint64 // retained for backwards compatibility + Header Header + CertifyingQC QuorumCertificate } // BatchRequest is part of the synchronization protocol and represents an active diff --git a/state/cluster/badger/mutator_test.go b/state/cluster/badger/mutator_test.go index cbf68967be3..37291cab3a5 100644 --- a/state/cluster/badger/mutator_test.go +++ b/state/cluster/badger/mutator_test.go @@ -497,6 +497,10 @@ func (suite *MutatorSuite) TestExtend_WithOrphanedReferenceBlock() { // create a block extending genesis (conflicting with previous) which is finalized finalized := unittest.BlockWithParentProtocolState(suite.protoGenesis) finalized.Payload.Guarantees = nil + if finalized.View == orphaned.View { + // ensure we don't have two certified blocks in a single view + finalized.View = orphaned.View + 1 + } err = suite.protoState.ExtendCertified(context.Background(), unittest.NewCertifiedBlock(finalized)) suite.Require().NoError(err) err = suite.protoState.Finalize(context.Background(), finalized.ID()) diff --git a/state/cluster/badger/snapshot.go b/state/cluster/badger/snapshot.go index 6dd8dad653e..7a3066db622 100644 --- a/state/cluster/badger/snapshot.go +++ b/state/cluster/badger/snapshot.go @@ -68,6 +68,34 @@ func (s *Snapshot) Head() (*flow.Header, error) { return &head, err } +// QuorumCertificate returns a valid quorum certificate for the header at this snapshot, if one exists. +// +// Expected error returns during normal operations: +// - [storage.ErrNotFound] is returned if the QC is unknown. +func (s *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + // Implementation detail: QuorumCertificates storage / operation.RetrieveQuorumCertificate only indexes QCs + // for main consensus blocks, not cluster blocks, so we directly check for any children that would have a + // QC for the cluster header. + var pendingIDs flow.IdentifierList + err := operation.RetrieveBlockChildren(s.state.db.Reader(), s.blockID, &pendingIDs) + if err != nil { + // The low-level storage returns `storage.ErrNotFound` in two cases: + // 1. the block/collection is unknown + // 2. the block/collection is known but no children have been indexed yet + // By contract of the constructor, the blockID must correspond to a known collection in the database. + // A snapshot with s.err == nil is only created for known blocks. Hence, only case 2 is + // possible here (no children). + return nil, fmt.Errorf("could not get QC for finalized head %v: %w", s.blockID, err) + } + // at least one child exists (pendingIDs[0]) - guaranteed by RetrieveBlockChildren + var child flow.Header + err = operation.RetrieveHeader(s.state.db.Reader(), pendingIDs[0], &child) + if err != nil { + return nil, fmt.Errorf("could not retrieve header for unfinalized child %v: %w", pendingIDs[0], err) + } + return child.ParentQC(), nil +} + // Pending returns the IDs of all collections descending from the snapshot's head collection. // The result is ordered such that parents are included before their children. While only valid // descendants will be returned, note that the descendants may not be finalized yet. diff --git a/state/cluster/invalid/snapshot.go b/state/cluster/invalid/snapshot.go index 02ccb6503ae..51cf379aa8b 100644 --- a/state/cluster/invalid/snapshot.go +++ b/state/cluster/invalid/snapshot.go @@ -42,6 +42,10 @@ func (u *Snapshot) Head() (*flow.Header, error) { return nil, u.err } +func (u *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + return nil, u.err +} + func (u *Snapshot) Pending() ([]flow.Identifier, error) { return nil, u.err } diff --git a/state/cluster/mock/snapshot.go b/state/cluster/mock/snapshot.go index 08938de8e26..14197fedeb0 100644 --- a/state/cluster/mock/snapshot.go +++ b/state/cluster/mock/snapshot.go @@ -102,6 +102,36 @@ func (_m *Snapshot) Pending() ([]flow.Identifier, error) { return r0, r1 } +// QuorumCertificate provides a mock function with no fields +func (_m *Snapshot) QuorumCertificate() (*flow.QuorumCertificate, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for QuorumCertificate") + } + + var r0 *flow.QuorumCertificate + var r1 error + if rf, ok := ret.Get(0).(func() (*flow.QuorumCertificate, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() *flow.QuorumCertificate); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*flow.QuorumCertificate) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // NewSnapshot creates a new instance of Snapshot. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewSnapshot(t interface { diff --git a/state/cluster/snapshot.go b/state/cluster/snapshot.go index 32f5539f44e..2dc593cefb0 100644 --- a/state/cluster/snapshot.go +++ b/state/cluster/snapshot.go @@ -23,6 +23,13 @@ type Snapshot interface { // - If the snapshot is for an unknown collection [state.ErrUnknownSnapshotReference] Head() (*flow.Header, error) + // QuorumCertificate returns a valid quorum certificate for the header at this snapshot, if one exists. + // + // Expected error returns during normal operations: + // - [storage.ErrNotFound] is returned if the QC is unknown. + // - If the snapshot is for an unknown collection [state.ErrUnknownSnapshotReference] + QuorumCertificate() (*flow.QuorumCertificate, error) + // Pending returns the IDs of *all* collections descending from the snapshot's head collection. // The result is ordered such that parents are included before their children. While only valid // descendants will be returned, note that the descendants may not be finalized yet. diff --git a/storage/operation/qcs.go b/storage/operation/qcs.go index 00f1cabe7b8..2008e1839b8 100644 --- a/storage/operation/qcs.go +++ b/storage/operation/qcs.go @@ -23,7 +23,7 @@ import ( // inserting the QC after checking that no QC is already stored. // // Expected error returns: -// - [storage.ErrAlreadyExists] if any QuorumCertificate certifying the samn block already exists +// - [storage.ErrAlreadyExists] if any QuorumCertificate certifying the same block already exists func InsertQuorumCertificate(lctx lockctx.Proof, rw storage.ReaderBatchWriter, qc *flow.QuorumCertificate) error { if !lctx.HoldsLock(storage.LockInsertBlock) { return fmt.Errorf("cannot insert quorum certificate without holding lock %s", storage.LockInsertBlock)