diff --git a/example_config.yaml b/example_config.yaml index 619339f..ea1ae3b 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -43,6 +43,13 @@ processors: staleThreshold: "5m" # Time after which a block is considered stale checkInterval: "1m" # How often to check for stale blocks + # Gap detection (optional, disabled by default) + gapDetection: + enabled: false + scanInterval: "5m" # How often to scan for gaps + batchSize: 10 # Max gaps to process per scan + lookbackRange: 10000 # Max blocks to look back, 0 = unlimited + # Leader election configuration (optional, enabled by default) leaderElection: enabled: true diff --git a/pkg/processor/config.go b/pkg/processor/config.go index 95a2a55..3daf998 100644 --- a/pkg/processor/config.go +++ b/pkg/processor/config.go @@ -10,6 +10,21 @@ import ( "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog_agg" ) +// GapDetectionConfig holds configuration for gap detection. +type GapDetectionConfig struct { + // Enabled enables gap detection (default: false) + Enabled bool `yaml:"enabled"` + + // ScanInterval is how often to scan for gaps (default: 5m) + ScanInterval time.Duration `yaml:"scanInterval"` + + // BatchSize is max gaps to process per scan (default: 10) + BatchSize int `yaml:"batchSize"` + + // LookbackRange is max blocks to look back, 0 = unlimited (default: 10000) + LookbackRange uint64 `yaml:"lookbackRange"` +} + // Config holds the unified processor configuration. type Config struct { // Processing interval @@ -31,6 +46,9 @@ type Config struct { // Stale block detection configuration StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"` + // Gap detection configuration + GapDetection GapDetectionConfig `yaml:"gapDetection"` + // Processor configurations TransactionStructlog structlog.Config `yaml:"transactionStructlog"` TransactionSimple simple.Config `yaml:"transactionSimple"` @@ -126,6 +144,19 @@ func (c *Config) Validate() error { c.StaleBlockDetection.CheckInterval = DefaultStaleBlockCheckInterval } + // Set gap detection defaults (disabled by default, opt-in) + if c.GapDetection.ScanInterval == 0 { + c.GapDetection.ScanInterval = DefaultGapScanInterval + } + + if c.GapDetection.BatchSize == 0 { + c.GapDetection.BatchSize = DefaultGapBatchSize + } + + if c.GapDetection.LookbackRange == 0 { + c.GapDetection.LookbackRange = DefaultGapLookbackRange + } + if c.TransactionStructlog.Enabled { if c.TransactionStructlog.Addr == "" { return fmt.Errorf("transaction structlog addr is required when enabled") diff --git a/pkg/processor/defaults.go b/pkg/processor/defaults.go index a11308c..395ebb7 100644 --- a/pkg/processor/defaults.go +++ b/pkg/processor/defaults.go @@ -37,4 +37,13 @@ const ( // DefaultBackpressureJitterFraction is the fraction of backoff to add as random jitter (0.25 = 25%). DefaultBackpressureJitterFraction = 0.25 + + // DefaultGapScanInterval is the default interval for scanning gaps. + DefaultGapScanInterval = 5 * time.Minute + + // DefaultGapBatchSize is the default max gaps to process per scan. + DefaultGapBatchSize = 10 + + // DefaultGapLookbackRange is the default max blocks to look back for gaps. + DefaultGapLookbackRange = uint64(10000) ) diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index c0ede65..4f437e1 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -709,11 +709,17 @@ func (m *Manager) runBlockProcessing(ctx context.Context) { staleBlockTicker := time.NewTicker(m.config.StaleBlockDetection.CheckInterval) defer staleBlockTicker.Stop() + // Gap detection ticker + gapScanTicker := time.NewTicker(m.config.GapDetection.ScanInterval) + defer gapScanTicker.Stop() + m.log.WithFields(logrus.Fields{ "interval": m.config.Interval, "processors": len(m.processors), "stale_check_interval": m.config.StaleBlockDetection.CheckInterval, "stale_threshold": m.config.StaleBlockDetection.StaleThreshold, + "gap_scan_interval": m.config.GapDetection.ScanInterval, + "gap_detection": m.config.GapDetection.Enabled, }).Info("Started block processing loop") // Check if we have any processors @@ -751,6 +757,9 @@ func (m *Manager) runBlockProcessing(ctx context.Context) { case <-staleBlockTicker.C: m.log.Debug("Stale block detection ticker fired") m.checkStaleBlocks(ctx) + case <-gapScanTicker.C: + m.log.Debug("Gap detection ticker fired") + m.checkGaps(ctx) default: if !m.isLeader { m.log.Warn("No longer leader but block processing still running - stopping") @@ -1406,3 +1415,84 @@ func (m *Manager) checkStaleBlocks(ctx context.Context) { } } } + +// checkGaps checks for gaps in block processing across all processors and triggers reprocessing. +func (m *Manager) checkGaps(ctx context.Context) { + if !m.config.GapDetection.Enabled { + return + } + + // Get current block from execution node + node := m.pool.GetHealthyExecutionNode() + if node == nil { + m.log.Warn("No healthy execution node for gap scan") + + return + } + + latestBlockNum, err := node.BlockNumber(ctx) + if err != nil || latestBlockNum == nil { + m.log.WithError(err).Warn("Failed to get current block for gap scan") + + return + } + + currentBlock := *latestBlockNum + + // Check gaps for each enabled processor + for processorName, processor := range m.processors { + // Check for context cancellation between processors + select { + case <-ctx.Done(): + return + default: + } + + // Get the limiter from each processor + var limiter *tracker.Limiter + + switch p := processor.(type) { + case *transaction_structlog.Processor: + limiter = p.Limiter + case *transaction_simple.Processor: + limiter = p.Limiter + case *transaction_structlog_agg.Processor: + limiter = p.Limiter + } + + if limiter == nil { + continue + } + + gaps, gapErr := limiter.GetGaps( + ctx, + currentBlock, + m.config.GapDetection.LookbackRange, + m.config.GapDetection.BatchSize, + ) + if gapErr != nil { + m.log.WithError(gapErr).WithField("processor", processorName).Warn("Failed to scan for gaps") + + continue + } + + if len(gaps) == 0 { + continue + } + + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "gap_count": len(gaps), + "current_block": currentBlock, + }).Info("Detected gaps, reprocessing") + + for _, gapBlock := range gaps { + if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil { + m.log.WithError(reprocessErr).WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + }).Warn("Failed to reprocess gap block") + } + } + } +} diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index 8ef4466..a1260be 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -3,6 +3,7 @@ package tracker import ( "context" "fmt" + "math/big" "github.com/ethpandaops/execution-processor/pkg/common" "github.com/sirupsen/logrus" @@ -15,6 +16,13 @@ type StateProvider interface { MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error } +// GapStateProvider extends StateProvider with gap detection capabilities. +type GapStateProvider interface { + StateProvider + GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error) + GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error) +} + // LimiterConfig holds configuration for the Limiter. type LimiterConfig struct { MaxPendingBlockRange int @@ -207,3 +215,76 @@ func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint6 return nil } + +// GetGaps returns incomplete blocks outside the maxPendingBlockRange window. +// If lookbackRange is 0, scans from the oldest stored block. +// This performs a full-range scan for gap detection, excluding the recent window +// that is already handled by IsBlockedByIncompleteBlocks. +func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) ([]uint64, error) { + gapProvider, ok := l.stateProvider.(GapStateProvider) + if !ok { + return nil, fmt.Errorf("state provider does not support gap detection") + } + + var minBlock uint64 + + if lookbackRange == 0 { + // Unlimited: scan from oldest stored block + minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor) + if err != nil { + return nil, fmt.Errorf("failed to get min stored block: %w", err) + } + + if minStored == nil { + // No blocks stored yet + return nil, nil + } + + minBlock = minStored.Uint64() + } else { + // Limited: scan from currentBlock - lookbackRange + if currentBlock > lookbackRange { + minBlock = currentBlock - lookbackRange + } + } + + // Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks. + // The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock], + // so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work. + maxBlock := currentBlock + + if l.config.MaxPendingBlockRange > 0 { + exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config + + if currentBlock > exclusionWindow { + maxBlock = currentBlock - exclusionWindow - 1 + } else { + // Current block is within the exclusion window, nothing to scan + return nil, nil + } + } + + // Ensure minBlock doesn't exceed maxBlock + if minBlock > maxBlock { + return nil, nil + } + + gaps, err := gapProvider.GetIncompleteBlocksInRange( + ctx, l.network, l.processor, + minBlock, maxBlock, limit, + ) + if err != nil { + return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err) + } + + if len(gaps) > 0 { + l.log.WithFields(logrus.Fields{ + "min_block": minBlock, + "max_block": maxBlock, + "gap_count": len(gaps), + "first_gap": gaps[0], + }).Debug("Found gaps in block range") + } + + return gaps, nil +} diff --git a/pkg/processor/tracker/limiter_gap_test.go b/pkg/processor/tracker/limiter_gap_test.go new file mode 100644 index 0000000..45f0c5b --- /dev/null +++ b/pkg/processor/tracker/limiter_gap_test.go @@ -0,0 +1,292 @@ +package tracker + +import ( + "context" + "math/big" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockGapStateProvider implements GapStateProvider for testing. +type mockGapStateProvider struct { + mockStateProvider + incompleteBlocksInRange []uint64 + minStoredBlock *big.Int + maxStoredBlock *big.Int + getIncompleteErr error + getMinMaxErr error +} + +func (m *mockGapStateProvider) GetIncompleteBlocksInRange( + _ context.Context, _, _ string, _, _ uint64, _ int, +) ([]uint64, error) { + if m.getIncompleteErr != nil { + return nil, m.getIncompleteErr + } + + return m.incompleteBlocksInRange, nil +} + +func (m *mockGapStateProvider) GetMinMaxStoredBlocks( + _ context.Context, _, _ string, +) (*big.Int, *big.Int, error) { + if m.getMinMaxErr != nil { + return nil, nil, m.getMinMaxErr + } + + return m.minStoredBlock, m.maxStoredBlock, nil +} + +func TestGetGaps_FindsMissingBlocks(t *testing.T) { + // Setup: blocks 5,6,7,9,10...100 are complete, block 8 is incomplete + // With maxPendingBlockRange=2 and currentBlock=100, gap scanner looks at blocks up to 97 + // (excluding the window 98-100 that the limiter handles) + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(5), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{8}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) // Unlimited + + // Gap scanner searches [5, 97] (excludes 98-100 handled by limiter) + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{8}, gaps) +} + +func TestGetGaps_RespectsLookbackRange(t *testing.T) { + mockProvider := &mockGapStateProvider{ + // GetMinMaxStoredBlocks should NOT be called when lookbackRange is set + incompleteBlocksInRange: []uint64{75, 80}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(50) // Only look back 50 blocks + + // Should query from block 50 (100 - 50) to 100 + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{75, 80}, gaps) +} + +func TestGetGaps_NoGaps(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Empty(t, gaps) +} + +func TestGetGaps_DoesNotLookBeforeOldestStoredBlock(t *testing.T) { + // Ensure we don't query for blocks before the oldest stored block + mockProvider := &mockGapStateProvider{ + // Oldest stored is 50, so we should only query from 50 onwards + minStoredBlock: big.NewInt(50), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) // Unlimited + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Empty(t, gaps) +} + +func TestGetGaps_NoBlocksStored(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: nil, // No blocks stored + maxStoredBlock: nil, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Nil(t, gaps) +} + +func TestGetGaps_StateProviderDoesNotSupportGapDetection(t *testing.T) { + // Use the basic mockStateProvider which doesn't implement GapStateProvider + mockProvider := &mockStateProvider{} + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.Error(t, err) + assert.Contains(t, err.Error(), "state provider does not support gap detection") + assert.Nil(t, gaps) +} + +func TestGetGaps_MultipleGaps(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{5, 10, 15, 20, 25}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{5, 10, 15, 20, 25}, gaps) +} + +func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) { + // When lookbackRange is greater than currentBlock, minBlock should be 0 + // With maxPendingBlockRange=2 and currentBlock=10, gap scanner looks at blocks up to 7 + mockProvider := &mockGapStateProvider{ + incompleteBlocksInRange: []uint64{3}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(10) + lookbackRange := uint64(100) // Greater than currentBlock + + // Gap scanner searches [0, 7] (excludes 8-10 handled by limiter) + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{3}, gaps) +} + +func TestGetGaps_ExcludesMaxPendingBlockRangeWindow(t *testing.T) { + // Verify that gaps within maxPendingBlockRange are NOT returned + // because they're already handled by IsBlockedByIncompleteBlocks + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{50}, // Gap at block 50, outside the exclusion window + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 5}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + // With maxPendingBlockRange=5, gap scanner searches [1, 94] (excludes 95-100) + // Block 50 should be found since it's outside the exclusion window + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{50}, gaps) +} + +func TestGetGaps_CurrentBlockWithinExclusionWindow(t *testing.T) { + // When currentBlock is smaller than or equal to maxPendingBlockRange, + // there's nothing to scan outside the exclusion window + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(5), + incompleteBlocksInRange: []uint64{2}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 10}) // Larger than currentBlock + + ctx := context.Background() + currentBlock := uint64(5) + lookbackRange := uint64(0) + + // currentBlock (5) <= maxPendingBlockRange (10), so nothing to scan + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Nil(t, gaps) +} diff --git a/pkg/state/manager.go b/pkg/state/manager.go index 9181c02..cba4c01 100644 --- a/pkg/state/manager.go +++ b/pkg/state/manager.go @@ -683,6 +683,42 @@ func (s *Manager) GetIncompleteBlocks(ctx context.Context, network, processor st return blocks, nil } +// GetIncompleteBlocksInRange returns all incomplete blocks between minBlock and maxBlock. +// This is used for gap detection across the full processed range. +func (s *Manager) GetIncompleteBlocksInRange( + ctx context.Context, + network, processor string, + minBlock, maxBlock uint64, + limit int, +) ([]uint64, error) { + query := fmt.Sprintf(` + SELECT block_number + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND complete = 0 + AND block_number >= %d + AND block_number <= %d + ORDER BY block_number ASC + LIMIT %d + `, s.storageTable, processor, network, minBlock, maxBlock, limit) + + s.log.WithFields(logrus.Fields{ + "processor": processor, + "network": network, + "min_block": minBlock, + "max_block": maxBlock, + "limit": limit, + }).Debug("Querying for incomplete blocks in range") + + blocks, err := s.storageClient.QueryUInt64Slice(ctx, query, "block_number") + if err != nil { + return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err) + } + + return blocks, nil +} + func (s *Manager) GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (minBlock, maxBlock *big.Int, err error) { query := fmt.Sprintf(` SELECT min(block_number) as min, max(block_number) as max