From 0592bc1181edde8262d39af1da4fd398cf008229 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Tue, 3 Feb 2026 13:36:19 +1000 Subject: [PATCH] feat(processor): add gap detection for block processing Add ClickHouse gap detection to find missing blocks between the oldest processed block and the current block. This catches gaps that fall outside the maxPendingBlockRange window. Changes: - Add GetIncompleteBlocksInRange() to state manager for range queries - Add GapStateProvider interface extending StateProvider - Add GetGaps() method to limiter for full-range gap detection - Add GapDetectionConfig with configurable scan interval, batch size, and lookback range - Add periodic gap scanner goroutine to processor - Exclude maxPendingBlockRange window from gap scan to avoid double work The gap scanner runs periodically (default: 5m) and reprocesses any incomplete blocks found outside the limiter's normal detection window. --- example_config.yaml | 7 + pkg/processor/config.go | 31 +++ pkg/processor/defaults.go | 9 + pkg/processor/manager.go | 90 +++++++ pkg/processor/tracker/limiter.go | 81 ++++++ pkg/processor/tracker/limiter_gap_test.go | 292 ++++++++++++++++++++++ pkg/state/manager.go | 36 +++ 7 files changed, 546 insertions(+) create mode 100644 pkg/processor/tracker/limiter_gap_test.go diff --git a/example_config.yaml b/example_config.yaml index 619339f..ea1ae3b 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -43,6 +43,13 @@ processors: staleThreshold: "5m" # Time after which a block is considered stale checkInterval: "1m" # How often to check for stale blocks + # Gap detection (optional, disabled by default) + gapDetection: + enabled: false + scanInterval: "5m" # How often to scan for gaps + batchSize: 10 # Max gaps to process per scan + lookbackRange: 10000 # Max blocks to look back, 0 = unlimited + # Leader election configuration (optional, enabled by default) leaderElection: enabled: true diff --git a/pkg/processor/config.go b/pkg/processor/config.go index 95a2a55..3daf998 100644 --- a/pkg/processor/config.go +++ b/pkg/processor/config.go @@ -10,6 +10,21 @@ import ( "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog_agg" ) +// GapDetectionConfig holds configuration for gap detection. +type GapDetectionConfig struct { + // Enabled enables gap detection (default: false) + Enabled bool `yaml:"enabled"` + + // ScanInterval is how often to scan for gaps (default: 5m) + ScanInterval time.Duration `yaml:"scanInterval"` + + // BatchSize is max gaps to process per scan (default: 10) + BatchSize int `yaml:"batchSize"` + + // LookbackRange is max blocks to look back, 0 = unlimited (default: 10000) + LookbackRange uint64 `yaml:"lookbackRange"` +} + // Config holds the unified processor configuration. type Config struct { // Processing interval @@ -31,6 +46,9 @@ type Config struct { // Stale block detection configuration StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"` + // Gap detection configuration + GapDetection GapDetectionConfig `yaml:"gapDetection"` + // Processor configurations TransactionStructlog structlog.Config `yaml:"transactionStructlog"` TransactionSimple simple.Config `yaml:"transactionSimple"` @@ -126,6 +144,19 @@ func (c *Config) Validate() error { c.StaleBlockDetection.CheckInterval = DefaultStaleBlockCheckInterval } + // Set gap detection defaults (disabled by default, opt-in) + if c.GapDetection.ScanInterval == 0 { + c.GapDetection.ScanInterval = DefaultGapScanInterval + } + + if c.GapDetection.BatchSize == 0 { + c.GapDetection.BatchSize = DefaultGapBatchSize + } + + if c.GapDetection.LookbackRange == 0 { + c.GapDetection.LookbackRange = DefaultGapLookbackRange + } + if c.TransactionStructlog.Enabled { if c.TransactionStructlog.Addr == "" { return fmt.Errorf("transaction structlog addr is required when enabled") diff --git a/pkg/processor/defaults.go b/pkg/processor/defaults.go index a11308c..395ebb7 100644 --- a/pkg/processor/defaults.go +++ b/pkg/processor/defaults.go @@ -37,4 +37,13 @@ const ( // DefaultBackpressureJitterFraction is the fraction of backoff to add as random jitter (0.25 = 25%). DefaultBackpressureJitterFraction = 0.25 + + // DefaultGapScanInterval is the default interval for scanning gaps. + DefaultGapScanInterval = 5 * time.Minute + + // DefaultGapBatchSize is the default max gaps to process per scan. + DefaultGapBatchSize = 10 + + // DefaultGapLookbackRange is the default max blocks to look back for gaps. + DefaultGapLookbackRange = uint64(10000) ) diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index c0ede65..4f437e1 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -709,11 +709,17 @@ func (m *Manager) runBlockProcessing(ctx context.Context) { staleBlockTicker := time.NewTicker(m.config.StaleBlockDetection.CheckInterval) defer staleBlockTicker.Stop() + // Gap detection ticker + gapScanTicker := time.NewTicker(m.config.GapDetection.ScanInterval) + defer gapScanTicker.Stop() + m.log.WithFields(logrus.Fields{ "interval": m.config.Interval, "processors": len(m.processors), "stale_check_interval": m.config.StaleBlockDetection.CheckInterval, "stale_threshold": m.config.StaleBlockDetection.StaleThreshold, + "gap_scan_interval": m.config.GapDetection.ScanInterval, + "gap_detection": m.config.GapDetection.Enabled, }).Info("Started block processing loop") // Check if we have any processors @@ -751,6 +757,9 @@ func (m *Manager) runBlockProcessing(ctx context.Context) { case <-staleBlockTicker.C: m.log.Debug("Stale block detection ticker fired") m.checkStaleBlocks(ctx) + case <-gapScanTicker.C: + m.log.Debug("Gap detection ticker fired") + m.checkGaps(ctx) default: if !m.isLeader { m.log.Warn("No longer leader but block processing still running - stopping") @@ -1406,3 +1415,84 @@ func (m *Manager) checkStaleBlocks(ctx context.Context) { } } } + +// checkGaps checks for gaps in block processing across all processors and triggers reprocessing. +func (m *Manager) checkGaps(ctx context.Context) { + if !m.config.GapDetection.Enabled { + return + } + + // Get current block from execution node + node := m.pool.GetHealthyExecutionNode() + if node == nil { + m.log.Warn("No healthy execution node for gap scan") + + return + } + + latestBlockNum, err := node.BlockNumber(ctx) + if err != nil || latestBlockNum == nil { + m.log.WithError(err).Warn("Failed to get current block for gap scan") + + return + } + + currentBlock := *latestBlockNum + + // Check gaps for each enabled processor + for processorName, processor := range m.processors { + // Check for context cancellation between processors + select { + case <-ctx.Done(): + return + default: + } + + // Get the limiter from each processor + var limiter *tracker.Limiter + + switch p := processor.(type) { + case *transaction_structlog.Processor: + limiter = p.Limiter + case *transaction_simple.Processor: + limiter = p.Limiter + case *transaction_structlog_agg.Processor: + limiter = p.Limiter + } + + if limiter == nil { + continue + } + + gaps, gapErr := limiter.GetGaps( + ctx, + currentBlock, + m.config.GapDetection.LookbackRange, + m.config.GapDetection.BatchSize, + ) + if gapErr != nil { + m.log.WithError(gapErr).WithField("processor", processorName).Warn("Failed to scan for gaps") + + continue + } + + if len(gaps) == 0 { + continue + } + + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "gap_count": len(gaps), + "current_block": currentBlock, + }).Info("Detected gaps, reprocessing") + + for _, gapBlock := range gaps { + if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil { + m.log.WithError(reprocessErr).WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + }).Warn("Failed to reprocess gap block") + } + } + } +} diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index 8ef4466..a1260be 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -3,6 +3,7 @@ package tracker import ( "context" "fmt" + "math/big" "github.com/ethpandaops/execution-processor/pkg/common" "github.com/sirupsen/logrus" @@ -15,6 +16,13 @@ type StateProvider interface { MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error } +// GapStateProvider extends StateProvider with gap detection capabilities. +type GapStateProvider interface { + StateProvider + GetIncompleteBlocksInRange(ctx context.Context, network, processor string, minBlock, maxBlock uint64, limit int) ([]uint64, error) + GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (*big.Int, *big.Int, error) +} + // LimiterConfig holds configuration for the Limiter. type LimiterConfig struct { MaxPendingBlockRange int @@ -207,3 +215,76 @@ func (l *Limiter) ValidateBatchWithinLeash(ctx context.Context, startBlock uint6 return nil } + +// GetGaps returns incomplete blocks outside the maxPendingBlockRange window. +// If lookbackRange is 0, scans from the oldest stored block. +// This performs a full-range scan for gap detection, excluding the recent window +// that is already handled by IsBlockedByIncompleteBlocks. +func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRange uint64, limit int) ([]uint64, error) { + gapProvider, ok := l.stateProvider.(GapStateProvider) + if !ok { + return nil, fmt.Errorf("state provider does not support gap detection") + } + + var minBlock uint64 + + if lookbackRange == 0 { + // Unlimited: scan from oldest stored block + minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor) + if err != nil { + return nil, fmt.Errorf("failed to get min stored block: %w", err) + } + + if minStored == nil { + // No blocks stored yet + return nil, nil + } + + minBlock = minStored.Uint64() + } else { + // Limited: scan from currentBlock - lookbackRange + if currentBlock > lookbackRange { + minBlock = currentBlock - lookbackRange + } + } + + // Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks. + // The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock], + // so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work. + maxBlock := currentBlock + + if l.config.MaxPendingBlockRange > 0 { + exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config + + if currentBlock > exclusionWindow { + maxBlock = currentBlock - exclusionWindow - 1 + } else { + // Current block is within the exclusion window, nothing to scan + return nil, nil + } + } + + // Ensure minBlock doesn't exceed maxBlock + if minBlock > maxBlock { + return nil, nil + } + + gaps, err := gapProvider.GetIncompleteBlocksInRange( + ctx, l.network, l.processor, + minBlock, maxBlock, limit, + ) + if err != nil { + return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err) + } + + if len(gaps) > 0 { + l.log.WithFields(logrus.Fields{ + "min_block": minBlock, + "max_block": maxBlock, + "gap_count": len(gaps), + "first_gap": gaps[0], + }).Debug("Found gaps in block range") + } + + return gaps, nil +} diff --git a/pkg/processor/tracker/limiter_gap_test.go b/pkg/processor/tracker/limiter_gap_test.go new file mode 100644 index 0000000..45f0c5b --- /dev/null +++ b/pkg/processor/tracker/limiter_gap_test.go @@ -0,0 +1,292 @@ +package tracker + +import ( + "context" + "math/big" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockGapStateProvider implements GapStateProvider for testing. +type mockGapStateProvider struct { + mockStateProvider + incompleteBlocksInRange []uint64 + minStoredBlock *big.Int + maxStoredBlock *big.Int + getIncompleteErr error + getMinMaxErr error +} + +func (m *mockGapStateProvider) GetIncompleteBlocksInRange( + _ context.Context, _, _ string, _, _ uint64, _ int, +) ([]uint64, error) { + if m.getIncompleteErr != nil { + return nil, m.getIncompleteErr + } + + return m.incompleteBlocksInRange, nil +} + +func (m *mockGapStateProvider) GetMinMaxStoredBlocks( + _ context.Context, _, _ string, +) (*big.Int, *big.Int, error) { + if m.getMinMaxErr != nil { + return nil, nil, m.getMinMaxErr + } + + return m.minStoredBlock, m.maxStoredBlock, nil +} + +func TestGetGaps_FindsMissingBlocks(t *testing.T) { + // Setup: blocks 5,6,7,9,10...100 are complete, block 8 is incomplete + // With maxPendingBlockRange=2 and currentBlock=100, gap scanner looks at blocks up to 97 + // (excluding the window 98-100 that the limiter handles) + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(5), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{8}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) // Unlimited + + // Gap scanner searches [5, 97] (excludes 98-100 handled by limiter) + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{8}, gaps) +} + +func TestGetGaps_RespectsLookbackRange(t *testing.T) { + mockProvider := &mockGapStateProvider{ + // GetMinMaxStoredBlocks should NOT be called when lookbackRange is set + incompleteBlocksInRange: []uint64{75, 80}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(50) // Only look back 50 blocks + + // Should query from block 50 (100 - 50) to 100 + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{75, 80}, gaps) +} + +func TestGetGaps_NoGaps(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Empty(t, gaps) +} + +func TestGetGaps_DoesNotLookBeforeOldestStoredBlock(t *testing.T) { + // Ensure we don't query for blocks before the oldest stored block + mockProvider := &mockGapStateProvider{ + // Oldest stored is 50, so we should only query from 50 onwards + minStoredBlock: big.NewInt(50), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) // Unlimited + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Empty(t, gaps) +} + +func TestGetGaps_NoBlocksStored(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: nil, // No blocks stored + maxStoredBlock: nil, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Nil(t, gaps) +} + +func TestGetGaps_StateProviderDoesNotSupportGapDetection(t *testing.T) { + // Use the basic mockStateProvider which doesn't implement GapStateProvider + mockProvider := &mockStateProvider{} + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.Error(t, err) + assert.Contains(t, err.Error(), "state provider does not support gap detection") + assert.Nil(t, gaps) +} + +func TestGetGaps_MultipleGaps(t *testing.T) { + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{5, 10, 15, 20, 25}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{5, 10, 15, 20, 25}, gaps) +} + +func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) { + // When lookbackRange is greater than currentBlock, minBlock should be 0 + // With maxPendingBlockRange=2 and currentBlock=10, gap scanner looks at blocks up to 7 + mockProvider := &mockGapStateProvider{ + incompleteBlocksInRange: []uint64{3}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 2}) + + ctx := context.Background() + currentBlock := uint64(10) + lookbackRange := uint64(100) // Greater than currentBlock + + // Gap scanner searches [0, 7] (excludes 8-10 handled by limiter) + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{3}, gaps) +} + +func TestGetGaps_ExcludesMaxPendingBlockRangeWindow(t *testing.T) { + // Verify that gaps within maxPendingBlockRange are NOT returned + // because they're already handled by IsBlockedByIncompleteBlocks + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(100), + incompleteBlocksInRange: []uint64{50}, // Gap at block 50, outside the exclusion window + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 5}) + + ctx := context.Background() + currentBlock := uint64(100) + lookbackRange := uint64(0) + + // With maxPendingBlockRange=5, gap scanner searches [1, 94] (excludes 95-100) + // Block 50 should be found since it's outside the exclusion window + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Equal(t, []uint64{50}, gaps) +} + +func TestGetGaps_CurrentBlockWithinExclusionWindow(t *testing.T) { + // When currentBlock is smaller than or equal to maxPendingBlockRange, + // there's nothing to scan outside the exclusion window + mockProvider := &mockGapStateProvider{ + minStoredBlock: big.NewInt(1), + maxStoredBlock: big.NewInt(5), + incompleteBlocksInRange: []uint64{2}, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.NewEntry(logrus.New()), + StateProvider: mockProvider, + Network: "mainnet", + Processor: "simple", + }, LimiterConfig{MaxPendingBlockRange: 10}) // Larger than currentBlock + + ctx := context.Background() + currentBlock := uint64(5) + lookbackRange := uint64(0) + + // currentBlock (5) <= maxPendingBlockRange (10), so nothing to scan + gaps, err := limiter.GetGaps(ctx, currentBlock, lookbackRange, 100) + + require.NoError(t, err) + assert.Nil(t, gaps) +} diff --git a/pkg/state/manager.go b/pkg/state/manager.go index 9181c02..cba4c01 100644 --- a/pkg/state/manager.go +++ b/pkg/state/manager.go @@ -683,6 +683,42 @@ func (s *Manager) GetIncompleteBlocks(ctx context.Context, network, processor st return blocks, nil } +// GetIncompleteBlocksInRange returns all incomplete blocks between minBlock and maxBlock. +// This is used for gap detection across the full processed range. +func (s *Manager) GetIncompleteBlocksInRange( + ctx context.Context, + network, processor string, + minBlock, maxBlock uint64, + limit int, +) ([]uint64, error) { + query := fmt.Sprintf(` + SELECT block_number + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND complete = 0 + AND block_number >= %d + AND block_number <= %d + ORDER BY block_number ASC + LIMIT %d + `, s.storageTable, processor, network, minBlock, maxBlock, limit) + + s.log.WithFields(logrus.Fields{ + "processor": processor, + "network": network, + "min_block": minBlock, + "max_block": maxBlock, + "limit": limit, + }).Debug("Querying for incomplete blocks in range") + + blocks, err := s.storageClient.QueryUInt64Slice(ctx, query, "block_number") + if err != nil { + return nil, fmt.Errorf("failed to get incomplete blocks in range: %w", err) + } + + return blocks, nil +} + func (s *Manager) GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (minBlock, maxBlock *big.Int, err error) { query := fmt.Sprintf(` SELECT min(block_number) as min, max(block_number) as max