Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions block/components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
nil,
nil, // raftNode
)

require.NoError(t, err)
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) {
zerolog.Nop(),
NopMetrics(),
DefaultBlockOptions(),
nil,
nil, // raftNode
)
require.NoError(t, err)

Expand Down
55 changes: 23 additions & 32 deletions node/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,11 @@ import (
"github.com/evstack/ev-node/pkg/config"
genesispkg "github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/p2p"
"github.com/evstack/ev-node/pkg/p2p/key"
"github.com/evstack/ev-node/pkg/raft"
rpcserver "github.com/evstack/ev-node/pkg/rpc/server"
"github.com/evstack/ev-node/pkg/signer"
"github.com/evstack/ev-node/pkg/store"
evsync "github.com/evstack/ev-node/pkg/sync"
ds "github.com/ipfs/go-datastore"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
)
Expand All @@ -38,17 +36,15 @@ type failoverState struct {

func newSyncMode(
nodeConfig config.Config,
nodeKey *key.NodeKey,
genesis genesispkg.Genesis,
rootDB ds.Batching,
daStore store.Store,
exec coreexecutor.Executor,
da block.DAClient,
logger zerolog.Logger,
rktStore store.Store,
blockMetrics *block.Metrics,
nodeOpts NodeOptions,
raftNode *raft.Node,
p2pClient *p2p.Client,
) (*failoverState, error) {
blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) {
return block.NewSyncComponents(
Expand All @@ -67,16 +63,13 @@ func newSyncMode(
raftNode,
)
}
return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode)
return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient)
}

func newAggregatorMode(
nodeConfig config.Config,
nodeKey *key.NodeKey,
signer signer.Signer,
genesis genesispkg.Genesis,
rootDB ds.Batching,
daStore store.Store,
exec coreexecutor.Executor,
sequencer coresequencer.Sequencer,
da block.DAClient,
Expand All @@ -85,8 +78,8 @@ func newAggregatorMode(
blockMetrics *block.Metrics,
nodeOpts NodeOptions,
raftNode *raft.Node,
p2pClient *p2p.Client,
) (*failoverState, error) {

blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) {
return block.NewAggregatorComponents(
nodeConfig,
Expand All @@ -105,31 +98,24 @@ func newAggregatorMode(
)
}

return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode)
return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient)
}

func setupFailoverState(
nodeConfig config.Config,
nodeKey *key.NodeKey,
rootDB ds.Batching,
daStore store.Store,
genesis genesispkg.Genesis,
logger zerolog.Logger,
rktStore store.Store,
buildComponentsFn func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error),
raftNode *raft.Node,
p2pClient *p2p.Client,
) (*failoverState, error) {
p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, rootDB, genesis.ChainID, logger, nil)
if err != nil {
return nil, err
}

headerSyncService, err := evsync.NewHeaderSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger())
headerSyncService, err := evsync.NewHeaderSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger())
if err != nil {
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
}

dataSyncService, err := evsync.NewDataSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger())
dataSyncService, err := evsync.NewDataSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger())
if err != nil {
return nil, fmt.Errorf("error while initializing DataSyncService: %w", err)
}
Expand Down Expand Up @@ -203,19 +189,24 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
return nil
})

if err := f.p2pClient.Start(ctx); err != nil {
return fmt.Errorf("start p2p: %w", err)
}
defer f.p2pClient.Close() // nolint: errcheck

if err := f.headerSyncService.Start(ctx); err != nil {
return fmt.Errorf("error while starting header sync service: %w", err)
// start header and data sync services concurrently to avoid cumulative startup delay.
syncWg, syncCtx := errgroup.WithContext(ctx)
syncWg.Go(func() error {
if err := f.headerSyncService.Start(syncCtx); err != nil {
return fmt.Errorf("header sync service: %w", err)
}
return nil
})
syncWg.Go(func() error {
if err := f.dataSyncService.Start(syncCtx); err != nil {
return fmt.Errorf("data sync service: %w", err)
}
return nil
})
if err := syncWg.Wait(); err != nil {
return err
}
defer stopService(f.headerSyncService.Stop, "header sync")

if err := f.dataSyncService.Start(ctx); err != nil {
return fmt.Errorf("error while starting data sync service: %w", err)
}
defer stopService(f.dataSyncService.Stop, "data sync")

wg.Go(func() error {
Expand Down
24 changes: 18 additions & 6 deletions node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/config"
genesispkg "github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/p2p/key"
"github.com/evstack/ev-node/pkg/p2p"
raftpkg "github.com/evstack/ev-node/pkg/raft"
"github.com/evstack/ev-node/pkg/service"
"github.com/evstack/ev-node/pkg/signer"
Expand Down Expand Up @@ -53,7 +53,8 @@ type FullNode struct {

nodeConfig config.Config

daClient block.DAClient
daClient block.DAClient
p2pClient *p2p.Client

Store store.Store
raftNode *raftpkg.Node
Expand All @@ -66,7 +67,7 @@ type FullNode struct {
// newFullNode creates a new Rollkit full node.
func newFullNode(
nodeConfig config.Config,
nodeKey *key.NodeKey,
p2pClient *p2p.Client,
signer signer.Signer,
genesis genesispkg.Genesis,
database ds.Batching,
Expand Down Expand Up @@ -106,13 +107,12 @@ func newFullNode(
leaderFactory := func() (raftpkg.Runnable, error) {
logger.Info().Msg("Starting aggregator-MODE")
nodeConfig.Node.Aggregator = true
nodeConfig.P2P.Peers = "" // peers are not supported in aggregator mode
return newAggregatorMode(nodeConfig, nodeKey, signer, genesis, database, evstore, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode)
return newAggregatorMode(nodeConfig, signer, genesis, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient)
}
followerFactory := func() (raftpkg.Runnable, error) {
logger.Info().Msg("Starting sync-MODE")
nodeConfig.Node.Aggregator = false
return newSyncMode(nodeConfig, nodeKey, genesis, database, evstore, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode)
return newSyncMode(nodeConfig, genesis, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient)
}

// Initialize raft node if enabled (for both aggregator and sync nodes)
Expand All @@ -136,6 +136,7 @@ func newFullNode(
genesis: genesis,
nodeConfig: nodeConfig,
daClient: daClient,
p2pClient: p2pClient,
Store: evstore,
leaderElection: leaderElection,
raftNode: raftNode,
Expand Down Expand Up @@ -279,6 +280,17 @@ func (n *FullNode) Run(parentCtx context.Context) error {
(n.nodeConfig.Instrumentation.IsPrometheusEnabled() || n.nodeConfig.Instrumentation.IsPprofEnabled()) {
n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer()
}

// Start the P2P client once. It persists across mode switches
if err := n.p2pClient.Start(ctx); err != nil {
return fmt.Errorf("start p2p: %w", err)
}
defer func() {
if err := n.p2pClient.Close(); err != nil {
n.Logger.Error().Err(err).Msg("error closing p2p client")
}
}()

// Start leader election
if n.raftNode != nil {
if err := n.raftNode.Start(ctx); err != nil {
Expand Down
51 changes: 32 additions & 19 deletions node/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/test/testda"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog"
"github.com/stretchr/testify/require"

evconfig "github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/p2p/key"
"github.com/evstack/ev-node/pkg/p2p"
remote_signer "github.com/evstack/ev-node/pkg/signer/noop"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/types"
Expand Down Expand Up @@ -67,22 +68,18 @@ func newDummyDAClient(maxBlobSize uint64) *testda.DummyDA {
return getSharedDummyDA(maxBlobSize)
}

func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, *key.NodeKey, datastore.Batching, func()) {
func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, crypto.PrivKey, datastore.Batching, func()) {
executor := coreexecutor.NewDummyExecutor()
sequencer := coresequencer.NewDummySequencer()
daClient := newDummyDAClient(0)

// Create genesis and keys for P2P client
_, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain")
p2pKey := &key.NodeKey{
PrivKey: genesisValidatorKey,
PubKey: genesisValidatorKey.GetPublic(),
}
ds, err := store.NewTestInMemoryKVStore()
require.NoError(t, err)

stop := daClient.StartHeightTicker(config.DA.BlockTime.Duration)
return executor, sequencer, daClient, p2pKey, ds, stop
return executor, sequencer, daClient, genesisValidatorKey, ds, stop
}

func getTestConfig(t *testing.T, n int) evconfig.Config {
Expand Down Expand Up @@ -120,7 +117,7 @@ func newTestNode(
executor coreexecutor.Executor,
sequencer coresequencer.Sequencer,
daClient block.DAClient,
nodeKey *key.NodeKey,
privKey crypto.PrivKey,
ds datastore.Batching,
stopDAHeightTicker func(),
) (*FullNode, func()) {
Expand All @@ -133,13 +130,17 @@ func newTestNode(
if testing.Verbose() {
logger = zerolog.New(zerolog.NewTestWriter(t))
}

p2pClient, err := newTestP2PClient(config, privKey, ds, genesis.ChainID, logger)
require.NoError(t, err)

node, err := NewNode(
config,
executor,
sequencer,
daClient,
remoteSigner,
nodeKey,
p2pClient,
genesis,
ds,
DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()),
Expand All @@ -159,8 +160,8 @@ func newTestNode(

func createNodeWithCleanup(t *testing.T, config evconfig.Config) (*FullNode, func()) {
resetSharedDummyDA()
executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker := createTestComponents(t, config)
return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker)
executor, sequencer, daClient, privKey, ds, stopDAHeightTicker := createTestComponents(t, config)
return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker)
}

func createNodeWithCustomComponents(
Expand All @@ -169,11 +170,11 @@ func createNodeWithCustomComponents(
executor coreexecutor.Executor,
sequencer coresequencer.Sequencer,
daClient block.DAClient,
nodeKey *key.NodeKey,
privKey crypto.PrivKey,
ds datastore.Batching,
stopDAHeightTicker func(),
) (*FullNode, func()) {
return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker)
return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker)
}

// Creates the given number of nodes the given nodes using the given wait group to synchronize them
Expand All @@ -192,24 +193,28 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F

aggListenAddress := config.P2P.ListenAddress
aggPeers := config.P2P.Peers
executor, sequencer, daClient, aggP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config)
executor, sequencer, daClient, aggPrivKey, ds, stopDAHeightTicker := createTestComponents(t, config)
if d, ok := daClient.(*testda.DummyDA); ok {
d.Reset()
}
aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey)
aggPeerID, err := peer.IDFromPrivateKey(aggPrivKey)
require.NoError(err)

logger := zerolog.Nop()
if testing.Verbose() {
logger = zerolog.New(zerolog.NewTestWriter(t))
}

aggP2PClient, err := newTestP2PClient(config, aggPrivKey, ds, genesis.ChainID, logger)
require.NoError(err)

aggNode, err := NewNode(
config,
executor,
sequencer,
daClient,
remoteSigner,
aggP2PKey,
aggP2PClient,
genesis,
ds,
DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()),
Expand All @@ -236,16 +241,19 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F
}
config.P2P.ListenAddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i)
config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8001+i)
executor, sequencer, daClient, nodeP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config)
executor, sequencer, daClient, nodePrivKey, ds, stopDAHeightTicker := createTestComponents(t, config)
stopDAHeightTicker()

nodeP2PClient, err := newTestP2PClient(config, nodePrivKey, ds, genesis.ChainID, logger)
require.NoError(err)

node, err := NewNode(
config,
executor,
sequencer,
daClient,
nil,
nodeP2PKey,
nodeP2PClient,
genesis,
ds,
DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()),
Expand All @@ -257,14 +265,19 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F
// No-op: ticker already stopped
}
nodes[i], cleanups[i] = node.(*FullNode), cleanup
nodePeerID, err := peer.IDFromPrivateKey(nodeP2PKey.PrivKey)
nodePeerID, err := peer.IDFromPrivateKey(nodePrivKey)
require.NoError(err)
peersList = append(peersList, fmt.Sprintf("%s/p2p/%s", config.P2P.ListenAddress, nodePeerID.Loggable()["peerID"].(string)))
}

return nodes, cleanups
}

// newTestP2PClient creates a p2p.Client for testing.
func newTestP2PClient(config evconfig.Config, privKey crypto.PrivKey, ds datastore.Batching, chainID string, logger zerolog.Logger) (*p2p.Client, error) {
return p2p.NewClient(config.P2P, privKey, ds, chainID, logger, nil)
}

// Helper to create N contexts and cancel functions
func createNodeContexts(n int) ([]context.Context, []context.CancelFunc) {
ctxs := make([]context.Context, n)
Expand Down
Loading
Loading