Skip to content
Open
30 changes: 6 additions & 24 deletions engine/collection/synchronization/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,29 +186,6 @@ func (e *Engine) Done() <-chan struct{} {
return e.lm.Stopped()
}

// SubmitLocal submits an event originating on the local node.
func (e *Engine) SubmitLocal(event interface{}) {
err := e.ProcessLocal(event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
err := e.Process(channel, originID, event)
if err != nil {
e.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// ProcessLocal processes an event originating on the local node.
func (e *Engine) ProcessLocal(event interface{}) error {
return e.process(e.me.NodeID(), event)
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
Expand Down Expand Up @@ -287,7 +264,12 @@ func (e *Engine) onSyncResponse(_ flow.Identifier, res *flow.SyncResponse) {
e.log.Error().Err(err).Msg("could not get last finalized header")
return
}
e.core.HandleHeight(final, res.Height)
// backwards compatibility - ignore the Header/QC if they are not present, and use Height field instead
if res.Header.Height == 0 {
e.core.HandleHeight(final, res.Height)
} else {
e.core.HandleHeight(final, res.Header.Height)
}
}

// onBlockResponse processes a slice of requested block proposals.
Expand Down
38 changes: 24 additions & 14 deletions engine/collection/synchronization/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

"github.com/onflow/flow-go/engine"
mockcollection "github.com/onflow/flow-go/engine/collection/mock"
clustermodel "github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -41,6 +40,7 @@ type SyncSuite struct {
myID flow.Identifier
participants flow.IdentityList
head *flow.Header
qc *flow.QuorumCertificate
heights map[uint64]*clustermodel.Proposal
blockIDs map[flow.Identifier]*clustermodel.Proposal
net *mocknetwork.EngineRegistry
Expand All @@ -64,6 +64,8 @@ func (ss *SyncSuite) SetupTest() {
// generate a header for the final state
header := unittest.BlockHeaderFixture()
ss.head = header
// generate a QC certifying the header
ss.qc = unittest.CertifyBlock(ss.head)

// create maps to enable block returns
ss.heights = make(map[uint64]*clustermodel.Proposal)
Expand Down Expand Up @@ -115,6 +117,12 @@ func (ss *SyncSuite) SetupTest() {
},
nil,
)
ss.snapshot.On("QuorumCertificate").Return(
func() *flow.QuorumCertificate {
return ss.qc
},
nil,
)
ss.snapshot.On("Identities", mock.Anything).Return(
func(selector flow.IdentityFilter[flow.Identity]) flow.IdentityList {
return ss.participants.Filter(selector)
Expand Down Expand Up @@ -187,8 +195,10 @@ func (ss *SyncSuite) TestOnSyncRequest() {
ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.SyncResponse)
assert.Equal(ss.T(), ss.head.Height, res.Height, "response should contain head height")
assert.Equal(ss.T(), *ss.head, res.Header, "response should contain header")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
assert.Equal(ss.T(), *ss.qc, res.CertifyingQC, "response should contain QC")
assert.Equal(ss.T(), res.Header.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "should send response to original sender")
},
Expand All @@ -204,13 +214,15 @@ func (ss *SyncSuite) TestOnSyncRequest() {
func (ss *SyncSuite) TestOnSyncResponse() {
// generate origin ID and response message
originID := unittest.IdentifierFixture()
header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(rand.Uint64()))
res := &flow.SyncResponse{
Nonce: rand.Uint64(),
Height: rand.Uint64(),
Nonce: rand.Uint64(),
Header: *header,
CertifyingQC: *unittest.CertifyBlock(header),
}

// the height should be handled
ss.core.On("HandleHeight", ss.head, res.Height)
ss.core.On("HandleHeight", ss.head, res.Header.Height)
ss.e.onSyncResponse(originID, res)
ss.core.AssertExpectations(ss.T())
}
Expand Down Expand Up @@ -559,11 +571,13 @@ func (ss *SyncSuite) TestProcessingMultipleItems() {

originID := unittest.IdentifierFixture()
for i := 0; i < 5; i++ {
header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i)))
msg := &flow.SyncResponse{
Nonce: uint64(i),
Height: uint64(1000 + i),
Nonce: uint64(i),
Header: *header,
CertifyingQC: *unittest.CertifyBlock(header),
}
ss.core.On("HandleHeight", mock.Anything, msg.Height).Once()
ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once()
ss.metrics.On("MessageSent", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once()
ss.metrics.On("MessageReceived", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once()
ss.metrics.On("MessageHandled", metrics.EngineClusterSynchronization, metrics.MessageSyncResponse).Once()
Expand Down Expand Up @@ -593,18 +607,14 @@ func (ss *SyncSuite) TestProcessingMultipleItems() {
ss.metrics.AssertExpectations(ss.T())
}

// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type
// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type
// was submitted from network layer.
func (ss *SyncSuite) TestProcessUnsupportedMessageType() {
invalidEvent := uint64(42)
engines := []netint.Engine{ss.e, ss.e.requestHandler}
engines := []netint.MessageProcessor{ss.e, ss.e.requestHandler}
for _, e := range engines {
err := e.Process("ch", unittest.IdentifierFixture(), invalidEvent)
// shouldn't result in error since byzantine inputs are expected
require.NoError(ss.T(), err)
// in case of local processing error cannot be consumed since all inputs are trusted
err = e.ProcessLocal(invalidEvent)
require.Error(ss.T(), err)
require.True(ss.T(), engine.IsIncompatibleInputTypeError(err))
}
}
38 changes: 12 additions & 26 deletions engine/collection/synchronization/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,6 @@ func NewRequestHandlerEngine(
return r
}

// SubmitLocal submits an event originating on the local node.
func (r *RequestHandlerEngine) SubmitLocal(event interface{}) {
err := r.ProcessLocal(event)
if err != nil {
r.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// Submit submits the given event from the node with the given origin ID
// for processing in a non-blocking manner. It returns instantly and logs
// a potential processing error internally when done.
func (r *RequestHandlerEngine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) {
err := r.Process(channel, originID, event)
if err != nil {
r.log.Fatal().Err(err).Msg("internal error processing event")
}
}

// ProcessLocal processes an event originating on the local node.
func (r *RequestHandlerEngine) ProcessLocal(event interface{}) error {
return r.process(r.me.NodeID(), event)
}

// Process processes the given event from the node with the given origin ID in
// a blocking manner. It returns the potential processing error when done.
func (r *RequestHandlerEngine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error {
Expand Down Expand Up @@ -171,11 +148,13 @@ func (r *RequestHandlerEngine) setupRequestMessageHandler() {
// inform the other node of it, so they can organize their block downloads. If
// we have a lower height, we add the difference to our own download queue.
func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow.SyncRequest) error {
final, err := r.state.Final().Head()
finalizedState := r.state.Final()
final, err := finalizedState.Head()
if err != nil {
return fmt.Errorf("could not get last finalized header: %w", err)
}

// TODO(8173) remove this step and only rely on certified sync responses?
// queue any missing heights as needed
r.core.HandleHeight(final, req.Height)

Expand All @@ -185,10 +164,17 @@ func (r *RequestHandlerEngine) onSyncRequest(originID flow.Identifier, req *flow
return nil
}

qc, err := finalizedState.QuorumCertificate()
if err != nil {
return fmt.Errorf("could not get QC for last finalized header: %w", err)
}

// if we're sufficiently ahead of the requester, send a response
res := &messages.SyncResponse{
Height: final.Height,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change, and require all the peers to upgrade, right?

I wonder if we can keep the Height field to be backward compatible, so that this change can be rolled out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it would be breaking; I think we could keep the Height field for now (with the Height field and current code path being used if and only if the Header/QC is empty)

Nonce: req.Nonce,
Nonce: req.Nonce,
Height: final.Height,
Header: *final,
CertifyingQC: *qc,
}
err = r.con.Unicast(res, originID)
if err != nil {
Expand Down
9 changes: 7 additions & 2 deletions engine/common/synchronization/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func New(
return nil, fmt.Errorf("could not register engine: %w", err)
}
e.con = con
e.requestHandler = NewRequestHandler(log, metrics, NewResponseSender(con), me, finalizedHeaderCache, blocks, core, true)
e.requestHandler = NewRequestHandler(log, metrics, NewResponseSender(con), me, state, finalizedHeaderCache, blocks, core, true)

// set up worker routines
builder := component.NewComponentManagerBuilder().
Expand Down Expand Up @@ -296,7 +296,12 @@ func (e *Engine) processAvailableResponses(ctx context.Context) {
func (e *Engine) onSyncResponse(originID flow.Identifier, res *flow.SyncResponse) {
e.log.Debug().Str("origin_id", originID.String()).Msg("received sync response")
final := e.finalizedHeaderCache.Get()
e.core.HandleHeight(final, res.Height)
// backwards compatibility - ignore the Header/QC if they are not present, and use Height field instead
if res.Header.Height == 0 {
Copy link
Member

@zhangchiqing zhangchiqing Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you verify two test cases:

  1. if an old flow.SyncResponse message is set with a height H, after it's been encoded and decoded to the new flow.SyncResponse message, the res.Height is H, and the res.Header.Height is 0
  2. if a new flow.SyncResponse message is set with a height H, after it's been encoded and decoded as the old flow.SyncResponse message, the res.Height is H.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately:

  1. passes - old response is decoded to the new response successfully (with all the values as expected)
  2. fails - the new response fails to decode as the old response, because the Header and CertifyingQC it receives are unknown fields, and we use a config to enable that error for network messages:
    // DefaultDecMode is the DecMode used for decoding messages over the network.
    // It returns an error if the message contains any extra field not present in the
    // target (struct we are unmarshalling into), which prevents some classes of resource exhaustion attacks.
    var DefaultDecMode, _ = cbor.DecOptions{ExtraReturnErrors: cbor.ExtraDecErrorUnknownField}.DecMode()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So it means, new node can fetch blocks from old node, but old node can not fetch blocks from new node.

I think this is probably ok, and we can still roll it out. Because when rolling out, most of the old nodes are all up to date, they don't need to sync blocks from other peers, once an old node is restarted, it becomes new node, and can still fetch blocks from old nodes or other new nodes.

e.core.HandleHeight(final, res.Height)
} else {
e.core.HandleHeight(final, res.Header.Height)
}
}

// onBlockResponse processes a structurally validated block proposal containing a specifically requested block response.
Expand Down
9 changes: 9 additions & 0 deletions engine/common/synchronization/engine_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type SyncSuite struct {
myID flow.Identifier
participants flow.IdentityList
head *flow.Header
qc *flow.QuorumCertificate
heights map[uint64]*flow.Proposal
blockIDs map[flow.Identifier]*flow.Proposal
net *mocknetwork.EngineRegistry
Expand Down Expand Up @@ -63,6 +64,8 @@ func (ss *SyncSuite) SetupTest() {
// generate a header for the final state
header := unittest.BlockHeaderFixture()
ss.head = header
// generate a QC certifying the header
ss.qc = unittest.CertifyBlock(ss.head)

// create maps to enable block returns
ss.heights = make(map[uint64]*flow.Proposal)
Expand Down Expand Up @@ -113,6 +116,12 @@ func (ss *SyncSuite) SetupTest() {
},
nil,
)
ss.snapshot.On("QuorumCertificate").Return(
func() *flow.QuorumCertificate {
return ss.qc
},
nil,
)
ss.snapshot.On("Identities", mock.Anything).Return(
func(selector flow.IdentityFilter[flow.Identity]) flow.IdentityList {
return ss.participants.Filter(selector)
Expand Down
47 changes: 41 additions & 6 deletions engine/common/synchronization/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (ss *SyncSuite) TestOnSyncRequest_LowerThanReceiver_OutsideTolerance() {
ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.SyncResponse)
assert.Equal(ss.T(), ss.head.Height, res.Height, "response should contain head height")
assert.Equal(ss.T(), *ss.head, res.Header, "response should contain header")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
assert.Equal(ss.T(), *ss.qc, res.CertifyingQC, "response should contain QC")
assert.Equal(ss.T(), ss.head.ID(), res.CertifyingQC.BlockID, "response QC should correspond to response Header")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "should send response to original sender")
},
Expand All @@ -104,20 +106,51 @@ func (ss *SyncSuite) TestOnSyncResponse() {

height, err := rand.Uint64()
require.NoError(ss.T(), err, "should generate height")
header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(height))

// generate origin ID and response message
originID := unittest.IdentifierFixture()
res := &flow.SyncResponse{
Nonce: nonce,
Height: height,
Nonce: nonce,
Header: *header,
CertifyingQC: *unittest.CertifyBlock(header),
}

// the height should be handled
ss.core.On("HandleHeight", ss.head, res.Header.Height)
ss.e.onSyncResponse(originID, res)

// Backwards Compatibility - message with only Height should also be handled
res = &flow.SyncResponse{
Nonce: nonce + 1,
Height: header.Height + 1,
}
ss.core.On("HandleHeight", ss.head, res.Height)
ss.e.onSyncResponse(originID, res)
ss.core.AssertExpectations(ss.T())
}

func (ss *SyncSuite) TestInvalidSyncResponse() {
ss.T().Skip() // TODO(8174) - implement this test
nonce, err := rand.Uint64()
require.NoError(ss.T(), err, "should generate nonce")

height, err := rand.Uint64()
require.NoError(ss.T(), err, "should generate height")
header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(height))

// generate origin ID and response message
originID := unittest.IdentifierFixture()
res := &flow.SyncResponse{
Nonce: nonce,
Header: *header,
CertifyingQC: flow.QuorumCertificate{},
}
// TODO(8174): the response should be rejected and/or a violation should be logged
ss.e.onSyncResponse(originID, res)
ss.core.AssertExpectations(ss.T())
}

func (ss *SyncSuite) TestOnRangeRequest() {
nonce, err := rand.Uint64()
require.NoError(ss.T(), err, "should generate nonce")
Expand Down Expand Up @@ -466,11 +499,13 @@ func (ss *SyncSuite) TestProcessingMultipleItems() {

originID := unittest.IdentifierFixture()
for i := 0; i < 5; i++ {
header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(1000 + i)))
msg := &flow.SyncResponse{
Nonce: uint64(i),
Height: uint64(1000 + i),
Nonce: uint64(i),
Header: *header,
CertifyingQC: *unittest.CertifyBlock(header),
}
ss.core.On("HandleHeight", mock.Anything, msg.Height).Once()
ss.core.On("HandleHeight", mock.Anything, msg.Header.Height).Once()
ss.metrics.On("MessageSent", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once()
ss.metrics.On("MessageHandled", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once()
ss.metrics.On("MessageReceived", metrics.EngineSynchronization, metrics.MessageSyncResponse).Once()
Expand Down
Loading
Loading