From 0d08eecca42a7778f9a7c42b2bb878d3197210be Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Wed, 4 Feb 2026 13:20:44 +1000 Subject: [PATCH] fix: add IsBlockComplete check to prevent gap detection race condition Add verification that a block is still incomplete in ClickHouse before calling ReprocessBlock in gap detection. This closes a TOCTOU race window where a block could complete (MarkBlockComplete writes complete=1 and cleans up Redis tracking) between when gap detection queries ClickHouse and when it decides to reprocess. Previously, gap detection only checked HasBlockTracking() which returns false after a block completes and Redis is cleaned up, causing already- complete blocks to be unnecessarily reprocessed. - Add IsBlockComplete() to state manager for fresh ClickHouse query - Add completion check in checkGaps() before ReprocessBlock call - Add unit test for race condition logic --- pkg/processor/manager.go | 98 ++++++++++++++----- pkg/processor/manager_test.go | 62 ++++++++++++ pkg/processor/tracker/limiter.go | 47 +++++---- pkg/processor/tracker/limiter_gap_test.go | 9 +- pkg/processor/tracker/processor.go | 3 + .../transaction/simple/block_processing.go | 53 +++++++++- pkg/processor/transaction/simple/processor.go | 17 ++-- .../transaction/structlog/block_processing.go | 47 +++++++++ .../transaction/structlog/processor.go | 17 ++-- .../structlog_agg/block_processing.go | 47 +++++++++ .../transaction/structlog_agg/processor.go | 17 ++-- pkg/state/manager.go | 50 ++++++++-- 12 files changed, 391 insertions(+), 76 deletions(-) diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index 2b3e2f7..6251eb9 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -74,10 +74,11 @@ type Manager struct { processors map[string]tracker.BlockProcessor // Redis/Asynq for distributed processing - redisClient *r.Client - redisPrefix string - asynqClient *asynq.Client - asynqServer *asynq.Server + redisClient *r.Client + redisPrefix string + asynqClient *asynq.Client + asynqServer *asynq.Server + asynqInspector *asynq.Inspector network *ethereum.Network @@ -121,6 +122,9 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta // Initialize Asynq client with its own Redis connection asynqClient := asynq.NewClient(asynqRedisOpt) + // Initialize Asynq inspector for task management (deletion before reprocessing) + asynqInspector := asynq.NewInspector(asynqRedisOpt) + var asynqServer *asynq.Server // Setup queue priorities dynamically based on processors // This will be populated after processors are initialized @@ -145,6 +149,7 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta redisPrefix: redisPrefix, asynqClient: asynqClient, asynqServer: asynqServer, + asynqInspector: asynqInspector, stopChan: make(chan struct{}), blockProcessStop: make(chan struct{}), queueHighWaterMarks: make(map[string]int), @@ -369,6 +374,13 @@ func (m *Manager) Stop(ctx context.Context) error { } } + // Close Asynq inspector + if m.asynqInspector != nil { + if err := m.asynqInspector.Close(); err != nil { + m.log.WithError(err).Error("Failed to close Asynq inspector") + } + } + // Wait for all goroutines to complete m.wg.Wait() m.log.Info("All goroutines stopped") @@ -384,13 +396,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { m.log.Debug("Transaction structlog processor is enabled, initializing...") processor, err := 
transaction_structlog.New(&transaction_structlog.Dependencies{ - Log: m.log.WithField("processor", "transaction_structlog"), - Pool: m.pool, - State: m.state, - AsynqClient: m.asynqClient, - RedisClient: m.redisClient, - Network: m.network, - RedisPrefix: m.redisPrefix, + Log: m.log.WithField("processor", "transaction_structlog"), + Pool: m.pool, + State: m.state, + AsynqClient: m.asynqClient, + AsynqInspector: m.asynqInspector, + RedisClient: m.redisClient, + Network: m.network, + RedisPrefix: m.redisPrefix, }, &m.config.TransactionStructlog) if err != nil { return fmt.Errorf("failed to create transaction_structlog processor: %w", err) @@ -415,13 +428,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { m.log.Debug("Transaction simple processor is enabled, initializing...") processor, err := transaction_simple.New(&transaction_simple.Dependencies{ - Log: m.log.WithField("processor", "transaction_simple"), - Pool: m.pool, - State: m.state, - AsynqClient: m.asynqClient, - RedisClient: m.redisClient, - Network: m.network, - RedisPrefix: m.redisPrefix, + Log: m.log.WithField("processor", "transaction_simple"), + Pool: m.pool, + State: m.state, + AsynqClient: m.asynqClient, + AsynqInspector: m.asynqInspector, + RedisClient: m.redisClient, + Network: m.network, + RedisPrefix: m.redisPrefix, }, &m.config.TransactionSimple) if err != nil { return fmt.Errorf("failed to create transaction_simple processor: %w", err) @@ -446,13 +460,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { m.log.Debug("Transaction structlog_agg processor is enabled, initializing...") processor, err := transaction_structlog_agg.New(&transaction_structlog_agg.Dependencies{ - Log: m.log.WithField("processor", "transaction_structlog_agg"), - Pool: m.pool, - State: m.state, - AsynqClient: m.asynqClient, - RedisClient: m.redisClient, - Network: m.network, - RedisPrefix: m.redisPrefix, + Log: m.log.WithField("processor", "transaction_structlog_agg"), + Pool: m.pool, + State: m.state, + AsynqClient: m.asynqClient, + AsynqInspector: m.asynqInspector, + RedisClient: m.redisClient, + Network: m.network, + RedisPrefix: m.redisPrefix, }, &m.config.TransactionStructlogAgg) if err != nil { return fmt.Errorf("failed to create transaction_structlog_agg processor: %w", err) @@ -1508,6 +1523,39 @@ func (m *Manager) checkGaps(ctx context.Context) { // Handle INCOMPLETE blocks -> ReprocessBlock (row exists, just stuck) for _, gapBlock := range gapResult.Incomplete { + // Skip if block already has active Redis tracking (being processed) + hasTracking, trackErr := processor.GetCompletionTracker().HasBlockTracking( + ctx, gapBlock, m.network.Name, processorName, m.config.Mode) + if trackErr != nil { + m.log.WithError(trackErr).WithField("block", gapBlock).Debug("Failed to check block tracking") + } + + if hasTracking { + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + }).Debug("Skipping incomplete block - already has active Redis tracking") + + continue + } + + // Re-verify block is still incomplete in ClickHouse. + // This closes a race window where a block completes between + // the gap scan query and now (Redis tracking cleaned up). 
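+			// Descriptive note (grounded in the code below): if this completeness
+			// check itself fails, the error is only logged at debug level and
+			// isComplete stays false, so the block still falls through to
+			// ReprocessBlock below (fail-open rather than silently skipping).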
+ isComplete, completeErr := m.state.IsBlockComplete(ctx, gapBlock, m.network.Name, processorName) + if completeErr != nil { + m.log.WithError(completeErr).WithField("block", gapBlock).Debug("Failed to verify block complete status") + } + + if isComplete { + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "block": gapBlock, + }).Debug("Skipping incomplete block - completed during gap scan (race avoided)") + + continue + } + if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil { m.log.WithError(reprocessErr).WithFields(logrus.Fields{ "processor": processorName, diff --git a/pkg/processor/manager_test.go b/pkg/processor/manager_test.go index 06be9c9..3f98e7e 100644 --- a/pkg/processor/manager_test.go +++ b/pkg/processor/manager_test.go @@ -435,6 +435,68 @@ func TestManager_LeaderElectionDisabled(t *testing.T) { // Race condition tests +// TestGapDetectionRaceConditionLogic tests the race condition where a block +// completes between gap scan and reprocess decision. +// +// The race occurs when: +// 1. Gap detection queries ClickHouse - returns block 100 as incomplete +// 2. Block 100 completes (MarkBlockComplete writes complete=1, cleans Redis) +// 3. Gap detection checks Redis - HasBlockTracking returns false +// 4. Gap detection calls ReprocessBlock - WRONG! Block is already complete +// +// Current (buggy) logic in checkGaps(): +// +// if !hasTracking { ReprocessBlock() } // BUG: doesn't check if complete +// +// Fixed logic: +// +// if !hasTracking && !isComplete { ReprocessBlock() } +func TestGapDetectionRaceConditionLogic(t *testing.T) { + tests := []struct { + name string + hasTracking bool // Redis tracking exists + isComplete bool // Block is complete in ClickHouse + shouldReprocess bool // Expected decision + }{ + { + name: "has tracking - skip (being processed)", + hasTracking: true, + isComplete: false, + shouldReprocess: false, + }, + { + name: "no tracking, incomplete - reprocess (orphaned)", + hasTracking: false, + isComplete: false, + shouldReprocess: true, + }, + { + name: "no tracking, complete - skip (race condition)", + hasTracking: false, + isComplete: true, + shouldReprocess: false, // THIS CASE IS THE BUG - currently would reprocess! + }, + { + name: "has tracking, complete - skip (already being processed)", + hasTracking: true, + isComplete: true, + shouldReprocess: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // FIXED logic (now in checkGaps): + // Only reprocess if no Redis tracking AND not complete in ClickHouse + shouldReprocess := !tt.hasTracking && !tt.isComplete + + assert.Equal(t, tt.shouldReprocess, shouldReprocess, + "block with hasTracking=%v, isComplete=%v should reprocess=%v, but got %v", + tt.hasTracking, tt.isComplete, tt.shouldReprocess, shouldReprocess) + }) + } +} + // TestManager_RaceConditions specifically tests for race conditions in manager. func TestManager_RaceConditions(t *testing.T) { log := logrus.New() diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index 79101a0..2342b75 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -240,40 +240,51 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang return nil, fmt.Errorf("state provider does not support gap detection") } + // Get min and max stored blocks to constrain our search range. + // We can only find gaps within blocks that have actually been stored. 
+ minStored, maxStored, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor) + if err != nil { + return nil, fmt.Errorf("failed to get min/max stored blocks: %w", err) + } + + if minStored == nil || maxStored == nil { + // No blocks stored yet + return &GapResult{ScanDuration: time.Since(startTime)}, nil + } + + // Use the stored max as reference point, not the chain head. + // We can only find gaps within data we've actually stored. + referenceBlock := maxStored.Uint64() + var minBlock uint64 if lookbackRange == 0 { // Unlimited: scan from oldest stored block - minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor) - if err != nil { - return nil, fmt.Errorf("failed to get min stored block: %w", err) - } - - if minStored == nil { - // No blocks stored yet - return &GapResult{ScanDuration: time.Since(startTime)}, nil - } - minBlock = minStored.Uint64() } else { - // Limited: scan from currentBlock - lookbackRange - if currentBlock > lookbackRange { - minBlock = currentBlock - lookbackRange + // Limited: scan from referenceBlock - lookbackRange + if referenceBlock > lookbackRange { + minBlock = referenceBlock - lookbackRange + } + + // Constrain to actual stored range - can't find gaps before the first stored block + if minBlock < minStored.Uint64() { + minBlock = minStored.Uint64() } } // Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks. // The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock], - // so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work. - maxBlock := currentBlock + // so we only scan up to (referenceBlock - maxPendingBlockRange - 1) to avoid double work. + maxBlock := referenceBlock if l.config.MaxPendingBlockRange > 0 { exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config - if currentBlock > exclusionWindow { - maxBlock = currentBlock - exclusionWindow - 1 + if referenceBlock > exclusionWindow { + maxBlock = referenceBlock - exclusionWindow - 1 } else { - // Current block is within the exclusion window, nothing to scan + // Reference block is within the exclusion window, nothing to scan return &GapResult{ScanDuration: time.Since(startTime)}, nil } } diff --git a/pkg/processor/tracker/limiter_gap_test.go b/pkg/processor/tracker/limiter_gap_test.go index 2c73c27..b857a2d 100644 --- a/pkg/processor/tracker/limiter_gap_test.go +++ b/pkg/processor/tracker/limiter_gap_test.go @@ -246,7 +246,7 @@ func TestGetGaps_ErrorFromGetMinMax(t *testing.T) { require.Error(t, err) assert.Nil(t, result) - assert.Contains(t, err.Error(), "failed to get min stored block") + assert.Contains(t, err.Error(), "failed to get min/max stored blocks") } func TestGetGaps_NoBlocksStored(t *testing.T) { @@ -431,7 +431,9 @@ func TestGetGaps_ScanDurationTracked(t *testing.T) { func TestGetGaps_RespectsLookbackRange(t *testing.T) { mockProvider := &mockGapStateProvider{ - // GetMinMaxStoredBlocks should NOT be called when lookbackRange is set + // minStoredBlock is always needed now to constrain the search range + minStoredBlock: big.NewInt(50), // Matches the calculated min from lookback + maxStoredBlock: big.NewInt(100), incompleteBlocksInRange: []uint64{75, 80}, missingBlocksInRange: []uint64{77}, } @@ -459,9 +461,12 @@ func TestGetGaps_RespectsLookbackRange(t *testing.T) { func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) { // When lookbackRange is greater than currentBlock, minBlock should be 0 + // 
but constrained to minStoredBlock mockProvider := &mockGapStateProvider{ incompleteBlocksInRange: []uint64{3}, missingBlocksInRange: []uint64{5}, + minStoredBlock: big.NewInt(0), // Set min stored to 0 so gaps at 3,5 are valid + maxStoredBlock: big.NewInt(10), } limiter := NewLimiter(&LimiterDeps{ diff --git a/pkg/processor/tracker/processor.go b/pkg/processor/tracker/processor.go index ee1e4af..9f0322f 100644 --- a/pkg/processor/tracker/processor.go +++ b/pkg/processor/tracker/processor.go @@ -126,6 +126,9 @@ type BlockProcessor interface { // ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks. // This is used for gap filling of missing blocks (blocks with no row in DB). ProcessBlock(ctx context.Context, block execution.Block) error + + // GetCompletionTracker returns the block completion tracker for checking tracking status. + GetCompletionTracker() *BlockCompletionTracker } // QueueInfo contains information about a processor queue. diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index 631b130..5d8786d 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -265,6 +265,45 @@ func (p *Processor) ProcessBlock(ctx context.Context, block execution.Block) err return nil } +// deleteTaskFromMainQueue attempts to delete a task from the main processing queue. +// Returns nil if deleted, not found, or active (acceptable to skip active tasks). +func (p *Processor) deleteTaskFromMainQueue(taskID string) error { + if p.asynqInspector == nil { + return nil + } + + var mainQueue string + if p.processingMode == tracker.BACKWARDS_MODE { + mainQueue = p.getProcessBackwardsQueue() + } else { + mainQueue = p.getProcessForwardsQueue() + } + + err := p.asynqInspector.DeleteTask(mainQueue, taskID) + if err == nil { + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + }).Debug("Deleted task from main queue before reprocess") + + return nil + } + + // Task not found or queue not found - fine, proceed + if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) { + return nil + } + + // Active tasks can't be deleted - that's OK, they'll complete naturally + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + "error": err, + }).Debug("Could not delete task from main queue (may be active)") + + return nil +} + // ReprocessBlock re-enqueues tasks for an orphaned block. // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // TaskID deduplication ensures no duplicate tasks are created. 
@@ -330,6 +369,13 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { return fmt.Errorf("failed to create task: %w", err) } + // Try to delete existing task from main queue before re-enqueueing + var deletedFromMain bool + + if delErr := p.deleteTaskFromMainQueue(taskID); delErr == nil { + deletedFromMain = true + } + // Enqueue to the high-priority reprocess queue err = p.EnqueueTask(ctx, task, asynq.Queue(queue), @@ -349,9 +395,10 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { } p.log.WithFields(logrus.Fields{ - "block_number": blockNum, - "tx_count": len(block.Transactions()), - "queue": queue, + "block_number": blockNum, + "tx_count": len(block.Transactions()), + "queue": queue, + "deleted_from_main": deletedFromMain, }).Info("Reprocessed orphaned block to high-priority queue") return nil diff --git a/pkg/processor/transaction/simple/processor.go b/pkg/processor/transaction/simple/processor.go index 2550aa7..f8b0ada 100644 --- a/pkg/processor/transaction/simple/processor.go +++ b/pkg/processor/transaction/simple/processor.go @@ -24,13 +24,14 @@ var _ tracker.BlockProcessor = (*Processor)(nil) // Dependencies contains the dependencies needed for the processor. type Dependencies struct { - Log logrus.FieldLogger - Pool *ethereum.Pool - Network *ethereum.Network - State *state.Manager - AsynqClient *asynq.Client - RedisClient *redis.Client - RedisPrefix string + Log logrus.FieldLogger + Pool *ethereum.Pool + Network *ethereum.Network + State *state.Manager + AsynqClient *asynq.Client + AsynqInspector *asynq.Inspector + RedisClient *redis.Client + RedisPrefix string } // Processor handles simple transaction processing. @@ -42,6 +43,7 @@ type Processor struct { config *Config network *ethereum.Network asynqClient *asynq.Client + asynqInspector *asynq.Inspector processingMode string redisPrefix string @@ -120,6 +122,7 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { config: config, network: deps.Network, asynqClient: deps.AsynqClient, + asynqInspector: deps.AsynqInspector, processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, Limiter: limiter, diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 774b612..22b104b 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -364,6 +364,45 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution return enqueuedCount + skippedCount, nil } +// deleteTaskFromMainQueue attempts to delete a task from the main processing queue. +// Returns nil if deleted, not found, or active (acceptable to skip active tasks). 
+func (p *Processor) deleteTaskFromMainQueue(taskID string) error { + if p.asynqInspector == nil { + return nil + } + + var mainQueue string + if p.processingMode == tracker.BACKWARDS_MODE { + mainQueue = p.getProcessBackwardsQueue() + } else { + mainQueue = p.getProcessForwardsQueue() + } + + err := p.asynqInspector.DeleteTask(mainQueue, taskID) + if err == nil { + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + }).Debug("Deleted task from main queue before reprocess") + + return nil + } + + // Task not found or queue not found - fine, proceed + if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) { + return nil + } + + // Active tasks can't be deleted - that's OK, they'll complete naturally + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + "error": err, + }).Debug("Could not delete task from main queue (may be active)") + + return nil +} + // ReprocessBlock re-enqueues tasks for an orphaned block. // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // TaskID deduplication ensures no duplicate tasks are created. @@ -414,6 +453,8 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { var skippedCount int + var deletedCount int + for index, tx := range block.Transactions() { payload := &ProcessPayload{ BlockNumber: *block.Number(), @@ -438,6 +479,11 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { return fmt.Errorf("failed to create task for tx %s: %w", tx.Hash().String(), err) } + // Try to delete existing task from main queue before re-enqueueing + if delErr := p.deleteTaskFromMainQueue(taskID); delErr == nil { + deletedCount++ + } + // Enqueue to the high-priority reprocess queue err = p.EnqueueTask(ctx, task, asynq.Queue(queue), @@ -462,6 +508,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { "expected_count": expectedCount, "enqueued_count": enqueuedCount, "skipped_count": skippedCount, + "deleted_count": deletedCount, "queue": queue, }).Info("Reprocessed orphaned block to high-priority queue") diff --git a/pkg/processor/transaction/structlog/processor.go b/pkg/processor/transaction/structlog/processor.go index f58e69d..34e1281 100644 --- a/pkg/processor/transaction/structlog/processor.go +++ b/pkg/processor/transaction/structlog/processor.go @@ -30,13 +30,14 @@ var _ tracker.BlockProcessor = (*Processor)(nil) // Dependencies contains the dependencies needed for the processor. type Dependencies struct { - Log logrus.FieldLogger - Pool *ethereum.Pool - Network *ethereum.Network - State *state.Manager - AsynqClient *asynq.Client - RedisClient *redis.Client - RedisPrefix string + Log logrus.FieldLogger + Pool *ethereum.Pool + Network *ethereum.Network + State *state.Manager + AsynqClient *asynq.Client + AsynqInspector *asynq.Inspector + RedisClient *redis.Client + RedisPrefix string } // Processor handles transaction structlog processing. 
@@ -48,6 +49,7 @@ type Processor struct { config *Config network *ethereum.Network asynqClient *asynq.Client + asynqInspector *asynq.Inspector processingMode string redisPrefix string @@ -127,6 +129,7 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { clickhouse: clickhouseClient, config: config, asynqClient: deps.AsynqClient, + asynqInspector: deps.AsynqInspector, processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, Limiter: limiter, diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 5dcd245..8030daa 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -364,6 +364,45 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution return enqueuedCount + skippedCount, nil } +// deleteTaskFromMainQueue attempts to delete a task from the main processing queue. +// Returns nil if deleted, not found, or active (acceptable to skip active tasks). +func (p *Processor) deleteTaskFromMainQueue(taskID string) error { + if p.asynqInspector == nil { + return nil + } + + var mainQueue string + if p.processingMode == tracker.BACKWARDS_MODE { + mainQueue = p.getProcessBackwardsQueue() + } else { + mainQueue = p.getProcessForwardsQueue() + } + + err := p.asynqInspector.DeleteTask(mainQueue, taskID) + if err == nil { + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + }).Debug("Deleted task from main queue before reprocess") + + return nil + } + + // Task not found or queue not found - fine, proceed + if errors.Is(err, asynq.ErrTaskNotFound) || errors.Is(err, asynq.ErrQueueNotFound) { + return nil + } + + // Active tasks can't be deleted - that's OK, they'll complete naturally + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "queue": mainQueue, + "error": err, + }).Debug("Could not delete task from main queue (may be active)") + + return nil +} + // ReprocessBlock re-enqueues tasks for an orphaned block. // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. // TaskID deduplication ensures no duplicate tasks are created. 
@@ -414,6 +453,8 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { var skippedCount int + var deletedCount int + for index, tx := range block.Transactions() { payload := &ProcessPayload{ BlockNumber: *block.Number(), @@ -438,6 +479,11 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { return fmt.Errorf("failed to create task for tx %s: %w", tx.Hash().String(), err) } + // Try to delete existing task from main queue before re-enqueueing + if delErr := p.deleteTaskFromMainQueue(taskID); delErr == nil { + deletedCount++ + } + // Enqueue to the high-priority reprocess queue err = p.EnqueueTask(ctx, task, asynq.Queue(queue), @@ -462,6 +508,7 @@ func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { "expected_count": expectedCount, "enqueued_count": enqueuedCount, "skipped_count": skippedCount, + "deleted_count": deletedCount, "queue": queue, }).Info("Reprocessed orphaned block to high-priority queue") diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go index 4a765a7..7f7c03b 100644 --- a/pkg/processor/transaction/structlog_agg/processor.go +++ b/pkg/processor/transaction/structlog_agg/processor.go @@ -30,13 +30,14 @@ var _ tracker.BlockProcessor = (*Processor)(nil) // Dependencies contains the dependencies needed for the processor. type Dependencies struct { - Log logrus.FieldLogger - Pool *ethereum.Pool - Network *ethereum.Network - State *state.Manager - AsynqClient *asynq.Client - RedisClient *redis.Client - RedisPrefix string + Log logrus.FieldLogger + Pool *ethereum.Pool + Network *ethereum.Network + State *state.Manager + AsynqClient *asynq.Client + AsynqInspector *asynq.Inspector + RedisClient *redis.Client + RedisPrefix string } // insertRow wraps CallFrameRow with additional context needed for batched inserts. @@ -58,6 +59,7 @@ type Processor struct { config *Config network *ethereum.Network asynqClient *asynq.Client + asynqInspector *asynq.Inspector processingMode string redisPrefix string @@ -137,6 +139,7 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { clickhouse: clickhouseClient, config: config, asynqClient: deps.AsynqClient, + asynqInspector: deps.AsynqInspector, processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, Limiter: limiter, diff --git a/pkg/state/manager.go b/pkg/state/manager.go index f2de16a..5e55925 100644 --- a/pkg/state/manager.go +++ b/pkg/state/manager.go @@ -721,7 +721,7 @@ func (s *Manager) GetIncompleteBlocksInRange( // GetMissingBlocksInRange returns block numbers that have no row in the database. // This finds blocks that were never processed, not incomplete blocks. -// Uses ClickHouse's numbers() function to generate a sequence and LEFT JOIN to find gaps. +// Uses ClickHouse's numbers() function to generate a sequence and NOT IN to find gaps. func (s *Manager) GetMissingBlocksInRange( ctx context.Context, network, processor string, @@ -729,19 +729,21 @@ func (s *Manager) GetMissingBlocksInRange( limit int, ) ([]uint64, error) { // Use numbers() to generate a sequence from minBlock to maxBlock, - // then LEFT JOIN to find blocks that don't exist in storage. + // then NOT IN to find blocks that don't exist in storage. + // NOTE: We use NOT IN instead of LEFT JOIN + IS NULL because ClickHouse's + // UInt64 columns can't be NULL - LEFT JOIN produces 0 instead of NULL for + // non-matching rows, which breaks the IS NULL check. 
query := fmt.Sprintf(` - SELECT n.number AS block_number - FROM numbers(%d, %d) AS n - LEFT JOIN ( + SELECT number AS block_number + FROM numbers(%d, %d) + WHERE number NOT IN ( SELECT DISTINCT block_number FROM %s FINAL WHERE processor = '%s' AND meta_network_name = '%s' AND block_number >= %d AND block_number <= %d - ) AS e ON n.number = e.block_number - WHERE e.block_number IS NULL + ) ORDER BY block_number ASC LIMIT %d `, minBlock, maxBlock-minBlock+1, s.storageTable, processor, network, @@ -830,6 +832,40 @@ func (s *Manager) IsBlockRecentlyProcessed(ctx context.Context, blockNumber uint return count != nil && *count > 0, nil } +// IsBlockComplete checks if a block is marked complete in ClickHouse. +// Uses FINAL to get the latest state after ReplacingMergeTree deduplication. +// This is used to prevent race conditions where a block completes between +// gap detection scan and the reprocess decision. +func (s *Manager) IsBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) (bool, error) { + query := fmt.Sprintf(` + SELECT toUInt64(complete) as complete + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND block_number = %d + LIMIT 1 + `, s.storageTable, processor, network, blockNumber) + + s.log.WithFields(logrus.Fields{ + "network": network, + "processor": processor, + "block_number": blockNumber, + "table": s.storageTable, + }).Debug("Checking if block is complete") + + complete, err := s.storageClient.QueryUInt64(ctx, query, "complete") + if err != nil { + return false, fmt.Errorf("failed to check block complete status: %w", err) + } + + // No row found = not complete + if complete == nil { + return false, nil + } + + return *complete == 1, nil +} + // GetHeadDistance calculates the distance between current processing block and the relevant head. func (s *Manager) GetHeadDistance(ctx context.Context, processor, network, mode string, executionHead *big.Int) (distance int64, headType string, err error) { // Get the current processing block (what would be next to process)