Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 73 additions & 25 deletions pkg/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ type Manager struct {
processors map[string]tracker.BlockProcessor

// Redis/Asynq for distributed processing
redisClient *r.Client
redisPrefix string
asynqClient *asynq.Client
asynqServer *asynq.Server
redisClient *r.Client
redisPrefix string
asynqClient *asynq.Client
asynqServer *asynq.Server
asynqInspector *asynq.Inspector

network *ethereum.Network

Expand Down Expand Up @@ -121,6 +122,9 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta
// Initialize Asynq client with its own Redis connection
asynqClient := asynq.NewClient(asynqRedisOpt)

// Initialize Asynq inspector for task management (deletion before reprocessing)
asynqInspector := asynq.NewInspector(asynqRedisOpt)

var asynqServer *asynq.Server
// Setup queue priorities dynamically based on processors
// This will be populated after processors are initialized
Expand All @@ -145,6 +149,7 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta
redisPrefix: redisPrefix,
asynqClient: asynqClient,
asynqServer: asynqServer,
asynqInspector: asynqInspector,
stopChan: make(chan struct{}),
blockProcessStop: make(chan struct{}),
queueHighWaterMarks: make(map[string]int),
Expand Down Expand Up @@ -369,6 +374,13 @@ func (m *Manager) Stop(ctx context.Context) error {
}
}

// Close Asynq inspector
if m.asynqInspector != nil {
if err := m.asynqInspector.Close(); err != nil {
m.log.WithError(err).Error("Failed to close Asynq inspector")
}
}

// Wait for all goroutines to complete
m.wg.Wait()
m.log.Info("All goroutines stopped")
Expand All @@ -384,13 +396,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error {
m.log.Debug("Transaction structlog processor is enabled, initializing...")

processor, err := transaction_structlog.New(&transaction_structlog.Dependencies{
Log: m.log.WithField("processor", "transaction_structlog"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
Log: m.log.WithField("processor", "transaction_structlog"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
AsynqInspector: m.asynqInspector,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
}, &m.config.TransactionStructlog)
if err != nil {
return fmt.Errorf("failed to create transaction_structlog processor: %w", err)
Expand All @@ -415,13 +428,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error {
m.log.Debug("Transaction simple processor is enabled, initializing...")

processor, err := transaction_simple.New(&transaction_simple.Dependencies{
Log: m.log.WithField("processor", "transaction_simple"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
Log: m.log.WithField("processor", "transaction_simple"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
AsynqInspector: m.asynqInspector,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
}, &m.config.TransactionSimple)
if err != nil {
return fmt.Errorf("failed to create transaction_simple processor: %w", err)
Expand All @@ -446,13 +460,14 @@ func (m *Manager) initializeProcessors(ctx context.Context) error {
m.log.Debug("Transaction structlog_agg processor is enabled, initializing...")

processor, err := transaction_structlog_agg.New(&transaction_structlog_agg.Dependencies{
Log: m.log.WithField("processor", "transaction_structlog_agg"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
Log: m.log.WithField("processor", "transaction_structlog_agg"),
Pool: m.pool,
State: m.state,
AsynqClient: m.asynqClient,
AsynqInspector: m.asynqInspector,
RedisClient: m.redisClient,
Network: m.network,
RedisPrefix: m.redisPrefix,
}, &m.config.TransactionStructlogAgg)
if err != nil {
return fmt.Errorf("failed to create transaction_structlog_agg processor: %w", err)
Expand Down Expand Up @@ -1508,6 +1523,39 @@ func (m *Manager) checkGaps(ctx context.Context) {

// Handle INCOMPLETE blocks -> ReprocessBlock (row exists, just stuck)
for _, gapBlock := range gapResult.Incomplete {
// Skip if block already has active Redis tracking (being processed)
hasTracking, trackErr := processor.GetCompletionTracker().HasBlockTracking(
ctx, gapBlock, m.network.Name, processorName, m.config.Mode)
if trackErr != nil {
m.log.WithError(trackErr).WithField("block", gapBlock).Debug("Failed to check block tracking")
}

if hasTracking {
m.log.WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
}).Debug("Skipping incomplete block - already has active Redis tracking")

continue
}

// Re-verify block is still incomplete in ClickHouse.
// This closes a race window where a block completes between
// the gap scan query and now (Redis tracking cleaned up).
isComplete, completeErr := m.state.IsBlockComplete(ctx, gapBlock, m.network.Name, processorName)
if completeErr != nil {
m.log.WithError(completeErr).WithField("block", gapBlock).Debug("Failed to verify block complete status")
}

if isComplete {
m.log.WithFields(logrus.Fields{
"processor": processorName,
"block": gapBlock,
}).Debug("Skipping incomplete block - completed during gap scan (race avoided)")

continue
}

if reprocessErr := processor.ReprocessBlock(ctx, gapBlock); reprocessErr != nil {
m.log.WithError(reprocessErr).WithFields(logrus.Fields{
"processor": processorName,
Expand Down
62 changes: 62 additions & 0 deletions pkg/processor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,68 @@ func TestManager_LeaderElectionDisabled(t *testing.T) {

// Race condition tests

// TestGapDetectionRaceConditionLogic tests the race condition where a block
// completes between the gap scan and the reprocess decision.
//
// The race occurs when:
//  1. Gap detection queries ClickHouse - returns block 100 as incomplete
//  2. Block 100 completes (MarkBlockComplete writes complete=1, cleans Redis)
//  3. Gap detection checks Redis - HasBlockTracking returns false
//  4. Gap detection calls ReprocessBlock - WRONG! Block is already complete
//
// Previous (buggy) logic in checkGaps():
//
//	if !hasTracking { ReprocessBlock() } // BUG: did not check if complete
//
// Fixed logic (now implemented in checkGaps):
//
//	if !hasTracking && !isComplete { ReprocessBlock() }
func TestGapDetectionRaceConditionLogic(t *testing.T) {
	tests := []struct {
		name            string
		hasTracking     bool // Redis tracking exists
		isComplete      bool // Block is complete in ClickHouse
		shouldReprocess bool // Expected decision
	}{
		{
			name:            "has tracking - skip (being processed)",
			hasTracking:     true,
			isComplete:      false,
			shouldReprocess: false,
		},
		{
			name:            "no tracking, incomplete - reprocess (orphaned)",
			hasTracking:     false,
			isComplete:      false,
			shouldReprocess: true,
		},
		{
			name:            "no tracking, complete - skip (race condition)",
			hasTracking:     false,
			isComplete:      true,
			shouldReprocess: false, // This case WAS the bug - the old logic would reprocess.
		},
		{
			name:            "has tracking, complete - skip (already being processed)",
			hasTracking:     true,
			isComplete:      true,
			shouldReprocess: false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Fixed logic (mirrors checkGaps): only reprocess when there is
			// no Redis tracking AND the block is not complete in ClickHouse.
			shouldReprocess := !tt.hasTracking && !tt.isComplete

			assert.Equal(t, tt.shouldReprocess, shouldReprocess,
				"block with hasTracking=%v, isComplete=%v should reprocess=%v, but got %v",
				tt.hasTracking, tt.isComplete, tt.shouldReprocess, shouldReprocess)
		})
	}
}

// TestManager_RaceConditions specifically tests for race conditions in manager.
func TestManager_RaceConditions(t *testing.T) {
log := logrus.New()
Expand Down
47 changes: 29 additions & 18 deletions pkg/processor/tracker/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,40 +240,51 @@ func (l *Limiter) GetGaps(ctx context.Context, currentBlock uint64, lookbackRang
return nil, fmt.Errorf("state provider does not support gap detection")
}

// Get min and max stored blocks to constrain our search range.
// We can only find gaps within blocks that have actually been stored.
minStored, maxStored, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor)
if err != nil {
return nil, fmt.Errorf("failed to get min/max stored blocks: %w", err)
}

if minStored == nil || maxStored == nil {
// No blocks stored yet
return &GapResult{ScanDuration: time.Since(startTime)}, nil
}

// Use the stored max as reference point, not the chain head.
// We can only find gaps within data we've actually stored.
referenceBlock := maxStored.Uint64()

var minBlock uint64

if lookbackRange == 0 {
// Unlimited: scan from oldest stored block
minStored, _, err := gapProvider.GetMinMaxStoredBlocks(ctx, l.network, l.processor)
if err != nil {
return nil, fmt.Errorf("failed to get min stored block: %w", err)
}

if minStored == nil {
// No blocks stored yet
return &GapResult{ScanDuration: time.Since(startTime)}, nil
}

minBlock = minStored.Uint64()
} else {
// Limited: scan from currentBlock - lookbackRange
if currentBlock > lookbackRange {
minBlock = currentBlock - lookbackRange
// Limited: scan from referenceBlock - lookbackRange
if referenceBlock > lookbackRange {
minBlock = referenceBlock - lookbackRange
}

// Constrain to actual stored range - can't find gaps before the first stored block
if minBlock < minStored.Uint64() {
minBlock = minStored.Uint64()
}
}

// Calculate maxBlock to exclude the window handled by IsBlockedByIncompleteBlocks.
// The limiter already handles blocks within [currentBlock - maxPendingBlockRange, currentBlock],
// so we only scan up to (currentBlock - maxPendingBlockRange - 1) to avoid double work.
maxBlock := currentBlock
// so we only scan up to (referenceBlock - maxPendingBlockRange - 1) to avoid double work.
maxBlock := referenceBlock

if l.config.MaxPendingBlockRange > 0 {
exclusionWindow := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated in config

if currentBlock > exclusionWindow {
maxBlock = currentBlock - exclusionWindow - 1
if referenceBlock > exclusionWindow {
maxBlock = referenceBlock - exclusionWindow - 1
} else {
// Current block is within the exclusion window, nothing to scan
// Reference block is within the exclusion window, nothing to scan
return &GapResult{ScanDuration: time.Since(startTime)}, nil
}
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/processor/tracker/limiter_gap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func TestGetGaps_ErrorFromGetMinMax(t *testing.T) {

require.Error(t, err)
assert.Nil(t, result)
assert.Contains(t, err.Error(), "failed to get min stored block")
assert.Contains(t, err.Error(), "failed to get min/max stored blocks")
}

func TestGetGaps_NoBlocksStored(t *testing.T) {
Expand Down Expand Up @@ -431,7 +431,9 @@ func TestGetGaps_ScanDurationTracked(t *testing.T) {

func TestGetGaps_RespectsLookbackRange(t *testing.T) {
mockProvider := &mockGapStateProvider{
// GetMinMaxStoredBlocks should NOT be called when lookbackRange is set
// minStoredBlock is always needed now to constrain the search range
minStoredBlock: big.NewInt(50), // Matches the calculated min from lookback
maxStoredBlock: big.NewInt(100),
incompleteBlocksInRange: []uint64{75, 80},
missingBlocksInRange: []uint64{77},
}
Expand Down Expand Up @@ -459,9 +461,12 @@ func TestGetGaps_RespectsLookbackRange(t *testing.T) {

func TestGetGaps_LookbackRangeGreaterThanCurrentBlock(t *testing.T) {
// When lookbackRange is greater than currentBlock, minBlock should be 0
// but constrained to minStoredBlock
mockProvider := &mockGapStateProvider{
incompleteBlocksInRange: []uint64{3},
missingBlocksInRange: []uint64{5},
minStoredBlock: big.NewInt(0), // Set min stored to 0 so gaps at 3,5 are valid
maxStoredBlock: big.NewInt(10),
}

limiter := NewLimiter(&LimiterDeps{
Expand Down
3 changes: 3 additions & 0 deletions pkg/processor/tracker/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ type BlockProcessor interface {
// ProcessBlock processes a single block - fetches, marks enqueued, and enqueues tasks.
// This is used for gap filling of missing blocks (blocks with no row in DB).
ProcessBlock(ctx context.Context, block execution.Block) error

// GetCompletionTracker returns the block completion tracker for checking tracking status.
GetCompletionTracker() *BlockCompletionTracker
}

// QueueInfo contains information about a processor queue.
Expand Down
Loading