Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ processors:
staleThreshold: "5m" # Time after which a block is considered stale
checkInterval: "1m" # How often to check for stale blocks

# Gap detection (optional, disabled by default)
gapDetection:
enabled: false
scanInterval: "5m" # How often to scan for gaps
batchSize: 10 # Max gaps to process per scan
lookbackRange: 10000 # Max blocks to look back, 0 = unlimited

# Leader election configuration (optional, enabled by default)
leaderElection:
enabled: true
Expand Down
31 changes: 31 additions & 0 deletions pkg/processor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@ import (
"github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog_agg"
)

// GapDetectionConfig holds configuration for gap detection.
type GapDetectionConfig struct {
// Enabled enables gap detection (default: false)
Enabled bool `yaml:"enabled"`

// ScanInterval is how often to scan for gaps (default: 5m)
ScanInterval time.Duration `yaml:"scanInterval"`

// BatchSize is max gaps to process per scan (default: 10)
BatchSize int `yaml:"batchSize"`

// LookbackRange is max blocks to look back, 0 = unlimited (default: 10000)
LookbackRange uint64 `yaml:"lookbackRange"`
}

// Config holds the unified processor configuration.
type Config struct {
// Processing interval
Expand All @@ -31,6 +46,9 @@ type Config struct {
// Stale block detection configuration
StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"`

// Gap detection configuration
GapDetection GapDetectionConfig `yaml:"gapDetection"`

// Processor configurations
TransactionStructlog structlog.Config `yaml:"transactionStructlog"`
TransactionSimple simple.Config `yaml:"transactionSimple"`
Expand Down Expand Up @@ -126,6 +144,19 @@ func (c *Config) Validate() error {
c.StaleBlockDetection.CheckInterval = DefaultStaleBlockCheckInterval
}

// Set gap detection defaults (disabled by default, opt-in)
if c.GapDetection.ScanInterval == 0 {
c.GapDetection.ScanInterval = DefaultGapScanInterval
}

if c.GapDetection.BatchSize == 0 {
c.GapDetection.BatchSize = DefaultGapBatchSize
}

if c.GapDetection.LookbackRange == 0 {
c.GapDetection.LookbackRange = DefaultGapLookbackRange
}

if c.TransactionStructlog.Enabled {
if c.TransactionStructlog.Addr == "" {
return fmt.Errorf("transaction structlog addr is required when enabled")
Expand Down
9 changes: 9 additions & 0 deletions pkg/processor/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,13 @@ const (

// DefaultBackpressureJitterFraction is the fraction of backoff to add as random jitter (0.25 = 25%).
DefaultBackpressureJitterFraction = 0.25

// DefaultGapScanInterval is the default interval for scanning gaps.
DefaultGapScanInterval = 5 * time.Minute

// DefaultGapBatchSize is the default max gaps to process per scan.
DefaultGapBatchSize = 10

// DefaultGapLookbackRange is the default max blocks to look back for gaps.
DefaultGapLookbackRange = uint64(10000)
)
90 changes: 90 additions & 0 deletions pkg/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,11 +709,17 @@ func (m *Manager) runBlockProcessing(ctx context.Context) {
staleBlockTicker := time.NewTicker(m.config.StaleBlockDetection.CheckInterval)
defer staleBlockTicker.Stop()

// Gap detection ticker
gapScanTicker := time.NewTicker(m.config.GapDetection.ScanInterval)
defer gapScanTicker.Stop()

m.log.WithFields(logrus.Fields{
"interval": m.config.Interval,
"processors": len(m.processors),
"stale_check_interval": m.config.StaleBlockDetection.CheckInterval,
"stale_threshold": m.config.StaleBlockDetection.StaleThreshold,
"gap_scan_interval": m.config.GapDetection.ScanInterval,
"gap_detection": m.config.GapDetection.Enabled,
}).Info("Started block processing loop")

// Check if we have any processors
Expand Down Expand Up @@ -751,6 +757,9 @@ func (m *Manager) runBlockProcessing(ctx context.Context) {
case <-staleBlockTicker.C:
m.log.Debug("Stale block detection ticker fired")
m.checkStaleBlocks(ctx)
case <-gapScanTicker.C:
m.log.Debug("Gap detection ticker fired")
m.checkGaps(ctx)
default:
if !m.isLeader {
m.log.Warn("No longer leader but block processing still running - stopping")
Expand Down Expand Up @@ -1406,3 +1415,84 @@ func (m *Manager) checkStaleBlocks(ctx context.Context) {
}
}
}

// checkGaps checks for gaps in block processing across all processors and triggers reprocessing.
func (m *Manager) checkGaps(ctx context.Context) {
if !m.config.GapDetection.Enabled {
return
}

// Get current block from execution node
node := m.pool.GetHealthyExecutionNode()
if node == nil {
m.log.Warn("No healthy execution node for gap scan")

return
}

latestBlockNum, err := node.BlockNumber(ctx)
if err != nil || latestBlockNum == nil {
m.log.WithError(err).Warn("Failed to get current block for gap scan")

return
}

currentBlock := *latestBlockNum

// Check gaps for each enabled processor
for processorName, processor := range m.processors {
// Check for context cancellation between processors
select {
case <-ctx.Done():
return
default:
}

// Get the limiter from each processor
var limiter *tracker.Limiter

switch p := processor.(type) {
case *transaction_structlog.Processor:
limiter = p.Limiter
case *transaction_simple.Processor:
limiter = p.Limiter
case *transaction_structlog_agg.Processor:
limiter = p.Limiter
}

if limiter == nil {
continue
}

gaps, gapErr := limiter.GetGaps(
ctx,
currentBlock,
m.config.GapDetection.LookbackRange,
m.config.GapDetection.BatchSize,
)
if gapErr != nil {
m.log.WithError(gapErr).WithField("processor", processorName).Warn("Failed to scan for gaps")

continue
}

if len(gaps) == 0 {
continue
}

m.log.WithFields(logrus.Fields{
"processor": processorName,
"gap_count": len(gaps),
"current_block": currentBlock,
}).Info("Detected gaps, reprocessing")

for _, gapBlock := range gaps {
if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil {
m.log.WithError(reprocessErr).WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
}).Warn("Failed to reprocess gap block")
}
}
}
}
81 changes: 81 additions & 0 deletions pkg/processor/tracker/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tracker
import (
"context"
"fmt"
"math/big"

"github.com/ethpandaops/execution-processor/pkg/common"
"github.com/sirupsen/logrus"
Expand All @@ -15,6 +16,13 @@ type StateProvider interface {
MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error
}

// GapStateProvider extends StateProvider with gap detection capabilities.
type GapStateProvider interface {
StateProvider
GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error)
GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error)
}

// LimiterConfig holds configuration for the Limiter.
type LimiterConfig struct {
MaxPendingBlockRange int
Expand Down Expand Up @@ -207,3 +215,76 @@ func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint6

return nil
}

// GetGaps returns incomplete blocks outside the maxPendingBlockRange window.
// If lookbackRange is 0, scans from the oldest stored block.
// This performs a full-range scan for gap detection, excluding the recent window
// that is already handled by IsBlockedByIncompleteBlocks.
func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) ([]uint64, error) {
gapProvider, ok := l.stateProvider.(GapStateProvider)
if !ok {
return nil, fmt.Errorf("state provider does not support gap detection")
}

var minBlock uint64

if lookbackRange == 0 {
// Unlimited: scan from oldest stored block
minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor)
if err != nil {
return nil, fmt.Errorf("failed to get min stored block: %w", err)
}

if minStored == nil {
// No blocks stored yet
return nil, nil
}

minBlock = minStored.Uint64()
} else {
// Limited: scan from currentBlock - lookbackRange
if currentBlock > lookbackRange {
minBlock = currentBlock - lookbackRange
}
}

// Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks.
// The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock],
// so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work.
maxBlock := currentBlock

if l.config.MaxPendingBlockRange > 0 {
exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config

if currentBlock > exclusionWindow {
maxBlock = currentBlock - exclusionWindow - 1
} else {
// Current block is within the exclusion window, nothing to scan
return nil, nil
}
}

// Ensure minBlock doesn't exceed maxBlock
if minBlock > maxBlock {
return nil, nil
}

gaps, err := gapProvider.GetIncompleteBlocksInRange(
ctx, l.network, l.processor,
minBlock, maxBlock, limit,
)
if err != nil {
return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err)
}

if len(gaps) > 0 {
l.log.WithFields(logrus.Fields{
"min_block": minBlock,
"max_block": maxBlock,
"gap_count": len(gaps),
"first_gap": gaps[0],
}).Debug("Found gaps in block range")
}

return gaps, nil
}
Loading