From 12ee2bd040b35c4d745b2cd2cc27e8564d600ba3 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 12 Feb 2026 08:42:58 +0100 Subject: [PATCH 1/3] Re-use p2p-client --- block/components_test.go | 4 +- node/failover.go | 59 ++++++++-------- node/full.go | 27 ++++++-- node/helpers_test.go | 51 +++++++++----- node/light.go | 13 +--- node/light_test.go | 6 +- node/node.go | 15 ++-- node/single_sequencer_integration_test.go | 12 ++-- pkg/cmd/run_node.go | 10 ++- pkg/cmd/run_node_test.go | 7 +- pkg/p2p/client.go | 38 +++++++--- pkg/p2p/utils_test.go | 6 +- pkg/store/header_store_adapter_test.go | 84 +++++++++++++++++++++++ pkg/store/store_adapter.go | 29 ++++++-- pkg/sync/sync_service.go | 18 +++-- test/e2e/failover_e2e_test.go | 6 ++ 16 files changed, 280 insertions(+), 105 deletions(-) diff --git a/block/components_test.go b/block/components_test.go index 3d1a1f4a1e..8deca85e8e 100644 --- a/block/components_test.go +++ b/block/components_test.go @@ -176,7 +176,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) { zerolog.Nop(), NopMetrics(), DefaultBlockOptions(), - nil, + nil, // raftNode ) require.NoError(t, err) @@ -260,7 +260,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) { zerolog.Nop(), NopMetrics(), DefaultBlockOptions(), - nil, + nil, // raftNode ) require.NoError(t, err) diff --git a/node/failover.go b/node/failover.go index 787f627ce6..0dfcb54179 100644 --- a/node/failover.go +++ b/node/failover.go @@ -14,13 +14,11 @@ import ( "github.com/evstack/ev-node/pkg/config" genesispkg "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/p2p" - "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/raft" rpcserver "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" evsync "github.com/evstack/ev-node/pkg/sync" - ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -38,10 +36,7 @@ type failoverState struct { func newSyncMode( nodeConfig config.Config, - nodeKey *key.NodeKey, genesis genesispkg.Genesis, - rootDB ds.Batching, - daStore store.Store, exec coreexecutor.Executor, da block.DAClient, logger zerolog.Logger, @@ -49,6 +44,7 @@ func newSyncMode( blockMetrics *block.Metrics, nodeOpts NodeOptions, raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) { return block.NewSyncComponents( @@ -67,16 +63,13 @@ func newSyncMode( raftNode, ) } - return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode) + return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient) } func newAggregatorMode( nodeConfig config.Config, - nodeKey *key.NodeKey, signer signer.Signer, genesis genesispkg.Genesis, - rootDB ds.Batching, - daStore store.Store, exec coreexecutor.Executor, sequencer coresequencer.Sequencer, da block.DAClient, @@ -85,8 +78,8 @@ func newAggregatorMode( blockMetrics *block.Metrics, nodeOpts NodeOptions, raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { - blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) { return block.NewAggregatorComponents( nodeConfig, @@ -105,31 +98,24 @@ func newAggregatorMode( ) } - return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode) + return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient) } func setupFailoverState( nodeConfig config.Config, - nodeKey *key.NodeKey, - rootDB ds.Batching, - daStore store.Store, genesis genesispkg.Genesis, logger zerolog.Logger, rktStore store.Store, buildComponentsFn func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error), raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { - p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, rootDB, genesis.ChainID, logger, nil) - if err != nil { - return nil, err - } - - headerSyncService, err := evsync.NewHeaderSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger()) + headerSyncService, err := evsync.NewHeaderSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger()) if err != nil { return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } - dataSyncService, err := evsync.NewDataSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger()) + dataSyncService, err := evsync.NewDataSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger()) if err != nil { return nil, fmt.Errorf("error while initializing DataSyncService: %w", err) } @@ -203,19 +189,30 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { return nil }) - if err := f.p2pClient.Start(ctx); err != nil { - return fmt.Errorf("start p2p: %w", err) - } - defer f.p2pClient.Close() // nolint: errcheck + // P2P client persists across mode switches (started/closed by FullNode.Run). + // Reconfigure() was already called in setupFailoverState to re-bootstrap DHT. - if err := f.headerSyncService.Start(ctx); err != nil { - return fmt.Errorf("error while starting header sync service: %w", err) + // Start header and data sync services concurrently. Each service's + // initFromP2PWithRetry can block up to 30s when peers have no blocks + // (e.g. lazy mode sequencer at height 0). Running them in parallel + // avoids a 60s cumulative startup delay. + syncWg, syncCtx := errgroup.WithContext(ctx) + syncWg.Go(func() error { + if err := f.headerSyncService.Start(syncCtx); err != nil { + return fmt.Errorf("error while starting header sync service: %w", err) + } + return nil + }) + syncWg.Go(func() error { + if err := f.dataSyncService.Start(syncCtx); err != nil { + return fmt.Errorf("error while starting data sync service: %w", err) + } + return nil + }) + if err := syncWg.Wait(); err != nil { + return err } defer stopService(f.headerSyncService.Stop, "header sync") - - if err := f.dataSyncService.Start(ctx); err != nil { - return fmt.Errorf("error while starting data sync service: %w", err) - } defer stopService(f.dataSyncService.Stop, "data sync") wg.Go(func() error { diff --git a/node/full.go b/node/full.go index 4fa2ff7c52..42e4b6349f 100644 --- a/node/full.go +++ b/node/full.go @@ -22,7 +22,7 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" genesispkg "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" raftpkg "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/signer" @@ -53,7 +53,8 @@ type FullNode struct { nodeConfig config.Config - daClient block.DAClient + daClient block.DAClient + p2pClient *p2p.Client Store store.Store raftNode *raftpkg.Node @@ -66,7 +67,7 @@ type FullNode struct { // newFullNode creates a new Rollkit full node. func newFullNode( nodeConfig config.Config, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, signer signer.Signer, genesis genesispkg.Genesis, database ds.Batching, @@ -103,16 +104,17 @@ func newFullNode( } } + // The p2p client is fully configured and started before leader election. + // SyncService.getPeerIDs() gates peer usage on conf.Node.Aggregator. leaderFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting aggregator-MODE") nodeConfig.Node.Aggregator = true - nodeConfig.P2P.Peers = "" // peers are not supported in aggregator mode - return newAggregatorMode(nodeConfig, nodeKey, signer, genesis, database, evstore, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode) + return newAggregatorMode(nodeConfig, signer, genesis, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } followerFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting sync-MODE") nodeConfig.Node.Aggregator = false - return newSyncMode(nodeConfig, nodeKey, genesis, database, evstore, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode) + return newSyncMode(nodeConfig, genesis, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } // Initialize raft node if enabled (for both aggregator and sync nodes) @@ -136,6 +138,7 @@ func newFullNode( genesis: genesis, nodeConfig: nodeConfig, daClient: daClient, + p2pClient: p2pClient, Store: evstore, leaderElection: leaderElection, raftNode: raftNode, @@ -279,6 +282,18 @@ func (n *FullNode) Run(parentCtx context.Context) error { (n.nodeConfig.Instrumentation.IsPrometheusEnabled() || n.nodeConfig.Instrumentation.IsPprofEnabled()) { n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer() } + + // Start the P2P client once. It persists across mode switches so that + // the host and PubSub (including externally registered topics) survive. + if err := n.p2pClient.Start(ctx); err != nil { + return fmt.Errorf("start p2p: %w", err) + } + defer func() { + if err := n.p2pClient.Close(); err != nil { + n.Logger.Error().Err(err).Msg("error closing p2p client") + } + }() + // Start leader election if n.raftNode != nil { if err := n.raftNode.Start(ctx); err != nil { diff --git a/node/helpers_test.go b/node/helpers_test.go index 94989d2f7b..2127ef9aaf 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -16,12 +16,13 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/test/testda" "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" "github.com/stretchr/testify/require" evconfig "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" remote_signer "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -67,22 +68,18 @@ func newDummyDAClient(maxBlobSize uint64) *testda.DummyDA { return getSharedDummyDA(maxBlobSize) } -func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, *key.NodeKey, datastore.Batching, func()) { +func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, crypto.PrivKey, datastore.Batching, func()) { executor := coreexecutor.NewDummyExecutor() sequencer := coresequencer.NewDummySequencer() daClient := newDummyDAClient(0) // Create genesis and keys for P2P client _, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain") - p2pKey := &key.NodeKey{ - PrivKey: genesisValidatorKey, - PubKey: genesisValidatorKey.GetPublic(), - } ds, err := store.NewTestInMemoryKVStore() require.NoError(t, err) stop := daClient.StartHeightTicker(config.DA.BlockTime.Duration) - return executor, sequencer, daClient, p2pKey, ds, stop + return executor, sequencer, daClient, genesisValidatorKey, ds, stop } func getTestConfig(t *testing.T, n int) evconfig.Config { @@ -120,7 +117,7 @@ func newTestNode( executor coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, - nodeKey *key.NodeKey, + privKey crypto.PrivKey, ds datastore.Batching, stopDAHeightTicker func(), ) (*FullNode, func()) { @@ -133,13 +130,17 @@ func newTestNode( if testing.Verbose() { logger = zerolog.New(zerolog.NewTestWriter(t)) } + + p2pClient, err := newTestP2PClient(config, privKey, ds, genesis.ChainID, logger) + require.NoError(t, err) + node, err := NewNode( config, executor, sequencer, daClient, remoteSigner, - nodeKey, + p2pClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -159,8 +160,8 @@ func newTestNode( func createNodeWithCleanup(t *testing.T, config evconfig.Config) (*FullNode, func()) { resetSharedDummyDA() - executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker := createTestComponents(t, config) - return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker) + executor, sequencer, daClient, privKey, ds, stopDAHeightTicker := createTestComponents(t, config) + return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker) } func createNodeWithCustomComponents( @@ -169,11 +170,11 @@ func createNodeWithCustomComponents( executor coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, - nodeKey *key.NodeKey, + privKey crypto.PrivKey, ds datastore.Batching, stopDAHeightTicker func(), ) (*FullNode, func()) { - return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker) + return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker) } // Creates the given number of nodes the given nodes using the given wait group to synchronize them @@ -192,24 +193,28 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F aggListenAddress := config.P2P.ListenAddress aggPeers := config.P2P.Peers - executor, sequencer, daClient, aggP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, daClient, aggPrivKey, ds, stopDAHeightTicker := createTestComponents(t, config) if d, ok := daClient.(*testda.DummyDA); ok { d.Reset() } - aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey) + aggPeerID, err := peer.IDFromPrivateKey(aggPrivKey) require.NoError(err) logger := zerolog.Nop() if testing.Verbose() { logger = zerolog.New(zerolog.NewTestWriter(t)) } + + aggP2PClient, err := newTestP2PClient(config, aggPrivKey, ds, genesis.ChainID, logger) + require.NoError(err) + aggNode, err := NewNode( config, executor, sequencer, daClient, remoteSigner, - aggP2PKey, + aggP2PClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -236,16 +241,19 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F } config.P2P.ListenAddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i) config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8001+i) - executor, sequencer, daClient, nodeP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, daClient, nodePrivKey, ds, stopDAHeightTicker := createTestComponents(t, config) stopDAHeightTicker() + nodeP2PClient, err := newTestP2PClient(config, nodePrivKey, ds, genesis.ChainID, logger) + require.NoError(err) + node, err := NewNode( config, executor, sequencer, daClient, nil, - nodeP2PKey, + nodeP2PClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -257,7 +265,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F // No-op: ticker already stopped } nodes[i], cleanups[i] = node.(*FullNode), cleanup - nodePeerID, err := peer.IDFromPrivateKey(nodeP2PKey.PrivKey) + nodePeerID, err := peer.IDFromPrivateKey(nodePrivKey) require.NoError(err) peersList = append(peersList, fmt.Sprintf("%s/p2p/%s", config.P2P.ListenAddress, nodePeerID.Loggable()["peerID"].(string))) } @@ -265,6 +273,11 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F return nodes, cleanups } +// newTestP2PClient creates a p2p.Client for testing. +func newTestP2PClient(config evconfig.Config, privKey crypto.PrivKey, ds datastore.Batching, chainID string, logger zerolog.Logger) (*p2p.Client, error) { + return p2p.NewClient(config.P2P, privKey, ds, chainID, logger, nil) +} + // Helper to create N contexts and cancel functions func createNodeContexts(n int) ([]context.Context, []context.CancelFunc) { ctxs := make([]context.Context, n) diff --git a/node/light.go b/node/light.go index 8790507a07..7aeb9038b6 100644 --- a/node/light.go +++ b/node/light.go @@ -7,7 +7,6 @@ import ( "net/http" "time" - "github.com/evstack/ev-node/pkg/p2p/key" ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -39,15 +38,10 @@ type LightNode struct { func newLightNode( conf config.Config, genesis genesis.Genesis, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, database ds.Batching, logger zerolog.Logger, ) (ln *LightNode, err error) { - p2pClient, err := p2p.NewClient(conf.P2P, nodeKey.PrivKey, database, genesis.ChainID, logger, nil) - if err != nil { - return nil, err - } - componentLogger := logger.With().Str("component", "HeaderSyncService").Logger() baseStore := store.New(database) @@ -166,9 +160,8 @@ func (ln *LightNode) Run(parentCtx context.Context) error { } // Stop P2P Client - err = ln.P2P.Close() - if err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing P2P client: %w", err)) + if err := ln.P2P.Close(); err != nil { + multiErr = errors.Join(multiErr, fmt.Errorf("closing p2p client: %w", err)) } if err = ln.Store.Close(); err != nil { diff --git a/node/light_test.go b/node/light_test.go index f459ad54b0..964cf66041 100644 --- a/node/light_test.go +++ b/node/light_test.go @@ -13,6 +13,7 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" p2p_key "github.com/evstack/ev-node/pkg/p2p/key" ) @@ -39,7 +40,10 @@ func TestLightNodeLifecycle(t *testing.T) { logger := zerolog.Nop() db := ds_sync.MutexWrap(ds.NewMapDatastore()) - ln, err := newLightNode(conf, gen, p2pKey, db, logger) + p2pClient, err := p2p.NewClient(conf.P2P, p2pKey.PrivKey, db, gen.ChainID, logger, nil) + require.NoError(err) + + ln, err := newLightNode(conf, gen, p2pClient, db, logger) require.NoError(err) require.NotNil(ln) diff --git a/node/node.go b/node/node.go index 7ce087eeeb..139636d644 100644 --- a/node/node.go +++ b/node/node.go @@ -9,7 +9,7 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/signer" ) @@ -25,16 +25,19 @@ type NodeOptions struct { BlockOptions block.BlockOptions } -// NewNode returns a new Full or Light Node based on the config -// This is the entry point for composing a node, when compiling a node, you need to provide an executor +// NewNode returns a new Full or Light Node based on the config. +// This is the entry point for composing a node, when compiling a node, you need to provide an executor. // Example executors can be found in apps/ +// +// The p2pClient owns the node identity (private key) and is shared across +// mode switches. It supports in-place reconfiguration via Reconfigure(). func NewNode( conf config.Config, exec coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, signer signer.Signer, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, genesis genesis.Genesis, database ds.Batching, metricsProvider MetricsProvider, @@ -42,7 +45,7 @@ func NewNode( nodeOptions NodeOptions, ) (Node, error) { if conf.Node.Light { - return newLightNode(conf, genesis, nodeKey, database, logger) + return newLightNode(conf, genesis, p2pClient, database, logger) } if err := nodeOptions.BlockOptions.Validate(); err != nil { @@ -51,7 +54,7 @@ func NewNode( return newFullNode( conf, - nodeKey, + p2pClient, signer, genesis, database, diff --git a/node/single_sequencer_integration_test.go b/node/single_sequencer_integration_test.go index 0a798afa6c..130b62e235 100644 --- a/node/single_sequencer_integration_test.go +++ b/node/single_sequencer_integration_test.go @@ -234,10 +234,10 @@ func TestStateRecovery(t *testing.T) { // Set up one sequencer config := getTestConfig(t, 1) - executor, sequencer, dac, nodeKey, _, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, dac, privKey, _, stopDAHeightTicker := createTestComponents(t, config) ds, err := store.NewDefaultKVStore(config.RootDir, "db", "test") require.NoError(err) - node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dac, nodeKey, ds, stopDAHeightTicker) + node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dac, privKey, ds, stopDAHeightTicker) defer cleanup() var runningWg sync.WaitGroup @@ -262,10 +262,10 @@ func TestStateRecovery(t *testing.T) { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 60*time.Second) // Create a new node instance using the same components - executor, sequencer, dac, nodeKey, _, stopDAHeightTicker = createTestComponents(t, config) + executor, sequencer, dac, privKey, _, stopDAHeightTicker = createTestComponents(t, config) ds, err = store.NewDefaultKVStore(config.RootDir, "db", "test") require.NoError(err) - node, cleanup = createNodeWithCustomComponents(t, config, executor, sequencer, dac, nodeKey, ds, stopDAHeightTicker) + node, cleanup = createNodeWithCustomComponents(t, config, executor, sequencer, dac, privKey, ds, stopDAHeightTicker) defer cleanup() // Verify state persistence @@ -319,7 +319,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { config.DA.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} // Longer DA time to ensure blocks are produced first // Create test components - executor, sequencer, dummyDA, ds, nodeKey, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, dummyDA, privKey, ds, stopDAHeightTicker := createTestComponents(t, config) defer stopDAHeightTicker() // Cast executor to DummyExecutor so we can inject transactions @@ -331,7 +331,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { require.True(ok, "Expected testda.DummyDA implementation") // Create node with components - node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dummyDAImpl, ds, nodeKey, func() {}) + node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dummyDAImpl, privKey, ds, func() {}) defer cleanup() ctx, cancel := context.WithCancel(t.Context()) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index aa6a01b7ae..df96a0e371 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -23,6 +23,7 @@ import ( rollconf "github.com/evstack/ev-node/pkg/config" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" genesispkg "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/file" @@ -163,6 +164,13 @@ func StartNode( executor = telemetry.WithTracingExecutor(executor) } + // Create the P2P client. It is long-lived and reconfigured in-place + // on mode switches, avoiding costly teardown of the libp2p stack. + p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, datastore, genesis.ChainID, logger, nil) + if err != nil { + return fmt.Errorf("create p2p client: %w", err) + } + // Create and start the node rollnode, err := node.NewNode( nodeConfig, @@ -170,7 +178,7 @@ func StartNode( sequencer, daClient, signer, - nodeKey, + p2pClient, genesis, datastore, metrics, diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index 2e58c1c637..39ff4261b6 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -20,6 +20,7 @@ import ( "github.com/rs/zerolog" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executor, coresequencer.Sequencer, signer.Signer, *key.NodeKey, datastore.Batching, func()) { @@ -33,7 +34,11 @@ func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executo // Create a dummy P2P client and datastore for testing ds := datastore.NewMapDatastore() - return executor, sequencer, keyProvider, nil, ds, func() {} + // Generate a dummy node key for the P2P client + nodeKey, err := key.GenerateNodeKey() + require.NoError(t, err) + + return executor, sequencer, keyProvider, nodeKey, ds, func() {} } func TestParseFlags(t *testing.T) { diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index a1c8de94ba..7859c3cf97 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -49,11 +49,13 @@ type Client struct { chainID string privKey crypto.PrivKey - host host.Host - dht *dht.IpfsDHT - disc *discovery.RoutingDiscovery - gater *conngater.BasicConnectionGater - ps *pubsub.PubSub + rawHost host.Host // unwrapped libp2p host, stored to avoid double-wrapping via routedhost + host host.Host // may be wrapped with routedhost after DHT setup + dht *dht.IpfsDHT + disc *discovery.RoutingDiscovery + gater *conngater.BasicConnectionGater + ps *pubsub.PubSub + started bool metrics *Metrics } @@ -121,9 +123,13 @@ func NewClientWithHost( // 3. Setup DHT, establish connection to seed nodes and initialize peer discovery. // 4. Use active peer discovery to look for peers from same ORU network. func (c *Client) Start(ctx context.Context) error { + if c.started { + return nil // already started — called from FullNode.Run() + } c.logger.Debug().Msg("starting P2P client") if c.host != nil { + c.rawHost = c.host return c.startWithHost(ctx, c.host) } @@ -131,6 +137,7 @@ func (c *Client) Start(ctx context.Context) error { if err != nil { return err } + c.rawHost = h return c.startWithHost(ctx, h) } @@ -165,19 +172,32 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } + c.started = true return nil } // Close gently stops Client. func (c *Client) Close() error { - var dhtErr, hostErr error + var err error if c.dht != nil { - dhtErr = c.dht.Close() + err = errors.Join(err, c.dht.Close()) } if c.host != nil { - hostErr = c.host.Close() + err = errors.Join(err, c.host.Close()) } - return errors.Join(dhtErr, hostErr) + return err +} + +// PrivKey returns the node's private key. +func (c *Client) PrivKey() crypto.PrivKey { + return c.privKey +} + +// Reconfigure updates the mutable P2P configuration without tearing down +// the libp2p host, PubSub, or DHT. Currently this only updates the +// stored config; the sync service gates peer usage on conf.Node.Aggregator. +func (c *Client) Reconfigure(conf config.P2PConfig) { + c.conf = conf } // Addrs returns listen addresses of Client. diff --git a/pkg/p2p/utils_test.go b/pkg/p2p/utils_test.go index 5bd5664dd2..e14a621233 100644 --- a/pkg/p2p/utils_test.go +++ b/pkg/p2p/utils_test.go @@ -3,7 +3,6 @@ package p2p import ( "context" "crypto/rand" - "errors" "fmt" "net" "path/filepath" @@ -25,11 +24,10 @@ import ( type testNet []*Client -func (tn testNet) Close() (err error) { +func (tn testNet) Close() { for i := range tn { - err = errors.Join(err, tn[i].Close()) + _ = tn[i].Close() } - return } func (tn testNet) WaitForDHT() { diff --git a/pkg/store/header_store_adapter_test.go b/pkg/store/header_store_adapter_test.go index 33f804c676..9cb02c809e 100644 --- a/pkg/store/header_store_adapter_test.go +++ b/pkg/store/header_store_adapter_test.go @@ -615,3 +615,87 @@ func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) { require.NoError(t, err) assert.Equal(t, h1.Height(), retrieved.Height()) } + +// TestHeaderStoreGetter_HeightGuard verifies that HeaderStoreGetter.GetByHeight +// and HasAt respect the committed store height. Data written to the datastore +// without updating store.Height() (like the executor's crash-recovery early save) +// must NOT be visible through the getter. +func TestHeaderStoreGetter_HeightGuard(t *testing.T) { + t.Parallel() + ctx := t.Context() + + ds, err := NewTestInMemoryKVStore() + require.NoError(t, err) + store := New(ds) + getter := NewHeaderStoreGetter(store) + + h1, d1 := types.GetRandomBlock(1, 2, "test-chain") + h2, d2 := types.GetRandomBlock(2, 2, "test-chain") + + specs := map[string]struct { + setup func() + height uint64 + expFound bool + expHasAt bool + }{ + "data at height without height update is invisible": { + setup: func() { + // Simulate the executor's early save: write data but do NOT call SetHeight. + batch, bErr := store.NewBatch(ctx) + require.NoError(t, bErr) + require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) + require.NoError(t, batch.Commit()) + }, + height: 1, + expFound: false, + expHasAt: false, + }, + "data becomes visible after height is updated": { + setup: func() { + // Now commit the signed version with SetHeight (the final save). + batch, bErr := store.NewBatch(ctx) + require.NoError(t, bErr) + require.NoError(t, batch.SaveBlockData(h1, d1, &h1.Signature)) + require.NoError(t, batch.SetHeight(1)) + require.NoError(t, batch.Commit()) + }, + height: 1, + expFound: true, + expHasAt: true, + }, + "height above committed store height is invisible": { + setup: func() { + // Save h2 data but only set height to 1. + batch, bErr := store.NewBatch(ctx) + require.NoError(t, bErr) + require.NoError(t, batch.SaveBlockData(h2, d2, &types.Signature{})) + require.NoError(t, batch.Commit()) + }, + height: 2, + expFound: false, + expHasAt: false, + }, + } + + // Run in defined order since each step builds on the previous state. + for _, name := range []string{ + "data at height without height update is invisible", + "data becomes visible after height is updated", + "height above committed store height is invisible", + } { + spec := specs[name] + t.Run(name, func(t *testing.T) { + spec.setup() + + got, err := getter.GetByHeight(ctx, spec.height) + if spec.expFound { + require.NoError(t, err) + assert.Equal(t, spec.height, got.Height()) + } else { + require.Error(t, err) + } + + assert.Equal(t, spec.expHasAt, getter.HasAt(ctx, spec.height)) + }) + } +} diff --git a/pkg/store/store_adapter.go b/pkg/store/store_adapter.go index 5b21ce1613..912d49c2d9 100644 --- a/pkg/store/store_adapter.go +++ b/pkg/store/store_adapter.go @@ -758,7 +758,15 @@ func NewHeaderStoreGetter(store Store) *HeaderStoreGetter { // GetByHeight implements StoreGetter. func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PSignedHeader, error) { - header, err := g.store.GetHeader(ctx, height) + // Guard: only return headers at or below the committed store height. + // The executor's early save writes an unsigned header to the datastore + // before updating store.Height(), so without this check the P2P layer + // could serve an unsigned header to peers. + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return nil, header.ErrNotFound + } + hdr, err := g.store.GetHeader(ctx, height) if err != nil { return nil, err } @@ -766,7 +774,7 @@ func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*ty daHint, _ := g.GetDAHint(ctx, height) return &types.P2PSignedHeader{ - SignedHeader: header, + SignedHeader: hdr, DAHeightHint: daHint, }, nil } @@ -812,7 +820,11 @@ func (g *HeaderStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. func (g *HeaderStoreGetter) HasAt(ctx context.Context, height uint64) bool { - _, err := g.store.GetHeader(ctx, height) + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return false + } + _, err = g.store.GetHeader(ctx, height) return err == nil } @@ -828,6 +840,11 @@ func NewDataStoreGetter(store Store) *DataStoreGetter { // GetByHeight implements StoreGetter. func (g *DataStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PData, error) { + // Guard: only return data at or below the committed store height. + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return nil, header.ErrNotFound + } _, data, err := g.store.GetBlockData(ctx, height) if err != nil { return nil, err @@ -882,7 +899,11 @@ func (g *DataStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. func (g *DataStoreGetter) HasAt(ctx context.Context, height uint64) bool { - _, _, err := g.store.GetBlockData(ctx, height) + storeHeight, err := g.store.Height(ctx) + if err != nil || height > storeHeight { + return false + } + _, _, err = g.store.GetBlockData(ctx, height) return err == nil } diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 8567e79764..b65b855a4b 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -20,7 +20,6 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) @@ -38,6 +37,15 @@ type HeaderSyncService = SyncService[*types.P2PSignedHeader] // DataSyncService is the P2P Sync Service for blocks. type DataSyncService = SyncService[*types.P2PData] +// P2PClient defines the interface for P2P client operations needed by the sync service. +type P2PClient interface { + PubSub() *pubsub.PubSub + Info() (string, string, string, error) + Host() host.Host + ConnectionGater() *conngater.BasicConnectionGater + PeerIDs() []peer.ID +} + // SyncService is the P2P Sync Service for blocks and headers. // // Uses the go-header library for handling all P2P logic. @@ -48,7 +56,7 @@ type SyncService[H store.EntityWithDAHint[H]] struct { genesis genesis.Genesis - p2p *p2p.Client + p2p P2PClient ex *goheaderp2p.Exchange[H] sub *goheaderp2p.Subscriber[H] @@ -66,7 +74,7 @@ func NewDataSyncService( evStore store.Store, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*DataSyncService, error) { storeAdapter := store.NewDataStoreAdapter(evStore, genesis) @@ -78,7 +86,7 @@ func NewHeaderSyncService( evStore store.Store, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*HeaderSyncService, error) { storeAdapter := store.NewHeaderStoreAdapter(evStore, genesis) @@ -90,7 +98,7 @@ func newSyncService[H store.EntityWithDAHint[H]]( syncType syncType, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*SyncService[H], error) { if p2p == nil { diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 580f3141ce..cc0276e43a 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -384,6 +384,12 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { nodeJWT := getNodeJWT(nodeName) p2pPeers := getP2PPeers(nodeName) + // Kill old process just in case + if nodeDetails.IsRunning() { + _ = nodeDetails.Kill() + time.Sleep(200 * time.Millisecond) + } + restartedProc := setupRaftSequencerNode(t, sut, workDir, nodeName, nodeDetails.raftAddr, nodeJWT, genesisHash, testEndpoints.GetDAAddress(), "", raftCluster, p2pPeers, strings.TrimPrefix(nodeDetails.rpcAddr, "http://"), nodeDetails.p2pAddr, From c6cdb8acdf02d84e849ff74c16f3a54146204ffa Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 12 Feb 2026 15:00:55 +0100 Subject: [PATCH 2/3] x --- block/internal/syncing/syncer.go | 15 ++++- pkg/store/header_store_adapter_test.go | 84 -------------------------- pkg/store/store_adapter.go | 29 ++------- test/e2e/evm_full_node_e2e_test.go | 2 +- test/e2e/evm_test_common.go | 2 + 5 files changed, 20 insertions(+), 112 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 939abb0389..1548b07a55 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -722,7 +722,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve currentState := s.getLastState() headerHash := header.Hash().String() - s.logger.Info().Uint64("height", nextHeight).Msg("syncing block") + s.logger.Info().Uint64("height", nextHeight).Msg("syncing block started") // Compared to the executor logic where the current block needs to be applied first, // here only the previous block needs to be applied to proceed to the verification. @@ -732,6 +732,17 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve s.cache.RemoveHeaderDAIncluded(headerHash) s.cache.RemoveDataDAIncluded(data.DACommitment().String()) + s.logger.Warn(). + Err(err). + Uint64("height", header.Height()). + Uint64("time", uint64(header.Time().Unix())). + Hex("proposer", header.ProposerAddress). + Str("data_hash", hex.EncodeToString(header.DataHash)). + Str("app_hash", hex.EncodeToString(header.AppHash)). + Hex("last_header_hash", header.LastHeaderHash). + Int("len signature", len(header.Signature)). + Msg("block validation failed") + if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } @@ -799,7 +810,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve if s.p2pHandler != nil { s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight) } - + s.logger.Info().Uint64("height", nextHeight).Msg("syncing block completed") return nil } diff --git a/pkg/store/header_store_adapter_test.go b/pkg/store/header_store_adapter_test.go index 9cb02c809e..33f804c676 100644 --- a/pkg/store/header_store_adapter_test.go +++ b/pkg/store/header_store_adapter_test.go @@ -615,87 +615,3 @@ func TestHeaderStoreAdapter_GetFromPendingByHash(t *testing.T) { require.NoError(t, err) assert.Equal(t, h1.Height(), retrieved.Height()) } - -// TestHeaderStoreGetter_HeightGuard verifies that HeaderStoreGetter.GetByHeight -// and HasAt respect the committed store height. Data written to the datastore -// without updating store.Height() (like the executor's crash-recovery early save) -// must NOT be visible through the getter. -func TestHeaderStoreGetter_HeightGuard(t *testing.T) { - t.Parallel() - ctx := t.Context() - - ds, err := NewTestInMemoryKVStore() - require.NoError(t, err) - store := New(ds) - getter := NewHeaderStoreGetter(store) - - h1, d1 := types.GetRandomBlock(1, 2, "test-chain") - h2, d2 := types.GetRandomBlock(2, 2, "test-chain") - - specs := map[string]struct { - setup func() - height uint64 - expFound bool - expHasAt bool - }{ - "data at height without height update is invisible": { - setup: func() { - // Simulate the executor's early save: write data but do NOT call SetHeight. - batch, bErr := store.NewBatch(ctx) - require.NoError(t, bErr) - require.NoError(t, batch.SaveBlockData(h1, d1, &types.Signature{})) - require.NoError(t, batch.Commit()) - }, - height: 1, - expFound: false, - expHasAt: false, - }, - "data becomes visible after height is updated": { - setup: func() { - // Now commit the signed version with SetHeight (the final save). - batch, bErr := store.NewBatch(ctx) - require.NoError(t, bErr) - require.NoError(t, batch.SaveBlockData(h1, d1, &h1.Signature)) - require.NoError(t, batch.SetHeight(1)) - require.NoError(t, batch.Commit()) - }, - height: 1, - expFound: true, - expHasAt: true, - }, - "height above committed store height is invisible": { - setup: func() { - // Save h2 data but only set height to 1. - batch, bErr := store.NewBatch(ctx) - require.NoError(t, bErr) - require.NoError(t, batch.SaveBlockData(h2, d2, &types.Signature{})) - require.NoError(t, batch.Commit()) - }, - height: 2, - expFound: false, - expHasAt: false, - }, - } - - // Run in defined order since each step builds on the previous state. - for _, name := range []string{ - "data at height without height update is invisible", - "data becomes visible after height is updated", - "height above committed store height is invisible", - } { - spec := specs[name] - t.Run(name, func(t *testing.T) { - spec.setup() - - got, err := getter.GetByHeight(ctx, spec.height) - if spec.expFound { - require.NoError(t, err) - assert.Equal(t, spec.height, got.Height()) - } else { - require.Error(t, err) - } - - assert.Equal(t, spec.expHasAt, getter.HasAt(ctx, spec.height)) - }) - } -} diff --git a/pkg/store/store_adapter.go b/pkg/store/store_adapter.go index 912d49c2d9..5b21ce1613 100644 --- a/pkg/store/store_adapter.go +++ b/pkg/store/store_adapter.go @@ -758,15 +758,7 @@ func NewHeaderStoreGetter(store Store) *HeaderStoreGetter { // GetByHeight implements StoreGetter. func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PSignedHeader, error) { - // Guard: only return headers at or below the committed store height. - // The executor's early save writes an unsigned header to the datastore - // before updating store.Height(), so without this check the P2P layer - // could serve an unsigned header to peers. - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return nil, header.ErrNotFound - } - hdr, err := g.store.GetHeader(ctx, height) + header, err := g.store.GetHeader(ctx, height) if err != nil { return nil, err } @@ -774,7 +766,7 @@ func (g *HeaderStoreGetter) GetByHeight(ctx context.Context, height uint64) (*ty daHint, _ := g.GetDAHint(ctx, height) return &types.P2PSignedHeader{ - SignedHeader: hdr, + SignedHeader: header, DAHeightHint: daHint, }, nil } @@ -820,11 +812,7 @@ func (g *HeaderStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. func (g *HeaderStoreGetter) HasAt(ctx context.Context, height uint64) bool { - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return false - } - _, err = g.store.GetHeader(ctx, height) + _, err := g.store.GetHeader(ctx, height) return err == nil } @@ -840,11 +828,6 @@ func NewDataStoreGetter(store Store) *DataStoreGetter { // GetByHeight implements StoreGetter. func (g *DataStoreGetter) GetByHeight(ctx context.Context, height uint64) (*types.P2PData, error) { - // Guard: only return data at or below the committed store height. - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return nil, header.ErrNotFound - } _, data, err := g.store.GetBlockData(ctx, height) if err != nil { return nil, err @@ -899,11 +882,7 @@ func (g *DataStoreGetter) Height(ctx context.Context) (uint64, error) { // HasAt implements StoreGetter. func (g *DataStoreGetter) HasAt(ctx context.Context, height uint64) bool { - storeHeight, err := g.store.Height(ctx) - if err != nil || height > storeHeight { - return false - } - _, _, err = g.store.GetBlockData(ctx, height) + _, _, err := g.store.GetBlockData(ctx, height) return err == nil } diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 9e302959d2..e1981dbc1b 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -146,7 +146,7 @@ func verifyTransactionSync(t *testing.T, sequencerClient, fullNodeClient *ethcli } } return false - }, 60*time.Second, 500*time.Millisecond, "Full node should sync the block containing the transaction") + }, 3*time.Minute, 500*time.Millisecond, "Full node should sync the block containing the transaction") // Final verification - both nodes should have the transaction in the same block sequencerReceipt, err := sequencerClient.TransactionReceipt(ctx, txHash) diff --git a/test/e2e/evm_test_common.go b/test/e2e/evm_test_common.go index 5dda0421bd..d5a7215168 100644 --- a/test/e2e/evm_test_common.go +++ b/test/e2e/evm_test_common.go @@ -378,6 +378,7 @@ func setupSequencerNodeLazy(t *testing.T, sut *SystemUnderTest, sequencerHome, j // Use helper methods to get complete URLs args := []string{ "start", + "--evnode.log.level", "debug", "--evnode.log.format", "json", "--evm.jwt-secret-file", jwtSecretFile, "--evm.genesis-hash", genesisHash, @@ -440,6 +441,7 @@ func setupFullNode(t *testing.T, sut *SystemUnderTest, fullNodeHome, sequencerHo // Use helper methods to get complete URLs args := []string{ "start", + "--evnode.log.level", "debug", "--evnode.log.format", "json", "--home", fullNodeHome, "--evm.jwt-secret-file", fullNodeJwtSecretFile, From f854686c9da4af1b029a2c0e3c156115318913da Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 12 Feb 2026 15:23:21 +0100 Subject: [PATCH 3/3] Minor updates --- block/internal/syncing/syncer.go | 15 ++------------- node/failover.go | 12 +++--------- node/full.go | 5 +---- node/node.go | 3 --- pkg/cmd/run_node.go | 2 -- pkg/p2p/client.go | 20 +++----------------- 6 files changed, 9 insertions(+), 48 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 1548b07a55..939abb0389 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -722,7 +722,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve currentState := s.getLastState() headerHash := header.Hash().String() - s.logger.Info().Uint64("height", nextHeight).Msg("syncing block started") + s.logger.Info().Uint64("height", nextHeight).Msg("syncing block") // Compared to the executor logic where the current block needs to be applied first, // here only the previous block needs to be applied to proceed to the verification. @@ -732,17 +732,6 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve s.cache.RemoveHeaderDAIncluded(headerHash) s.cache.RemoveDataDAIncluded(data.DACommitment().String()) - s.logger.Warn(). - Err(err). - Uint64("height", header.Height()). - Uint64("time", uint64(header.Time().Unix())). - Hex("proposer", header.ProposerAddress). - Str("data_hash", hex.EncodeToString(header.DataHash)). - Str("app_hash", hex.EncodeToString(header.AppHash)). - Hex("last_header_hash", header.LastHeaderHash). - Int("len signature", len(header.Signature)). - Msg("block validation failed") - if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) { return errors.Join(errInvalidBlock, err) } @@ -810,7 +799,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve if s.p2pHandler != nil { s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight) } - s.logger.Info().Uint64("height", nextHeight).Msg("syncing block completed") + return nil } diff --git a/node/failover.go b/node/failover.go index 0dfcb54179..493b28e184 100644 --- a/node/failover.go +++ b/node/failover.go @@ -189,23 +189,17 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { return nil }) - // P2P client persists across mode switches (started/closed by FullNode.Run). - // Reconfigure() was already called in setupFailoverState to re-bootstrap DHT. - - // Start header and data sync services concurrently. Each service's - // initFromP2PWithRetry can block up to 30s when peers have no blocks - // (e.g. lazy mode sequencer at height 0). Running them in parallel - // avoids a 60s cumulative startup delay. + // start header and data sync services concurrently to avoid cumulative startup delay. syncWg, syncCtx := errgroup.WithContext(ctx) syncWg.Go(func() error { if err := f.headerSyncService.Start(syncCtx); err != nil { - return fmt.Errorf("error while starting header sync service: %w", err) + return fmt.Errorf("header sync service: %w", err) } return nil }) syncWg.Go(func() error { if err := f.dataSyncService.Start(syncCtx); err != nil { - return fmt.Errorf("error while starting data sync service: %w", err) + return fmt.Errorf("data sync service: %w", err) } return nil }) diff --git a/node/full.go b/node/full.go index 42e4b6349f..41106de365 100644 --- a/node/full.go +++ b/node/full.go @@ -104,8 +104,6 @@ func newFullNode( } } - // The p2p client is fully configured and started before leader election. - // SyncService.getPeerIDs() gates peer usage on conf.Node.Aggregator. leaderFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting aggregator-MODE") nodeConfig.Node.Aggregator = true @@ -283,8 +281,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer() } - // Start the P2P client once. It persists across mode switches so that - // the host and PubSub (including externally registered topics) survive. + // Start the P2P client once. It persists across mode switches if err := n.p2pClient.Start(ctx); err != nil { return fmt.Errorf("start p2p: %w", err) } diff --git a/node/node.go b/node/node.go index 139636d644..d8aeea333f 100644 --- a/node/node.go +++ b/node/node.go @@ -28,9 +28,6 @@ type NodeOptions struct { // NewNode returns a new Full or Light Node based on the config. // This is the entry point for composing a node, when compiling a node, you need to provide an executor. // Example executors can be found in apps/ -// -// The p2pClient owns the node identity (private key) and is shared across -// mode switches. It supports in-place reconfiguration via Reconfigure(). func NewNode( conf config.Config, exec coreexecutor.Executor, diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index df96a0e371..33b1eba006 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -164,8 +164,6 @@ func StartNode( executor = telemetry.WithTracingExecutor(executor) } - // Create the P2P client. It is long-lived and reconfigured in-place - // on mode switches, avoiding costly teardown of the libp2p stack. p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, datastore, genesis.ChainID, logger, nil) if err != nil { return fmt.Errorf("create p2p client: %w", err) diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index 7859c3cf97..4288f749f9 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -49,8 +49,7 @@ type Client struct { chainID string privKey crypto.PrivKey - rawHost host.Host // unwrapped libp2p host, stored to avoid double-wrapping via routedhost - host host.Host // may be wrapped with routedhost after DHT setup + host host.Host dht *dht.IpfsDHT disc *discovery.RoutingDiscovery gater *conngater.BasicConnectionGater @@ -124,12 +123,11 @@ func NewClientWithHost( // 4. Use active peer discovery to look for peers from same ORU network. func (c *Client) Start(ctx context.Context) error { if c.started { - return nil // already started — called from FullNode.Run() + return nil } c.logger.Debug().Msg("starting P2P client") if c.host != nil { - c.rawHost = c.host return c.startWithHost(ctx, c.host) } @@ -137,7 +135,6 @@ func (c *Client) Start(ctx context.Context) error { if err != nil { return err } - c.rawHost = h return c.startWithHost(ctx, h) } @@ -185,21 +182,10 @@ func (c *Client) Close() error { if c.host != nil { err = errors.Join(err, c.host.Close()) } + c.started = false return err } -// PrivKey returns the node's private key. -func (c *Client) PrivKey() crypto.PrivKey { - return c.privKey -} - -// Reconfigure updates the mutable P2P configuration without tearing down -// the libp2p host, PubSub, or DHT. Currently this only updates the -// stored config; the sync service gates peer usage on conf.Node.Aggregator. -func (c *Client) Reconfigure(conf config.P2PConfig) { - c.conf = conf -} - // Addrs returns listen addresses of Client. func (c *Client) Addrs() []multiaddr.Multiaddr { return c.host.Addrs()