diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index c01af2ca0..0032a079d 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -6,7 +6,6 @@ import ( _ "embed" "errors" "fmt" - "os" "os/signal" "syscall" "time" @@ -52,6 +51,10 @@ type ( ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool ShouldRunPprof bool PprofPort uint ShouldRunPrometheus bool @@ -74,6 +77,9 @@ type ( RequestsCache p2p.CacheOptions ParentsCache p2p.CacheOptions BlocksCache p2p.CacheOptions + TxsCache p2p.CacheOptions + KnownTxsCache p2p.CacheOptions + KnownBlocksCache p2p.CacheOptions bootnodes []*enode.Node staticNodes []*enode.Node @@ -166,7 +172,10 @@ var SensorCmd = &cobra.Command{ return nil }, RunE: func(cmd *cobra.Command, args []string) error { - db, err := newDatabase(cmd.Context()) + ctx, stop := signal.NotifyContext(cmd.Context(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + db, err := newDatabase(ctx) if err != nil { return err } @@ -195,21 +204,32 @@ var SensorCmd = &cobra.Command{ // Create peer connection manager for broadcasting transactions // and managing the global blocks cache conns := p2p.NewConns(p2p.ConnsOptions{ - BlocksCache: inputSensorParams.BlocksCache, - Head: head, + BlocksCache: inputSensorParams.BlocksCache, + TxsCache: inputSensorParams.TxsCache, + KnownTxsCache: inputSensorParams.KnownTxsCache, + KnownBlocksCache: inputSensorParams.KnownBlocksCache, + Head: head, + ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx, + ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes, + ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks, + ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes, }) opts := p2p.EthProtocolOptions{ - Context: cmd.Context(), - Database: db, - GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), - 
RPC: inputSensorParams.RPC, - SensorID: inputSensorParams.SensorID, - NetworkID: inputSensorParams.NetworkID, - Conns: conns, - ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, - RequestsCache: inputSensorParams.RequestsCache, - ParentsCache: inputSensorParams.ParentsCache, + Context: ctx, + Database: db, + GenesisHash: common.HexToHash(inputSensorParams.GenesisHash), + RPC: inputSensorParams.RPC, + SensorID: inputSensorParams.SensorID, + NetworkID: inputSensorParams.NetworkID, + Conns: conns, + ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)}, + RequestsCache: inputSensorParams.RequestsCache, + ParentsCache: inputSensorParams.ParentsCache, + ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx, + ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes, + ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks, + ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes, } config := ethp2p.Config{ @@ -241,20 +261,14 @@ var SensorCmd = &cobra.Command{ if err = server.Start(); err != nil { return err } - defer server.Stop() + defer stopServer(&server) events := make(chan *ethp2p.PeerEvent) sub := server.SubscribeEvents(events) defer sub.Unsubscribe() - ticker := time.NewTicker(2 * time.Second) // Ticker for recurring tasks every 2 seconds. - ticker1h := time.NewTicker(time.Hour) // Ticker for running DNS discovery every hour. + ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() - defer ticker1h.Stop() - - dnsLock := make(chan struct{}, 1) - signals := make(chan os.Signal, 1) - signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) if inputSensorParams.ShouldRunPprof { go handlePprof() @@ -265,34 +279,17 @@ var SensorCmd = &cobra.Command{ } go handleAPI(&server, conns) - - // Start the RPC server for receiving transactions go handleRPC(conns, inputSensorParams.NetworkID) - - // Run DNS discovery immediately at startup. 
- go handleDNSDiscovery(&server, dnsLock) + go handleDNSDiscovery(&server) for { select { case <-ticker.C: peersGauge.Set(float64(server.PeerCount())) - db.WritePeers(cmd.Context(), server.Peers(), time.Now()) - + db.WritePeers(ctx, server.Peers(), time.Now()) metrics.Update(conns.HeadBlock().Block, conns.OldestBlock()) - - urls := []string{} - for _, peer := range server.Peers() { - urls = append(urls, peer.Node().URLv4()) - } - - if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil { - log.Error().Err(err).Msg("Failed to write nodes to file") - } - case <-ticker1h.C: - go handleDNSDiscovery(&server, dnsLock) - case <-signals: - // This gracefully stops the sensor so that the peers can be written to - // the nodes file. + writePeers(server.Peers()) + case <-ctx.Done(): log.Info().Msg("Stopping sensor...") return nil case event := <-events: @@ -304,6 +301,34 @@ var SensorCmd = &cobra.Command{ }, } +// writePeers writes the enode URLs of connected peers to the nodes file. +func writePeers(peers []*ethp2p.Peer) { + urls := make([]string, 0, len(peers)) + for _, peer := range peers { + urls = append(urls, peer.Node().URLv4()) + } + + if err := p2p.WritePeers(inputSensorParams.NodesFile, urls); err != nil { + log.Error().Err(err).Msg("Failed to write nodes to file") + } +} + +// stopServer stops the p2p server with a timeout to avoid hanging on shutdown. +// This is necessary because go-ethereum's discovery shutdown can deadlock. +func stopServer(server *ethp2p.Server) { + done := make(chan struct{}) + + go func() { + server.Stop() + close(done) + }() + + select { + case <-done: + case <-time.After(5 * time.Second): + } +} + // handlePprof starts a server for performance profiling using pprof on the // specified port. This allows for real-time monitoring and analysis of the // sensor's performance. 
The port number is configured through @@ -330,20 +355,24 @@ func handlePrometheus() { // handleDNSDiscovery performs DNS-based peer discovery and adds new peers to // the p2p server. It uses an iterator to discover peers incrementally rather -// than loading all nodes at once. The lock channel prevents concurrent runs. -func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) { +// than loading all nodes at once. Runs immediately and then hourly. +func handleDNSDiscovery(server *ethp2p.Server) { if len(inputSensorParams.DiscoveryDNS) == 0 { return } - select { - case lock <- struct{}{}: - defer func() { <-lock }() - default: - log.Warn().Msg("DNS discovery already running, skipping") - return + discoverPeers(server) + + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + + for range ticker.C { + discoverPeers(server) } +} +// discoverPeers performs a single DNS discovery round. +func discoverPeers(server *ethp2p.Server) { log.Info(). Str("discovery-dns", inputSensorParams.DiscoveryDNS). Msg("Starting DNS discovery") @@ -356,7 +385,6 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) { } defer iter.Close() - // Add DNS-discovered peers using the iterator. count := 0 for iter.Next() { node := iter.Node() @@ -364,9 +392,6 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) { Str("enode", node.URLv4()). Msg("Discovered peer through DNS") - // Add the peer to the static node set. The server itself handles whether to - // connect to the peer if it's already connected. If a node is part of the - // static peer set, the server will handle reconnecting after disconnects. 
server.AddPeer(node) count++ } @@ -449,6 +474,10 @@ will result in less chance of missing data but can significantly increase memory f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, `write transaction events to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database") + f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers") + f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers") f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server") f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on") f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server") @@ -482,4 +511,10 @@ will result in less chance of missing data but can significantly increase memory f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)") f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)") f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)") + f.IntVar(&inputSensorParams.TxsCache.MaxSize, "max-txs", 8192, "maximum transactions to cache for serving to peers (0 for no limit)") + f.DurationVar(&inputSensorParams.TxsCache.TTL, "txs-cache-ttl", 10*time.Minute, "time to live for transaction cache entries (0 for no expiration)") + 
f.IntVar(&inputSensorParams.KnownTxsCache.MaxSize, "max-known-txs", 8192, "maximum transaction hashes to track per peer (0 for no limit)") + f.DurationVar(&inputSensorParams.KnownTxsCache.TTL, "known-txs-cache-ttl", 5*time.Minute, "time to live for known transaction cache entries (0 for no expiration)") + f.IntVar(&inputSensorParams.KnownBlocksCache.MaxSize, "max-known-blocks", 1024, "maximum block hashes to track per peer (0 for no limit)") + f.DurationVar(&inputSensorParams.KnownBlocksCache.TTL, "known-blocks-cache-ttl", 5*time.Minute, "time to live for known block cache entries (0 for no expiration)") } diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 3c7ce877a..eed8195cc 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -91,50 +91,60 @@ polycli p2p sensor amoy-nodes.json \ ## Flags ```bash - --api-port uint port API server will listen on (default 8080) - --blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s) - -b, --bootnodes string comma separated nodes used for bootstrapping - --database string which database to persist data to, options are: - - datastore (GCP Datastore) - - json (output to stdout) - - none (no persistence) (default "none") - -d, --database-id string datastore database ID - --dial-ratio int ratio of inbound to dialed connections (dial ratio of 2 allows 1/2 of connections to be dialed, setting to 0 defaults to 3) - --discovery-dns string DNS discovery ENR tree URL - --discovery-port int UDP P2P discovery port (default 30303) - --fork-id bytesHex hex encoded fork ID (omit 0x) (default 22D523B2) - --genesis-hash string genesis block hash (default "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b") - -h, --help help for sensor - --key string hex-encoded private key (cannot be set with --key-file) - -k, --key-file string private key file (cannot be set with --key) - --max-blocks int maximum blocks to track across all peers (0 for no 
limit) (default 1024) - -D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this - will result in less chance of missing data but can significantly increase memory usage) (default 10000) - --max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024) - -m, --max-peers int maximum number of peers to connect to (default 2000) - --max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048) - --nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:|extip:) (default "any") - -n, --network-id uint filter discovered nodes by this network ID - --no-discovery disable P2P peer discovery - --parents-cache-ttl duration time to live for parent hash cache entries (0 for no expiration) (default 5m0s) - --port int TCP network listening port (default 30303) - --pprof run pprof server - --pprof-port uint port pprof runs on (default 6060) - -p, --project-id string GCP project ID - --prom run Prometheus server (default true) - --prom-port uint port Prometheus runs on (default 2112) - --requests-cache-ttl duration time to live for requests cache entries (0 for no expiration) (default 5m0s) - --rpc string RPC endpoint used to fetch latest block (default "https://polygon-rpc.com") - --rpc-port uint port for JSON-RPC server to receive transactions (default 8545) - -s, --sensor-id string sensor ID when writing block/tx events - --static-nodes string static nodes file - --trusted-nodes string trusted nodes file - --ttl duration time to live (default 336h0m0s) - --write-block-events write block events to database (default true) - -B, --write-blocks write blocks to database (default true) - --write-peers write peers to database (default true) - --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) - -t, --write-txs write transactions to database (this option can significantly increase CPU and memory 
usage) (default true) + --api-port uint port API server will listen on (default 8080) + --blocks-cache-ttl duration time to live for block cache entries (0 for no expiration) (default 10m0s) + -b, --bootnodes string comma separated nodes used for bootstrapping + --broadcast-block-hashes broadcast block hashes to peers + --broadcast-blocks broadcast full blocks to peers + --broadcast-tx-hashes broadcast transaction hashes to peers + --broadcast-txs broadcast full transactions to peers + --database string which database to persist data to, options are: + - datastore (GCP Datastore) + - json (output to stdout) + - none (no persistence) (default "none") + -d, --database-id string datastore database ID + --dial-ratio int ratio of inbound to dialed connections (dial ratio of 2 allows 1/2 of connections to be dialed, setting to 0 defaults to 3) + --discovery-dns string DNS discovery ENR tree URL + --discovery-port int UDP P2P discovery port (default 30303) + --fork-id bytesHex hex encoded fork ID (omit 0x) (default 22D523B2) + --genesis-hash string genesis block hash (default "0xa9c28ce2141b56c474f1dc504bee9b01eb1bd7d1a507580d5519d4437a97de1b") + -h, --help help for sensor + --key string hex-encoded private key (cannot be set with --key-file) + -k, --key-file string private key file (cannot be set with --key) + --known-blocks-cache-ttl duration time to live for known block cache entries (0 for no expiration) (default 5m0s) + --known-txs-cache-ttl duration time to live for known transaction cache entries (0 for no expiration) (default 5m0s) + --max-blocks int maximum blocks to track across all peers (0 for no limit) (default 1024) + -D, --max-db-concurrency int maximum number of concurrent database operations to perform (increasing this + will result in less chance of missing data but can significantly increase memory usage) (default 10000) + --max-known-blocks int maximum block hashes to track per peer (0 for no limit) (default 1024) + --max-known-txs int maximum 
transaction hashes to track per peer (0 for no limit) (default 8192) + --max-parents int maximum parent block hashes to track per peer (0 for no limit) (default 1024) + -m, --max-peers int maximum number of peers to connect to (default 2000) + --max-requests int maximum request IDs to track per peer (0 for no limit) (default 2048) + --max-txs int maximum transactions to cache for serving to peers (0 for no limit) (default 8192) + --nat string NAT port mapping mechanism (any|none|upnp|pmp|pmp:|extip:) (default "any") + -n, --network-id uint filter discovered nodes by this network ID + --no-discovery disable P2P peer discovery + --parents-cache-ttl duration time to live for parent hash cache entries (0 for no expiration) (default 5m0s) + --port int TCP network listening port (default 30303) + --pprof run pprof server + --pprof-port uint port pprof runs on (default 6060) + -p, --project-id string GCP project ID + --prom run Prometheus server (default true) + --prom-port uint port Prometheus runs on (default 2112) + --requests-cache-ttl duration time to live for requests cache entries (0 for no expiration) (default 5m0s) + --rpc string RPC endpoint used to fetch latest block (default "https://polygon-rpc.com") + --rpc-port uint port for JSON-RPC server to receive transactions (default 8545) + -s, --sensor-id string sensor ID when writing block/tx events + --static-nodes string static nodes file + --trusted-nodes string trusted nodes file + --ttl duration time to live (default 336h0m0s) + --txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s) + --write-block-events write block events to database (default true) + -B, --write-blocks write blocks to database (default true) + --write-peers write peers to database (default true) + --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) + -t, --write-txs write transactions to database (this option can 
significantly increase CPU and memory usage) (default true) ``` The command also inherits flags from parent commands. diff --git a/p2p/cache.go b/p2p/cache.go index def29458b..b73309743 100644 --- a/p2p/cache.go +++ b/p2p/cache.go @@ -24,7 +24,7 @@ type Cache[K comparable, V any] struct { type entry[K comparable, V any] struct { key K value V - expiresAt time.Time + expiresAt *time.Time // nil when TTL=0, saves 16 bytes per entry } // NewCache creates a new cache with the given options. @@ -44,10 +44,10 @@ func (c *Cache[K, V]) Add(key K, value V) { c.mu.Lock() defer c.mu.Unlock() - now := time.Now() - expiresAt := time.Time{} + var expiresAt *time.Time if c.ttl > 0 { - expiresAt = now.Add(c.ttl) + t := time.Now().Add(c.ttl) + expiresAt = &t } if elem, ok := c.items[key]; ok { @@ -89,7 +89,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) { e := elem.Value.(*entry[K, V]) - if c.ttl > 0 && time.Now().After(e.expiresAt) { + if e.expiresAt != nil && time.Now().After(*e.expiresAt) { c.list.Remove(elem) delete(c.items, key) var zero V @@ -114,7 +114,7 @@ func (c *Cache[K, V]) Peek(key K) (V, bool) { e := elem.Value.(*entry[K, V]) - if c.ttl > 0 && time.Now().After(e.expiresAt) { + if e.expiresAt != nil && time.Now().After(*e.expiresAt) { var zero V return zero, false } @@ -131,15 +131,16 @@ func (c *Cache[K, V]) Update(key K, updateFn func(V) V) { defer c.mu.Unlock() now := time.Now() - expiresAt := time.Time{} + var expiresAt *time.Time if c.ttl > 0 { - expiresAt = now.Add(c.ttl) + t := now.Add(c.ttl) + expiresAt = &t } var currentVal V if elem, ok := c.items[key]; ok { e := elem.Value.(*entry[K, V]) - if c.ttl == 0 || !now.After(e.expiresAt) { + if e.expiresAt == nil || !now.After(*e.expiresAt) { currentVal = e.value // Update existing entry c.list.MoveToFront(elem) @@ -186,7 +187,7 @@ func (c *Cache[K, V]) Contains(key K) bool { e := elem.Value.(*entry[K, V]) - if c.ttl > 0 && time.Now().After(e.expiresAt) { + if e.expiresAt != nil && time.Now().After(*e.expiresAt) { 
return false } diff --git a/p2p/conns.go b/p2p/conns.go index a44960483..7c291ae03 100644 --- a/p2p/conns.go +++ b/p2p/conns.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/eth/protocols/eth" ethp2p "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/rs/zerolog/log" ) // BlockCache stores the actual block data to avoid duplicate fetches and database queries. @@ -19,6 +20,19 @@ type BlockCache struct { TD *big.Int } +// ConnsOptions contains configuration options for creating a new Conns manager. +type ConnsOptions struct { + BlocksCache CacheOptions + TxsCache CacheOptions + KnownTxsCache CacheOptions + KnownBlocksCache CacheOptions + Head eth.NewBlockPacket + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool +} + // Conns manages a collection of active peer connections for transaction broadcasting. // It also maintains a global cache of blocks written to the database. type Conns struct { @@ -29,18 +43,25 @@ type Conns struct { // to avoid duplicate writes and requests. blocks *Cache[common.Hash, BlockCache] + // txs caches transactions for serving to peers and duplicate detection + txs *Cache[common.Hash, *types.Transaction] + + // knownTxsOpts and knownBlocksOpts store cache options for per-peer caches + knownTxsOpts CacheOptions + knownBlocksOpts CacheOptions + // oldest stores the first block the sensor has seen so when fetching // parent blocks, it does not request blocks older than this. oldest *Locked[*types.Header] // head keeps track of the current head block of the chain. head *Locked[eth.NewBlockPacket] -} -// ConnsOptions contains configuration options for creating a new Conns manager. 
-type ConnsOptions struct { - BlocksCache CacheOptions - Head eth.NewBlockPacket + // Broadcast flags control what gets cached and rebroadcasted + shouldBroadcastTx bool + shouldBroadcastTxHashes bool + shouldBroadcastBlocks bool + shouldBroadcastBlockHashes bool } // NewConns creates a new connection manager with a blocks cache. @@ -52,10 +73,17 @@ func NewConns(opts ConnsOptions) *Conns { oldest.Set(opts.Head.Block.Header()) return &Conns{ - conns: make(map[string]*conn), - blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache), - oldest: oldest, - head: head, + conns: make(map[string]*conn), + blocks: NewCache[common.Hash, BlockCache](opts.BlocksCache), + txs: NewCache[common.Hash, *types.Transaction](opts.TxsCache), + knownTxsOpts: opts.KnownTxsCache, + knownBlocksOpts: opts.KnownBlocksCache, + oldest: oldest, + head: head, + shouldBroadcastTx: opts.ShouldBroadcastTx, + shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes, + shouldBroadcastBlocks: opts.ShouldBroadcastBlocks, + shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes, } } @@ -81,9 +109,15 @@ func (c *Conns) BroadcastTx(tx *types.Transaction) int { return c.BroadcastTxs(types.Transactions{tx}) } -// BroadcastTxs broadcasts multiple transactions to all connected peers. +// BroadcastTxs broadcasts multiple transactions to all connected peers, +// filtering out transactions that each peer already knows about. // Returns the number of peers the transactions were successfully sent to. +// If broadcast flags are disabled, this is a no-op. 
func (c *Conns) BroadcastTxs(txs types.Transactions) int { + if !c.shouldBroadcastTx { + return 0 + } + c.mu.RLock() defer c.mu.RUnlock() @@ -93,12 +127,191 @@ func (c *Conns) BroadcastTxs(txs types.Transactions) int { count := 0 for _, cn := range c.conns { - if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, txs); err != nil { + // Filter transactions this peer doesn't know about + unknownTxs := make(types.Transactions, 0, len(txs)) + for _, tx := range txs { + if !cn.hasKnownTx(tx.Hash()) { + unknownTxs = append(unknownTxs, tx) + } + } + + if len(unknownTxs) == 0 { + continue + } + + // Send as TransactionsPacket + packet := eth.TransactionsPacket(unknownTxs) + cn.countMsgSent(packet.Name(), float64(len(unknownTxs))) + if err := ethp2p.Send(cn.rw, eth.TransactionsMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transactions") + continue + } + + // Mark transactions as known for this peer + for _, tx := range unknownTxs { + cn.addKnownTx(tx.Hash()) + } + + count++ + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Int("txs", len(txs)). + Msg("Broadcasted transactions") + } + + return count +} + +// BroadcastTxHashes broadcasts transaction hashes to peers that don't already +// know about them and returns the number of peers the hashes were successfully +// sent to. If broadcast flags are disabled, this is a no-op. 
+func (c *Conns) BroadcastTxHashes(hashes []common.Hash) int { + if !c.shouldBroadcastTxHashes { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if len(hashes) == 0 { + return 0 + } + + count := 0 + for _, cn := range c.conns { + // Filter hashes this peer doesn't know about + unknownHashes := make([]common.Hash, 0, len(hashes)) + for _, hash := range hashes { + if !cn.hasKnownTx(hash) { + unknownHashes = append(unknownHashes, hash) + } + } + + if len(unknownHashes) == 0 { + continue + } + + // Send NewPooledTransactionHashesPacket + packet := eth.NewPooledTransactionHashesPacket{ + Types: make([]byte, len(unknownHashes)), + Sizes: make([]uint32, len(unknownHashes)), + Hashes: unknownHashes, + } + + cn.countMsgSent(packet.Name(), float64(len(unknownHashes))) + if err := ethp2p.Send(cn.rw, eth.NewPooledTransactionHashesMsg, packet); err != nil { + cn.logger.Debug(). + Err(err). + Msg("Failed to send transaction hashes") + continue + } + + // Mark hashes as known for this peer + for _, hash := range unknownHashes { + cn.addKnownTx(hash) + } + + count++ + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Int("hashes", len(hashes)). + Msg("Broadcasted transaction hashes") + } + + return count +} + +// BroadcastBlock broadcasts a full block to peers that don't already know +// about it and returns the number of peers the block was successfully sent to. +// If broadcast flags are disabled, this is a no-op. 
+func (c *Conns) BroadcastBlock(block *types.Block, td *big.Int) int { + if !c.shouldBroadcastBlocks { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if block == nil { + return 0 + } + + hash := block.Hash() + count := 0 + + for _, cn := range c.conns { + // Skip if peer already knows about this block + if cn.hasKnownBlock(hash) { continue } + + // Send NewBlockPacket + packet := eth.NewBlockPacket{ + Block: block, + TD: td, + } + + cn.countMsgSent(packet.Name(), 1) + if err := ethp2p.Send(cn.rw, eth.NewBlockMsg, &packet); err != nil { + cn.logger.Debug(). + Err(err). + Uint64("number", block.Number().Uint64()). + Msg("Failed to send block") + continue + } + + // Mark block as known for this peer + cn.addKnownBlock(hash) count++ } + if count > 0 { + log.Debug(). + Int("peers", count). + Uint64("number", block.NumberU64()). + Msg("Broadcasted block") + } + + return count +} + +// BroadcastBlockHashes broadcasts block hashes with their corresponding block +// numbers to peers that don't already know about them and returns the number +// of peers the hashes were successfully sent to. If broadcast flags are disabled, this is a no-op. +func (c *Conns) BroadcastBlockHashes(hashes []common.Hash, numbers []uint64) int { + if !c.shouldBroadcastBlockHashes { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + if len(hashes) == 0 || len(hashes) != len(numbers) { + return 0 + } + + count := 0 + for _, cn := range c.conns { + if cn.sendBlockHashes(hashes, numbers) { + count++ + } + } + + if count > 0 { + log.Debug(). + Int("peers", count). + Int("hashes", len(hashes)). + Msg("Broadcasted block hashes") + } + return count } @@ -128,6 +341,16 @@ func (c *Conns) PeerConnectedAt(peerID string) time.Time { return time.Time{} } +// AddTx adds a transaction to the shared cache for duplicate detection and serving. +func (c *Conns) AddTx(hash common.Hash, tx *types.Transaction) { + c.txs.Add(hash, tx) +} + +// GetTx retrieves a transaction from the shared cache. 
+func (c *Conns) GetTx(hash common.Hash) (*types.Transaction, bool) { + return c.txs.Get(hash) +} + // Blocks returns the global blocks cache. func (c *Conns) Blocks() *Cache[common.Hash, BlockCache] { return c.blocks @@ -156,6 +379,36 @@ func (c *Conns) UpdateHeadBlock(packet eth.NewBlockPacket) bool { }) } +// KnownTxsOpts returns the cache options for per-peer known tx caches. +func (c *Conns) KnownTxsOpts() CacheOptions { + return c.knownTxsOpts +} + +// KnownBlocksOpts returns the cache options for per-peer known block caches. +func (c *Conns) KnownBlocksOpts() CacheOptions { + return c.knownBlocksOpts +} + +// ShouldBroadcastTx returns whether full transaction broadcasting is enabled. +func (c *Conns) ShouldBroadcastTx() bool { + return c.shouldBroadcastTx +} + +// ShouldBroadcastTxHashes returns whether transaction hash broadcasting is enabled. +func (c *Conns) ShouldBroadcastTxHashes() bool { + return c.shouldBroadcastTxHashes +} + +// ShouldBroadcastBlocks returns whether full block broadcasting is enabled. +func (c *Conns) ShouldBroadcastBlocks() bool { + return c.shouldBroadcastBlocks +} + +// ShouldBroadcastBlockHashes returns whether block hash broadcasting is enabled. +func (c *Conns) ShouldBroadcastBlockHashes() bool { + return c.shouldBroadcastBlockHashes +} + // GetPeerMessages returns a snapshot of message counts for a specific peer. // Returns nil if the peer is not found. func (c *Conns) GetPeerMessages(peerID string) *PeerMessages { diff --git a/p2p/protocol.go b/p2p/protocol.go index da7895915..7262ae0fb 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -51,6 +51,16 @@ type conn struct { // peerURL is cached to avoid repeated URLv4() calls. peerURL string + // Broadcast flags control what gets rebroadcasted to other peers + shouldBroadcastTx bool + shouldBroadcastTxHashes bool + shouldBroadcastBlocks bool + shouldBroadcastBlockHashes bool + + // Known caches track what this peer has seen to avoid redundant sends. 
+ knownTxs *Cache[common.Hash, struct{}] + knownBlocks *Cache[common.Hash, struct{}] + // messages tracks per-peer message counts for API visibility. messages *PeerMessages } @@ -69,6 +79,12 @@ type EthProtocolOptions struct { // Cache configurations RequestsCache CacheOptions ParentsCache CacheOptions + + // Broadcast flags control what gets rebroadcasted to other peers + ShouldBroadcastTx bool + ShouldBroadcastTxHashes bool + ShouldBroadcastBlocks bool + ShouldBroadcastBlockHashes bool } // NewEthProtocol creates the new eth protocol. This will handle writing the @@ -81,19 +97,25 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol { Run: func(p *ethp2p.Peer, rw ethp2p.MsgReadWriter) error { peerURL := p.Node().URLv4() c := &conn{ - sensorID: opts.SensorID, - node: p.Node(), - logger: log.With().Str("peer", peerURL).Logger(), - rw: rw, - db: opts.Database, - requests: NewCache[uint64, common.Hash](opts.RequestsCache), - requestNum: 0, - parents: NewCache[common.Hash, struct{}](opts.ParentsCache), - peer: p, - conns: opts.Conns, - connectedAt: time.Now(), - peerURL: peerURL, - messages: NewPeerMessages(), + sensorID: opts.SensorID, + node: p.Node(), + logger: log.With().Str("peer", peerURL).Logger(), + rw: rw, + db: opts.Database, + requests: NewCache[uint64, common.Hash](opts.RequestsCache), + requestNum: 0, + parents: NewCache[common.Hash, struct{}](opts.ParentsCache), + peer: p, + conns: opts.Conns, + connectedAt: time.Now(), + peerURL: peerURL, + shouldBroadcastTx: opts.ShouldBroadcastTx, + shouldBroadcastTxHashes: opts.ShouldBroadcastTxHashes, + shouldBroadcastBlocks: opts.ShouldBroadcastBlocks, + shouldBroadcastBlockHashes: opts.ShouldBroadcastBlockHashes, + knownTxs: NewCache[common.Hash, struct{}](opts.Conns.KnownTxsOpts()), + knownBlocks: NewCache[common.Hash, struct{}](opts.Conns.KnownBlocksOpts()), + messages: NewPeerMessages(), } head := c.conns.HeadBlock() @@ -340,12 +362,16 @@ func (c *conn) handleNewBlockHashes(ctx 
context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), float64(len(packet))) - // Collect unique hashes for database write. + // Collect unique hashes and numbers for database write and broadcasting. uniqueHashes := make([]common.Hash, 0, len(packet)) + uniqueNumbers := make([]uint64, 0, len(packet)) for _, entry := range packet { hash := entry.Hash + // Mark as known from this peer + c.addKnownBlock(hash) + // Check what parts of the block we already have cache, ok := c.conns.Blocks().Get(hash) if ok { @@ -362,16 +388,97 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.conns.Blocks().Add(hash, BlockCache{}) uniqueHashes = append(uniqueHashes, hash) + uniqueNumbers = append(uniqueNumbers, entry.Number) } // Write only unique hashes to the database. - if len(uniqueHashes) > 0 { - c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + if len(uniqueHashes) == 0 { + return nil } + c.db.WriteBlockHashes(ctx, c.node, uniqueHashes, tfs) + + // Broadcast block hashes to other peers + c.conns.BroadcastBlockHashes(uniqueHashes, uniqueNumbers) + return nil } +// addKnownTx adds a transaction hash to the known tx cache. +func (c *conn) addKnownTx(hash common.Hash) { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return + } + + c.knownTxs.Add(hash, struct{}{}) +} + +// addKnownBlock adds a block hash to the known block cache. +func (c *conn) addKnownBlock(hash common.Hash) { + if !c.shouldBroadcastBlocks && !c.shouldBroadcastBlockHashes { + return + } + + c.knownBlocks.Add(hash, struct{}{}) +} + +// hasKnownTx checks if a transaction hash is in the known tx cache. +func (c *conn) hasKnownTx(hash common.Hash) bool { + if !c.shouldBroadcastTx && !c.shouldBroadcastTxHashes { + return false + } + + return c.knownTxs.Contains(hash) +} + +// hasKnownBlock checks if a block hash is in the known block cache. 
+func (c *conn) hasKnownBlock(hash common.Hash) bool { + if !c.shouldBroadcastBlocks && !c.shouldBroadcastBlockHashes { + return false + } + + return c.knownBlocks.Contains(hash) +} + +// sendBlockHashes sends block hashes to a peer, filtering out hashes the peer +// already knows about. Returns true if the send was successful. +func (c *conn) sendBlockHashes(hashes []common.Hash, numbers []uint64) bool { + // Filter hashes this peer doesn't know about + unknownHashes := make([]common.Hash, 0, len(hashes)) + unknownNumbers := make([]uint64, 0, len(numbers)) + + for i, hash := range hashes { + if !c.hasKnownBlock(hash) { + unknownHashes = append(unknownHashes, hash) + unknownNumbers = append(unknownNumbers, numbers[i]) + } + } + + if len(unknownHashes) == 0 { + return false + } + + // Send NewBlockHashesPacket + packet := make(eth.NewBlockHashesPacket, len(unknownHashes)) + for i := range unknownHashes { + packet[i].Hash = unknownHashes[i] + packet[i].Number = unknownNumbers[i] + } + + c.countMsgSent(packet.Name(), float64(len(unknownHashes))) + if err := ethp2p.Send(c.rw, eth.NewBlockHashesMsg, packet); err != nil { + c.logger.Debug().Err(err).Msg("Failed to send block hashes") + return false + } + + // Mark hashes as known for this peer + for _, hash := range unknownHashes { + c.addKnownBlock(hash) + } + + return true +} + func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { + payload, err := io.ReadAll(msg.Payload) + if err != nil { @@ -389,10 +496,26 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived((&eth.TransactionsPacket{}).Name(), float64(len(txs))) + // Mark transactions as known from this peer + for _, tx := range txs { + c.addKnownTx(tx.Hash()) + } + if len(txs) > 0 { c.db.WriteTransactions(ctx, c.node, txs, tfs) } + // Cache transactions for duplicate detection and serving to peers + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + c.conns.AddTx(tx.Hash(), tx) + 
hashes[i] = tx.Hash() + } + + // Broadcast transactions or hashes to other peers + c.conns.BroadcastTxs(types.Transactions(txs)) + c.conns.BroadcastTxHashes(hashes) + return nil } @@ -404,8 +527,17 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), 1) - response := &eth.BlockHeadersPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache if we have the block + var headers []*types.Header + if cache, ok := c.conns.Blocks().Peek(request.Origin.Hash); ok && cache.Header != nil { + headers = []*types.Header{cache.Header} + } + + response := &eth.BlockHeadersPacket{ + RequestId: request.RequestId, + BlockHeadersRequest: headers, + } + c.countMsgSent(response.Name(), float64(len(headers))) return ethp2p.Send(c.rw, eth.BlockHeadersMsg, response) } @@ -455,8 +587,19 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), float64(len(request.GetBlockBodiesRequest))) - response := &eth.BlockBodiesPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache + var bodies []*eth.BlockBody + for _, hash := range request.GetBlockBodiesRequest { + if cache, ok := c.conns.Blocks().Peek(hash); ok && cache.Body != nil { + bodies = append(bodies, cache.Body) + } + } + + response := &eth.BlockBodiesPacket{ + RequestId: request.RequestId, + BlockBodiesResponse: bodies, + } + c.countMsgSent(response.Name(), float64(len(bodies))) return ethp2p.Send(c.rw, eth.BlockBodiesMsg, response) } @@ -533,6 +676,9 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { c.countMsgReceived(packet.Name(), 1) + // Mark block as known from this peer + c.addKnownBlock(hash) + // Set the head block if newer. if c.conns.UpdateHeadBlock(*packet) { c.logger.Info(). 
@@ -564,6 +710,13 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { TD: packet.TD, }) + // Broadcast block or block hash to other peers + c.conns.BroadcastBlock(packet.Block, packet.TD) + c.conns.BroadcastBlockHashes( + []common.Hash{hash}, + []uint64{packet.Block.Number().Uint64()}, + ) + return nil } @@ -575,8 +728,19 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error { c.countMsgReceived(request.Name(), float64(len(request.GetPooledTransactionsRequest))) - response := &eth.PooledTransactionsPacket{RequestId: request.RequestId} - c.countMsgSent(response.Name(), 0) + // Try to serve from cache + var txs []*types.Transaction + for _, hash := range request.GetPooledTransactionsRequest { + if tx, ok := c.conns.GetTx(hash); ok { + txs = append(txs, tx) + } + } + + response := &eth.PooledTransactionsPacket{ + RequestId: request.RequestId, + PooledTransactionsResponse: txs, + } + c.countMsgSent(response.Name(), float64(len(txs))) return ethp2p.Send(c.rw, eth.PooledTransactionsMsg, response) } @@ -627,10 +791,26 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err c.countMsgReceived(packet.Name(), float64(len(packet.PooledTransactionsResponse))) + // Mark transactions as known from this peer + for _, tx := range packet.PooledTransactionsResponse { + c.addKnownTx(tx.Hash()) + } + if len(packet.PooledTransactionsResponse) > 0 { c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse, tfs) } + // Cache transactions for duplicate detection and serving to peers + hashes := make([]common.Hash, len(packet.PooledTransactionsResponse)) + for i, tx := range packet.PooledTransactionsResponse { + c.conns.AddTx(tx.Hash(), tx) + hashes[i] = tx.Hash() + } + + // Broadcast transactions or hashes to other peers + c.conns.BroadcastTxs(types.Transactions(packet.PooledTransactionsResponse)) + c.conns.BroadcastTxHashes(hashes) + return nil }