15 changes: 13 additions & 2 deletions cmd/p2p/sensor/api.go
@@ -1,6 +1,7 @@
package sensor

import (
"context"
"encoding/json"
"fmt"
"net/http"
@@ -57,7 +58,8 @@ type apiData struct {
// handleAPI sets up the API for interacting with the sensor. All endpoints
// return information about the sensor node and all connected peers, including
// the types and counts of eth packets sent and received by each peer.
func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
// The server gracefully shuts down when the context is cancelled.
func handleAPI(ctx context.Context, server *ethp2p.Server, conns *p2p.Conns) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
@@ -117,7 +119,16 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
})

addr := fmt.Sprintf(":%d", inputSensorParams.APIPort)
if err := http.ListenAndServe(addr, mux); err != nil {
httpServer := &http.Server{Addr: addr, Handler: mux}

go func() {
<-ctx.Done()
if err := httpServer.Shutdown(context.Background()); err != nil {
log.Error().Err(err).Msg("Failed to shutdown API server")
}
}()

if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("Failed to start API handler")
}
}
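
Aside (not part of the diff): the same context-driven shutdown pattern now appears in handleAPI, handleRPC, handlePprof, and handlePrometheus. Factored into a standalone sketch with a hypothetical helper name, the pattern looks like this:

package sketch

import (
	"context"
	"errors"
	"log"
	"net/http"
)

// serveWithShutdown runs srv until ctx is cancelled, then drains it via
// Shutdown. The helper name is illustrative; it is not part of the diff.
func serveWithShutdown(ctx context.Context, srv *http.Server) error {
	go func() {
		<-ctx.Done()
		// Shutdown stops accepting new connections and waits for in-flight
		// requests; the diff passes context.Background(), i.e. no drain deadline.
		if err := srv.Shutdown(context.Background()); err != nil {
			log.Printf("shutdown: %v", err)
		}
	}()

	// After Shutdown, ListenAndServe returns http.ErrServerClosed, which is
	// the expected result rather than a startup failure.
	if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}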
15 changes: 13 additions & 2 deletions cmd/p2p/sensor/rpc.go
@@ -1,6 +1,7 @@
package sensor

import (
"context"
"encoding/json"
"fmt"
"io"
@@ -41,7 +42,8 @@ type rpcError struct {
// It handles eth_sendRawTransaction requests, validates transaction signatures,
// and broadcasts valid transactions to all connected peers.
// Supports both single requests and batch requests per JSON-RPC 2.0 specification.
func handleRPC(conns *p2p.Conns, networkID uint64) {
// The server gracefully shuts down when the context is cancelled.
func handleRPC(ctx context.Context, conns *p2p.Conns, networkID uint64) {
// Use network ID as chain ID for signature validation
chainID := new(big.Int).SetUint64(networkID)
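
A note on the chain ID computed above: with go-ethereum, recovering a transaction's sender against a chain ID is typically done with types.LatestSignerForChainID and types.Sender. The sketch below only illustrates that validation step; it is not necessarily the exact code used elsewhere in this file, and the function name and raw-transaction hex are placeholders.

package sketch

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common/hexutil"
	"github.com/ethereum/go-ethereum/core/types"
)

// recoverSender decodes a raw transaction and recovers its signer address
// using the given chain ID. Illustrative only; not part of this diff.
func recoverSender(rawTxHex string, chainID *big.Int) error {
	raw, err := hexutil.Decode(rawTxHex)
	if err != nil {
		return fmt.Errorf("invalid hex: %w", err)
	}

	tx := new(types.Transaction)
	if err := tx.UnmarshalBinary(raw); err != nil {
		return fmt.Errorf("invalid transaction encoding: %w", err)
	}

	// LatestSignerForChainID handles legacy, EIP-155, and typed transactions.
	signer := types.LatestSignerForChainID(chainID)
	from, err := types.Sender(signer, tx)
	if err != nil {
		return fmt.Errorf("signature check failed: %w", err)
	}

	fmt.Printf("recovered sender %s for tx %s\n", from.Hex(), tx.Hash().Hex())
	return nil
}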

@@ -85,8 +87,17 @@ func handleRPC(conns *p2p.Conns, networkID uint64) {
})

addr := fmt.Sprintf(":%d", inputSensorParams.RPCPort)
server := &http.Server{Addr: addr, Handler: mux}

go func() {
<-ctx.Done()
if err := server.Shutdown(context.Background()); err != nil {
log.Error().Err(err).Msg("Failed to shutdown RPC server")
}
}()

log.Info().Str("addr", addr).Msg("Starting JSON-RPC server")
if err := http.ListenAndServe(addr, mux); err != nil {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("Failed to start RPC server")
}
}
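
Since the comment on handleRPC notes that batch requests are supported per the JSON-RPC 2.0 spec, a small client-side example may help. The port (8545), request path, and raw transaction payloads below are placeholders for whatever the sensor is actually configured with; this is a sketch, not part of the diff.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

type rpcRequest struct {
	JSONRPC string   `json:"jsonrpc"`
	ID      int      `json:"id"`
	Method  string   `json:"method"`
	Params  []string `json:"params"`
}

func main() {
	// Two eth_sendRawTransaction calls submitted as one JSON-RPC 2.0 batch.
	// The raw transaction hex strings are placeholders, not real transactions.
	batch := []rpcRequest{
		{JSONRPC: "2.0", ID: 1, Method: "eth_sendRawTransaction", Params: []string{"0x02f8..."}},
		{JSONRPC: "2.0", ID: 2, Method: "eth_sendRawTransaction", Params: []string{"0x02f9..."}},
	}

	body, err := json.Marshal(batch)
	if err != nil {
		panic(err)
	}

	// Assumes the sensor's RPC server is listening locally on port 8545.
	resp, err := http.Post("http://localhost:8545", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	out, _ := io.ReadAll(resp.Body)
	fmt.Println(string(out))
}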
121 changes: 94 additions & 27 deletions cmd/p2p/sensor/sensor.go
@@ -52,6 +52,10 @@ type (
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
ShouldBroadcastTx bool
ShouldBroadcastTxHashes bool
ShouldBroadcastBlocks bool
ShouldBroadcastBlockHashes bool
ShouldRunPprof bool
PprofPort uint
ShouldRunPrometheus bool
@@ -74,6 +78,9 @@ type (
RequestsCache p2p.CacheOptions
ParentsCache p2p.CacheOptions
BlocksCache p2p.CacheOptions
TxsCache p2p.CacheOptions
KnownTxsCache p2p.CacheOptions
KnownBlocksCache p2p.CacheOptions

bootnodes []*enode.Node
staticNodes []*enode.Node
@@ -195,21 +202,32 @@ var SensorCmd = &cobra.Command{
// Create peer connection manager for broadcasting transactions
// and managing the global blocks cache
conns := p2p.NewConns(p2p.ConnsOptions{
BlocksCache: inputSensorParams.BlocksCache,
Head: head,
BlocksCache: inputSensorParams.BlocksCache,
TxsCache: inputSensorParams.TxsCache,
KnownTxsCache: inputSensorParams.KnownTxsCache,
KnownBlocksCache: inputSensorParams.KnownBlocksCache,
Head: head,
ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx,
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
})

opts := p2p.EthProtocolOptions{
Context: cmd.Context(),
Database: db,
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
RPC: inputSensorParams.RPC,
SensorID: inputSensorParams.SensorID,
NetworkID: inputSensorParams.NetworkID,
Conns: conns,
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
RequestsCache: inputSensorParams.RequestsCache,
ParentsCache: inputSensorParams.ParentsCache,
Context: cmd.Context(),
Database: db,
GenesisHash: common.HexToHash(inputSensorParams.GenesisHash),
RPC: inputSensorParams.RPC,
SensorID: inputSensorParams.SensorID,
NetworkID: inputSensorParams.NetworkID,
Conns: conns,
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
RequestsCache: inputSensorParams.RequestsCache,
ParentsCache: inputSensorParams.ParentsCache,
ShouldBroadcastTx: inputSensorParams.ShouldBroadcastTx,
ShouldBroadcastTxHashes: inputSensorParams.ShouldBroadcastTxHashes,
ShouldBroadcastBlocks: inputSensorParams.ShouldBroadcastBlocks,
ShouldBroadcastBlockHashes: inputSensorParams.ShouldBroadcastBlockHashes,
}

config := ethp2p.Config{
@@ -256,21 +274,25 @@ var SensorCmd = &cobra.Command{
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

// Create a cancellable context for graceful shutdown of background goroutines.
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

if inputSensorParams.ShouldRunPprof {
go handlePprof()
go handlePprof(ctx)
}

if inputSensorParams.ShouldRunPrometheus {
go handlePrometheus()
go handlePrometheus(ctx)
}

go handleAPI(&server, conns)
go handleAPI(ctx, &server, conns)

// Start the RPC server for receiving transactions
go handleRPC(conns, inputSensorParams.NetworkID)
go handleRPC(ctx, conns, inputSensorParams.NetworkID)

// Run DNS discovery immediately at startup.
go handleDNSDiscovery(&server, dnsLock)
go handleDNSDiscovery(ctx, &server, dnsLock)

for {
select {
@@ -289,11 +311,12 @@ var SensorCmd = &cobra.Command{
log.Error().Err(err).Msg("Failed to write nodes to file")
}
case <-ticker1h.C:
go handleDNSDiscovery(&server, dnsLock)
go handleDNSDiscovery(ctx, &server, dnsLock)
case <-signals:
// This gracefully stops the sensor so that the peers can be written to
// the nodes file.
log.Info().Msg("Stopping sensor...")
cancel()
return nil
case event := <-events:
log.Debug().Any("event", event).Send()
@@ -307,38 +330,62 @@ var SensorCmd = &cobra.Command{
// handlePprof starts a server for performance profiling using pprof on the
// specified port. This allows for real-time monitoring and analysis of the
// sensor's performance. The port number is configured through
// inputSensorParams.PprofPort. An error is logged if the server fails to start.
func handlePprof() {
// inputSensorParams.PprofPort. The server gracefully shuts down when the
// context is cancelled.
func handlePprof(ctx context.Context) {
addr := fmt.Sprintf(":%d", inputSensorParams.PprofPort)
if err := http.ListenAndServe(addr, nil); err != nil {
server := &http.Server{Addr: addr}

go func() {
<-ctx.Done()
if err := server.Shutdown(context.Background()); err != nil {
log.Error().Err(err).Msg("Failed to shutdown pprof server")
}
}()

if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("Failed to start pprof")
}
}

// handlePrometheus starts a server to expose Prometheus metrics at the /metrics
// endpoint. This enables Prometheus to scrape and collect metrics data for
// monitoring purposes. The port number is configured through
// inputSensorParams.PrometheusPort. An error is logged if the server fails to
// start.
func handlePrometheus() {
http.Handle("/metrics", promhttp.Handler())
// inputSensorParams.PrometheusPort. The server gracefully shuts down when the
// context is cancelled.
func handlePrometheus(ctx context.Context) {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())

addr := fmt.Sprintf(":%d", inputSensorParams.PrometheusPort)
if err := http.ListenAndServe(addr, nil); err != nil {
server := &http.Server{Addr: addr, Handler: mux}

go func() {
<-ctx.Done()
if err := server.Shutdown(context.Background()); err != nil {
log.Error().Err(err).Msg("Failed to shutdown Prometheus server")
}
}()

if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Error().Err(err).Msg("Failed to start Prometheus handler")
}
}

// handleDNSDiscovery performs DNS-based peer discovery and adds new peers to
// the p2p server. It uses an iterator to discover peers incrementally rather
// than loading all nodes at once. The lock channel prevents concurrent runs.
func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
// Discovery stops when the context is cancelled.
func handleDNSDiscovery(ctx context.Context, server *ethp2p.Server, lock chan struct{}) {
if len(inputSensorParams.DiscoveryDNS) == 0 {
return
}

select {
case lock <- struct{}{}:
defer func() { <-lock }()
case <-ctx.Done():
return
default:
log.Warn().Msg("DNS discovery already running, skipping")
return
Expand All @@ -359,6 +406,16 @@ func handleDNSDiscovery(server *ethp2p.Server, lock chan struct{}) {
// Add DNS-discovered peers using the iterator.
count := 0
for iter.Next() {
// Check for context cancellation to stop discovery promptly.
select {
case <-ctx.Done():
log.Info().
Int("discovered_peers", count).
Msg("DNS discovery interrupted")
return
default:
}

node := iter.Node()
log.Debug().
Str("enode", node.URLv4()).
@@ -449,6 +506,10 @@ will result in less chance of missing data but can significantly increase memory
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
`write transaction events to database (this option can significantly increase CPU and memory usage)`)
f.BoolVar(&inputSensorParams.ShouldWritePeers, "write-peers", true, "write peers to database")
f.BoolVar(&inputSensorParams.ShouldBroadcastTx, "broadcast-txs", false, "broadcast full transactions to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastTxHashes, "broadcast-tx-hashes", false, "broadcast transaction hashes to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastBlocks, "broadcast-blocks", false, "broadcast full blocks to peers")
f.BoolVar(&inputSensorParams.ShouldBroadcastBlockHashes, "broadcast-block-hashes", false, "broadcast block hashes to peers")
f.BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "run pprof server")
f.UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "port pprof runs on")
f.BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "run Prometheus server")
@@ -482,4 +543,10 @@ will result in less chance of missing data but can significantly increase memory
f.DurationVar(&inputSensorParams.ParentsCache.TTL, "parents-cache-ttl", 5*time.Minute, "time to live for parent hash cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.BlocksCache.MaxSize, "max-blocks", 1024, "maximum blocks to track across all peers (0 for no limit)")
f.DurationVar(&inputSensorParams.BlocksCache.TTL, "blocks-cache-ttl", 10*time.Minute, "time to live for block cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.TxsCache.MaxSize, "max-txs", 8192, "maximum transactions to cache for serving to peers (0 for no limit)")
f.DurationVar(&inputSensorParams.TxsCache.TTL, "txs-cache-ttl", 10*time.Minute, "time to live for transaction cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.KnownTxsCache.MaxSize, "max-known-txs", 8192, "maximum transaction hashes to track per peer (0 for no limit)")
f.DurationVar(&inputSensorParams.KnownTxsCache.TTL, "known-txs-cache-ttl", 5*time.Minute, "time to live for known transaction cache entries (0 for no expiration)")
f.IntVar(&inputSensorParams.KnownBlocksCache.MaxSize, "max-known-blocks", 1024, "maximum block hashes to track per peer (0 for no limit)")
f.DurationVar(&inputSensorParams.KnownBlocksCache.TTL, "known-blocks-cache-ttl", 5*time.Minute, "time to live for known block cache entries (0 for no expiration)")
}
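
A final note on handleDNSDiscovery above: the lock parameter acts as a non-blocking try-lock built from a buffered channel, and the ctx.Done() branch lets shutdown win if cancellation has already happened. A minimal standalone sketch of that pattern, with illustrative names not taken from the diff:

package sketch

import (
	"context"
	"log"
)

// runExclusive executes fn only if no other call currently holds the lock.
// lock must have capacity 1; a full channel means a run is already in progress.
func runExclusive(ctx context.Context, lock chan struct{}, fn func(context.Context)) {
	select {
	case lock <- struct{}{}:
		// Acquired: release when done so the next run can proceed.
		defer func() { <-lock }()
	case <-ctx.Done():
		// Context already cancelled; do not start a new run.
		return
	default:
		// Another run is in progress; skip instead of queueing.
		log.Println("previous run still in progress, skipping")
		return
	}

	fn(ctx)
}

In the diff, the dnsLock channel passed to handleDNSDiscovery plays this role, so the hourly ticker and the startup call cannot run discovery concurrently.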