diff --git a/pkg/processor/tracker/block_tracker.go b/pkg/processor/tracker/block_tracker.go
index 735f89c..cd51547 100644
--- a/pkg/processor/tracker/block_tracker.go
+++ b/pkg/processor/tracker/block_tracker.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"strconv"
-	"strings"
 	"time"
 
 	"github.com/redis/go-redis/v9"
@@ -20,6 +19,30 @@ const (
 	DefaultStaleThreshold = 5 * time.Minute
 )
 
+// trackTaskCompletionScript is a Lua script for atomic task completion tracking.
+// It adds a task to the completed set and returns both the completed count and expected count
+// in a single round trip, ensuring atomicity.
+//
+//nolint:gochecknoglobals // Lua scripts are safely shared across goroutines
+var trackTaskCompletionScript = redis.NewScript(`
+local completedKey = KEYS[1]
+local expectedKey = KEYS[2]
+local taskID = ARGV[1]
+local ttlSeconds = tonumber(ARGV[2])
+
+redis.call('SADD', completedKey, taskID)
+redis.call('EXPIRE', completedKey, ttlSeconds)
+
+local completedCount = redis.call('SCARD', completedKey)
+local expectedStr = redis.call('GET', expectedKey)
+
+if expectedStr == false then
+	return {completedCount, -1}
+end
+
+return {completedCount, tonumber(expectedStr)}
+`)
+
 // BlockCompletionTrackerConfig holds configuration for the BlockCompletionTracker.
 type BlockCompletionTrackerConfig struct {
 	// StaleThreshold is the time after which a block is considered stale.
@@ -95,13 +118,14 @@ func (t *BlockCompletionTracker) metaKey(network, processor, mode string, blockN
 	return fmt.Sprintf("%s:block_meta:%s:%s:%s:%d", t.prefix, processor, network, mode, blockNum)
 }
 
-// metaKeyPattern returns a pattern for scanning block_meta keys.
-func (t *BlockCompletionTracker) metaKeyPattern(network, processor, mode string) string {
+// pendingBlocksKey returns the key for the sorted set of pending blocks.
+// The sorted set uses enqueue timestamps as scores for O(log N) stale detection.
+func (t *BlockCompletionTracker) pendingBlocksKey(network, processor, mode string) string {
 	if t.prefix == "" {
-		return fmt.Sprintf("block_meta:%s:%s:%s:*", processor, network, mode)
+		return fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode)
 	}
 
-	return fmt.Sprintf("%s:block_meta:%s:%s:%s:*", t.prefix, processor, network, mode)
+	return fmt.Sprintf("%s:pending_blocks:%s:%s:%s", t.prefix, processor, network, mode)
 }
 
 // RegisterBlock initializes tracking for a new block.
@@ -116,16 +140,24 @@ func (t *BlockCompletionTracker) RegisterBlock(
 	completedKey := t.completedKey(network, processor, mode, blockNum)
 	expectedKey := t.expectedKey(network, processor, mode, blockNum)
 	metaKey := t.metaKey(network, processor, mode, blockNum)
+	pendingKey := t.pendingBlocksKey(network, processor, mode)
+
+	now := time.Now()
 
 	pipe := t.redis.Pipeline()
 	pipe.Del(ctx, completedKey)                                    // Clear old completions
 	pipe.Set(ctx, expectedKey, expectedCount, DefaultBlockMetaTTL) // Set expected count
 	pipe.HSet(ctx, metaKey, map[string]any{
-		"enqueued_at": time.Now().Unix(),
+		"enqueued_at": now.Unix(),
 		"queue":       queue,
 		"expected":    expectedCount,
 	})
 	pipe.Expire(ctx, metaKey, DefaultBlockMetaTTL)
+	// Add to pending blocks sorted set with timestamp as score for O(log N) stale detection
+	pipe.ZAdd(ctx, pendingKey, redis.Z{
+		Score:  float64(now.Unix()),
+		Member: blockNum,
+	})
 
 	_, err := pipe.Exec(ctx)
 	if err != nil {
@@ -146,6 +178,7 @@ func (t *BlockCompletionTracker) RegisterBlock(
 
 // TrackTaskCompletion records a task completion and checks if block is done.
 // Returns true if all tasks are now complete.
+// Uses a Lua script for atomic completion tracking in a single round trip.
 func (t *BlockCompletionTracker) TrackTaskCompletion(
 	ctx context.Context,
 	taskID string,
@@ -155,25 +188,31 @@ func (t *BlockCompletionTracker) TrackTaskCompletion(
 	completedKey := t.completedKey(network, processor, mode, blockNum)
 	expectedKey := t.expectedKey(network, processor, mode, blockNum)
 
-	// Add to completed set (idempotent - same task completing twice is fine)
-	// Set TTL to ensure cleanup if block never completes
-	pipe := t.redis.Pipeline()
-	pipe.SAdd(ctx, completedKey, taskID)
-	pipe.Expire(ctx, completedKey, DefaultBlockMetaTTL)
+	// Execute Lua script for atomic task completion tracking
+	result, err := trackTaskCompletionScript.Run(ctx, t.redis,
+		[]string{completedKey, expectedKey},
+		taskID, int(DefaultBlockMetaTTL.Seconds()),
+	).Slice()
+	if err != nil {
+		return false, fmt.Errorf("failed to track task completion: %w", err)
+	}
 
-	if _, err := pipe.Exec(ctx); err != nil {
-		return false, fmt.Errorf("failed to add task to completed set: %w", err)
+	if len(result) != 2 {
+		return false, fmt.Errorf("unexpected result length from Lua script: %d", len(result))
 	}
 
-	// Get counts
-	completedCount, err := t.redis.SCard(ctx, completedKey).Result()
-	if err != nil {
-		return false, fmt.Errorf("failed to get completed count: %w", err)
+	completedCount, ok := result[0].(int64)
+	if !ok {
+		return false, fmt.Errorf("failed to parse completed count from Lua script result")
 	}
 
-	expectedStr, err := t.redis.Get(ctx, expectedKey).Result()
-	if err == redis.Nil {
-		// Block not registered - might be old task from before retry, or already cleaned up
+	expected, ok := result[1].(int64)
+	if !ok {
+		return false, fmt.Errorf("failed to parse expected count from Lua script result")
+	}
+
+	// -1 indicates block not registered (expected key doesn't exist)
+	if expected == -1 {
 		t.log.WithFields(logrus.Fields{
 			"block_number": blockNum,
 			"task_id":      taskID,
@@ -185,15 +224,6 @@ func (t *BlockCompletionTracker) TrackTaskCompletion(
 		return false, nil
 	}
 
-	if err != nil {
-		return false, fmt.Errorf("failed to get expected count: %w", err)
-	}
-
-	expected, err := strconv.ParseInt(expectedStr, 10, 64)
-	if err != nil {
-		return false, fmt.Errorf("failed to parse expected count: %w", err)
-	}
-
 	t.log.WithFields(logrus.Fields{
 		"block_number": blockNum,
 		"task_id":      taskID,
@@ -222,8 +252,13 @@ func (t *BlockCompletionTracker) MarkBlockComplete(
 	completedKey := t.completedKey(network, processor, mode, blockNum)
 	expectedKey := t.expectedKey(network, processor, mode, blockNum)
 	metaKey := t.metaKey(network, processor, mode, blockNum)
+	pendingKey := t.pendingBlocksKey(network, processor, mode)
 
-	if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil {
+	pipe := t.redis.Pipeline()
+	pipe.Del(ctx, completedKey, expectedKey, metaKey)
+	pipe.ZRem(ctx, pendingKey, blockNum)
+
+	if _, err := pipe.Exec(ctx); err != nil {
 		// Log but don't fail - keys will expire anyway
 		t.log.WithError(err).WithFields(logrus.Fields{
 			"block_number": blockNum,
@@ -244,45 +279,85 @@
 }
 
 // GetStaleBlocks returns blocks that have been processing longer than the stale threshold.
+// Uses ZRANGEBYSCORE on the pending blocks sorted set for O(log N + M) complexity.
 func (t *BlockCompletionTracker) GetStaleBlocks(
 	ctx context.Context,
 	network, processor, mode string,
 ) ([]uint64, error) {
-	pattern := t.metaKeyPattern(network, processor, mode)
-	staleBlocks := make([]uint64, 0)
+	pendingKey := t.pendingBlocksKey(network, processor, mode)
+	staleThreshold := time.Now().Add(-t.config.StaleThreshold).Unix()
 
-	iter := t.redis.Scan(ctx, 0, pattern, 100).Iterator()
-	for iter.Next(ctx) {
-		key := iter.Val()
+	members, err := t.redis.ZRangeByScore(ctx, pendingKey, &redis.ZRangeBy{
+		Min: "-inf",
+		Max: fmt.Sprintf("%d", staleThreshold),
+	}).Result()
+	if err != nil {
+		return nil, fmt.Errorf("failed to get stale blocks: %w", err)
+	}
+
+	staleBlocks := make([]uint64, 0, len(members))
 
-		enqueuedAtStr, err := t.redis.HGet(ctx, key, "enqueued_at").Result()
+	for _, member := range members {
+		blockNum, err := strconv.ParseUint(member, 10, 64)
 		if err != nil {
-			t.log.WithError(err).WithField("key", key).Debug("Failed to get enqueued_at")
+			t.log.WithError(err).WithField("member", member).Debug("Failed to parse block number")
 
 			continue
 		}
 
-		enqueuedAt, err := strconv.ParseInt(enqueuedAtStr, 10, 64)
-		if err != nil {
-			t.log.WithError(err).WithField("key", key).Debug("Failed to parse enqueued_at")
+		staleBlocks = append(staleBlocks, blockNum)
+	}
 
-			continue
-		}
+	return staleBlocks, nil
+}
 
-		if time.Since(time.Unix(enqueuedAt, 0)) > t.config.StaleThreshold {
-			// Extract block number from key
-			blockNum := extractBlockNumFromKey(key)
-			if blockNum != 0 {
-				staleBlocks = append(staleBlocks, blockNum)
-			}
-		}
+// ClearStaleBlocks removes all stale blocks and their associated tracking data.
+// Uses ZRANGEBYSCORE to identify stale blocks and a pipeline to efficiently delete all related keys.
+// Returns the number of blocks cleared.
+func (t *BlockCompletionTracker) ClearStaleBlocks(
+	ctx context.Context,
+	network, processor, mode string,
+) (int, error) {
+	staleBlocks, err := t.GetStaleBlocks(ctx, network, processor, mode)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get stale blocks: %w", err)
 	}
 
-	if err := iter.Err(); err != nil {
-		return nil, fmt.Errorf("failed to scan for stale blocks: %w", err)
+	if len(staleBlocks) == 0 {
+		return 0, nil
 	}
 
-	return staleBlocks, nil
+	pendingKey := t.pendingBlocksKey(network, processor, mode)
+	pipe := t.redis.Pipeline()
+
+	// Collect all keys to delete and members to remove from sorted set
+	members := make([]any, 0, len(staleBlocks))
+
+	for _, blockNum := range staleBlocks {
+		completedKey := t.completedKey(network, processor, mode, blockNum)
+		expectedKey := t.expectedKey(network, processor, mode, blockNum)
+		metaKey := t.metaKey(network, processor, mode, blockNum)
+
+		pipe.Del(ctx, completedKey, expectedKey, metaKey)
+
+		members = append(members, blockNum)
+	}
+
+	// Remove all stale blocks from the sorted set in one operation
+	pipe.ZRem(ctx, pendingKey, members...)
+
+	if _, err := pipe.Exec(ctx); err != nil {
+		return 0, fmt.Errorf("failed to clear stale blocks: %w", err)
+	}
+
+	t.log.WithFields(logrus.Fields{
+		"count":     len(staleBlocks),
+		"network":   network,
+		"processor": processor,
+		"mode":      mode,
+	}).Debug("Cleared stale blocks")
+
+	return len(staleBlocks), nil
 }
 
 // GetBlockStatus returns the completion status of a block.
@@ -330,8 +405,13 @@ func (t *BlockCompletionTracker) ClearBlock(
 	completedKey := t.completedKey(network, processor, mode, blockNum)
 	expectedKey := t.expectedKey(network, processor, mode, blockNum)
 	metaKey := t.metaKey(network, processor, mode, blockNum)
+	pendingKey := t.pendingBlocksKey(network, processor, mode)
 
-	if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil {
+	pipe := t.redis.Pipeline()
+	pipe.Del(ctx, completedKey, expectedKey, metaKey)
+	pipe.ZRem(ctx, pendingKey, blockNum)
+
+	if _, err := pipe.Exec(ctx); err != nil {
 		return fmt.Errorf("failed to clear block tracking: %w", err)
 	}
 
@@ -345,26 +425,6 @@ func (t *BlockCompletionTracker) ClearBlock(
 	return nil
 }
 
-// extractBlockNumFromKey extracts the block number from a Redis key.
-// Key format: block_meta:{prefix}:{processor}:{network}:{mode}:{blockNum}
-// or: block_meta:{processor}:{network}:{mode}:{blockNum}
-func extractBlockNumFromKey(key string) uint64 {
-	parts := strings.Split(key, ":")
-	if len(parts) < 2 {
-		return 0
-	}
-
-	// Block number is always the last part
-	blockNumStr := parts[len(parts)-1]
-
-	blockNum, err := strconv.ParseUint(blockNumStr, 10, 64)
-	if err != nil {
-		return 0
-	}
-
-	return blockNum
-}
-
 // HasBlockTracking checks if a block has Redis tracking data.
 // Returns true if block_meta key exists (block is being tracked).
 // Used to detect orphaned blocks that are in ClickHouse (complete=0) but have no Redis tracking.
diff --git a/pkg/processor/tracker/block_tracker_test.go b/pkg/processor/tracker/block_tracker_test.go
index 8fe2d49..90c13b6 100644
--- a/pkg/processor/tracker/block_tracker_test.go
+++ b/pkg/processor/tracker/block_tracker_test.go
@@ -3,6 +3,7 @@ package tracker
 import (
 	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -13,6 +14,11 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
+const (
+	testNetwork   = "test_network"
+	testProcessor = "test_processor"
+)
+
 // mockStateProviderForTracker implements StateProvider for testing.
 type mockStateProviderForTracker struct {
 	oldestIncomplete *uint64
@@ -50,7 +56,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) {
 			blockNum: 100,
 			setupRedis: func(mr *miniredis.Miniredis, blockNum uint64) {
 				mr.HSet(
-					fmt.Sprintf("block_meta:test_processor:test_network:forwards:%d", blockNum),
+					fmt.Sprintf("block_meta:%s:%s:%s:%d", testProcessor, testNetwork, FORWARDS_MODE, blockNum),
 					"enqueued_at", fmt.Sprintf("%d", time.Now().Unix()),
 				)
 			},
@@ -68,7 +74,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) {
 			setupRedis: func(mr *miniredis.Miniredis, _ uint64) {
 				// Set up tracking for a DIFFERENT block
 				mr.HSet(
-					"block_meta:test_processor:test_network:forwards:999",
+					fmt.Sprintf("block_meta:%s:%s:%s:999", testProcessor, testNetwork, FORWARDS_MODE),
 					"enqueued_at", fmt.Sprintf("%d", time.Now().Unix()),
 				)
 			},
@@ -80,7 +86,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) {
 			setupRedis: func(mr *miniredis.Miniredis, blockNum uint64) {
 				// Note: prefix is empty in this test, so key format doesn't include prefix
 				mr.HSet(
-					fmt.Sprintf("block_meta:test_processor:test_network:forwards:%d", blockNum),
+					fmt.Sprintf("block_meta:%s:%s:%s:%d", testProcessor, testNetwork, FORWARDS_MODE, blockNum),
 					"enqueued_at", fmt.Sprintf("%d", time.Now().Unix()),
 				)
 			},
@@ -107,7 +113,7 @@ func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) {
 			got, err := tracker.HasBlockTracking(
 				context.Background(),
 				tt.blockNum,
-				"test_network", "test_processor", "forwards",
+				testNetwork, testProcessor, FORWARDS_MODE,
 			)
 
 			require.NoError(t, err)
@@ -134,13 +140,13 @@ func TestBlockCompletionTracker_HasBlockTracking_WithPrefix(t *testing.T) {
 	// Set up tracking with prefix
 	blockNum := uint64(100)
 	mr.HSet(
-		fmt.Sprintf("%s:block_meta:test_processor:test_network:forwards:%d", prefix, blockNum),
+		fmt.Sprintf("%s:block_meta:%s:%s:%s:%d", prefix, testProcessor, testNetwork, FORWARDS_MODE, blockNum),
 		"enqueued_at", fmt.Sprintf("%d", time.Now().Unix()),
 	)
 
 	got, err := tracker.HasBlockTracking(
 		context.Background(), blockNum,
-		"test_network", "test_processor", "forwards",
+		testNetwork, testProcessor, FORWARDS_MODE,
 	)
 
 	require.NoError(t, err)
@@ -162,9 +168,9 @@ func TestBlockCompletionTracker_HasBlockTracking_AfterRegisterBlock(t *testing.T
 	)
 
 	blockNum := uint64(100)
-	network := "test_network"
-	processor := "test_processor"
-	mode := "forwards"
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
 
 	// Verify block has no tracking initially
 	hasTracking, err := tracker.HasBlockTracking(context.Background(), blockNum, network, processor, mode)
@@ -272,3 +278,399 @@ func (m *mockStateProviderForLimiter) MarkBlockComplete(
 ) error {
 	return nil
 }
+
+func TestBlockCompletionTracker_PendingBlocksSortedSet(t *testing.T) {
+	mr, err := miniredis.Run()
+	require.NoError(t, err)
+
+	defer mr.Close()
+
+	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+	defer client.Close()
+
+	tracker := NewBlockCompletionTracker(
+		client, "", logrus.New(), &mockStateProviderForTracker{},
+		BlockCompletionTrackerConfig{StaleThreshold: 5 * time.Minute},
+	)
+
+	ctx := context.Background()
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
+
+	t.Run("RegisterBlock adds to sorted set", func(t *testing.T) {
+		blockNum := uint64(100)
+
+		err := tracker.RegisterBlock(ctx, blockNum, 5, network, processor, mode, "test_queue")
+		require.NoError(t, err)
+
+		// Verify block is in sorted set
+		pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode)
+		score, err := client.ZScore(ctx, pendingKey, "100").Result()
+		require.NoError(t, err)
+		assert.Greater(t, score, float64(0), "score should be a positive timestamp")
+	})
+
+	t.Run("MarkBlockComplete removes from sorted set", func(t *testing.T) {
+		blockNum := uint64(101)
+
+		err := tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue")
+		require.NoError(t, err)
+
+		// Verify block is in sorted set
+		pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode)
+		exists, err := client.ZScore(ctx, pendingKey, "101").Result()
+		require.NoError(t, err)
+		assert.Greater(t, exists, float64(0))
+
+		// Mark complete
+		err = tracker.MarkBlockComplete(ctx, blockNum, network, processor, mode)
+		require.NoError(t, err)
+
+		// Verify block is removed from sorted set
+		_, err = client.ZScore(ctx, pendingKey, "101").Result()
+		assert.ErrorIs(t, err, redis.Nil, "block should be removed from sorted set")
+	})
+
+	t.Run("ClearBlock removes from sorted set", func(t *testing.T) {
+		blockNum := uint64(102)
+
+		err := tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue")
+		require.NoError(t, err)
+
+		// Verify block is in sorted set
+		pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode)
+		_, err = client.ZScore(ctx, pendingKey, "102").Result()
+		require.NoError(t, err)
+
+		// Clear block
+		err = tracker.ClearBlock(ctx, blockNum, network, processor, mode)
+		require.NoError(t, err)
+
+		// Verify block is removed from sorted set
+		_, err = client.ZScore(ctx, pendingKey, "102").Result()
+		assert.ErrorIs(t, err, redis.Nil, "block should be removed from sorted set")
+	})
+}
+
+func TestBlockCompletionTracker_GetStaleBlocks_SortedSet(t *testing.T) {
+	ctx := context.Background()
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
+
+	t.Run("returns empty when no blocks registered", func(t *testing.T) {
+		mr, err := miniredis.Run()
+		require.NoError(t, err)
+
+		defer mr.Close()
+
+		client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+		defer client.Close()
+
+		tracker := NewBlockCompletionTracker(
+			client, "", logrus.New(), &mockStateProviderForTracker{},
+			BlockCompletionTrackerConfig{StaleThreshold: 100 * time.Millisecond},
+		)
+
+		staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.Empty(t, staleBlocks)
+	})
+
+	t.Run("returns stale blocks only", func(t *testing.T) {
+		mr, err := miniredis.Run()
+		require.NoError(t, err)
+
+		defer mr.Close()
+
+		client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+		defer client.Close()
+
+		// Use a longer stale threshold to ensure fresh blocks are not marked stale
+		staleThreshold := 1 * time.Minute
+		tracker := NewBlockCompletionTracker(
+			client, "", logrus.New(), &mockStateProviderForTracker{},
+			BlockCompletionTrackerConfig{StaleThreshold: staleThreshold},
+		)
+
+		// Register blocks with old timestamps directly in Redis
+		pendingKey := fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode)
+		oldTimestamp := float64(time.Now().Add(-5 * time.Minute).Unix())
+		// Fresh timestamp should be well within the stale threshold
+		newTimestamp := float64(time.Now().Unix())
+
+		// Add stale block (old timestamp - 5 minutes ago, well past 1 minute threshold)
+		err = client.ZAdd(ctx, pendingKey, redis.Z{Score: oldTimestamp, Member: "200"}).Err()
+		require.NoError(t, err)
+
+		// Add fresh block (current timestamp - not past 1 minute threshold)
+		err = client.ZAdd(ctx, pendingKey, redis.Z{Score: newTimestamp, Member: "201"}).Err()
+		require.NoError(t, err)
+
+		staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+
+		assert.Len(t, staleBlocks, 1)
+		assert.Contains(t, staleBlocks, uint64(200))
+		assert.NotContains(t, staleBlocks, uint64(201))
+	})
+
+	t.Run("blocks become stale after threshold", func(t *testing.T) {
+		mr, err := miniredis.Run()
+		require.NoError(t, err)
+
+		defer mr.Close()
+
+		client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+		defer client.Close()
+
+		// Use 1 second threshold for reliable testing
+		staleThreshold := 1 * time.Second
+		tracker := NewBlockCompletionTracker(
+			client, "", logrus.New(), &mockStateProviderForTracker{},
+			BlockCompletionTrackerConfig{StaleThreshold: staleThreshold},
+		)
+
+		blockNum := uint64(300)
+
+		err = tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue")
+		require.NoError(t, err)
+
+		// Initially not stale (we just registered it, well within 1 second)
+		staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.NotContains(t, staleBlocks, blockNum, "block should not be stale immediately after registration")
+
+		// Wait for stale threshold
+		time.Sleep(staleThreshold + 100*time.Millisecond)
+
+		// Now stale
+		staleBlocks, err = tracker.GetStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.Contains(t, staleBlocks, blockNum, "block should be stale after threshold")
+	})
+}
+
+func TestBlockCompletionTracker_TrackTaskCompletion_LuaScript(t *testing.T) {
+	mr, err := miniredis.Run()
+	require.NoError(t, err)
+
+	defer mr.Close()
+
+	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+	defer client.Close()
+
+	tracker := NewBlockCompletionTracker(
+		client, "", logrus.New(), &mockStateProviderForTracker{},
+		BlockCompletionTrackerConfig{},
+	)
+
+	ctx := context.Background()
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
+	blockNum := uint64(500)
+
+	t.Run("returns false when block not registered", func(t *testing.T) {
+		complete, err := tracker.TrackTaskCompletion(ctx, "task1", blockNum, network, processor, mode)
+		require.NoError(t, err)
+		assert.False(t, complete)
+	})
+
+	t.Run("tracks completion correctly", func(t *testing.T) {
+		err := tracker.RegisterBlock(ctx, blockNum, 3, network, processor, mode, "test_queue")
+		require.NoError(t, err)
+
+		// First task
+		complete, err := tracker.TrackTaskCompletion(ctx, "task1", blockNum, network, processor, mode)
+		require.NoError(t, err)
+		assert.False(t, complete)
+
+		// Second task
+		complete, err = tracker.TrackTaskCompletion(ctx, "task2", blockNum, network, processor, mode)
+		require.NoError(t, err)
+		assert.False(t, complete)
+
+		// Third task - should complete
+		complete, err = tracker.TrackTaskCompletion(ctx, "task3", blockNum, network, processor, mode)
+		require.NoError(t, err)
+		assert.True(t, complete)
+	})
+
+	t.Run("is idempotent", func(t *testing.T) {
+		blockNum2 := uint64(501)
+
+		err := tracker.RegisterBlock(ctx, blockNum2, 2, network, processor, mode, "test_queue")
+		require.NoError(t, err)
+
+		// Track same task twice
+		complete, err := tracker.TrackTaskCompletion(ctx, "task1", blockNum2, network, processor, mode)
+		require.NoError(t, err)
+		assert.False(t, complete)
+
+		complete, err = tracker.TrackTaskCompletion(ctx, "task1", blockNum2, network, processor, mode)
+		require.NoError(t, err)
+		assert.False(t, complete, "duplicate task should not increase count")
+
+		// Second unique task should complete
+		complete, err = tracker.TrackTaskCompletion(ctx, "task2", blockNum2, network, processor, mode)
+		require.NoError(t, err)
+		assert.True(t, complete)
+	})
+}
+
+func TestBlockCompletionTracker_ClearStaleBlocks_Bulk(t *testing.T) {
+	mr, err := miniredis.Run()
+	require.NoError(t, err)
+
+	defer mr.Close()
+
+	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+	defer client.Close()
+
+	staleThreshold := 50 * time.Millisecond
+	tracker := NewBlockCompletionTracker(
+		client, "", logrus.New(), &mockStateProviderForTracker{},
+		BlockCompletionTrackerConfig{StaleThreshold: staleThreshold},
+	)
+
+	ctx := context.Background()
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
+
+	t.Run("returns 0 when no stale blocks", func(t *testing.T) {
+		cleared, err := tracker.ClearStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.Equal(t, 0, cleared)
+	})
+
+	t.Run("clears stale blocks and their keys", func(t *testing.T) {
+		// Register multiple blocks
+		for i := uint64(600); i < 605; i++ {
+			err := tracker.RegisterBlock(ctx, i, 1, network, processor, mode, "test_queue")
+			require.NoError(t, err)
+		}
+
+		// Wait for blocks to become stale
+		time.Sleep(staleThreshold + 50*time.Millisecond)
+
+		// Verify blocks are stale
+		staleBlocks, err := tracker.GetStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.Len(t, staleBlocks, 5)
+
+		// Clear stale blocks
+		cleared, err := tracker.ClearStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.Equal(t, 5, cleared)
+
+		// Verify blocks are cleared
+		staleBlocks, err = tracker.GetStaleBlocks(ctx, network, processor, mode)
+		require.NoError(t, err)
+		assert.Empty(t, staleBlocks)
+
+		// Verify keys are deleted
+		for i := uint64(600); i < 605; i++ {
+			hasTracking, err := tracker.HasBlockTracking(ctx, i, network, processor, mode)
+			require.NoError(t, err)
+			assert.False(t, hasTracking, "block %d should not have tracking", i)
+		}
+	})
+}
+
+func TestBlockCompletionTracker_LuaScript_Concurrent(t *testing.T) {
+	mr, err := miniredis.Run()
+	require.NoError(t, err)
+
+	defer mr.Close()
+
+	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+	defer client.Close()
+
+	tracker := NewBlockCompletionTracker(
+		client, "", logrus.New(), &mockStateProviderForTracker{},
+		BlockCompletionTrackerConfig{},
+	)
+
+	ctx := context.Background()
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
+	blockNum := uint64(700)
+	expectedTasks := 100
+
+	err = tracker.RegisterBlock(ctx, blockNum, expectedTasks, network, processor, mode, "test_queue")
+	require.NoError(t, err)
+
+	// Track completions concurrently
+	var (
+		wg              sync.WaitGroup
+		completionCount int
+		mu              sync.Mutex
+	)
+
+	for i := 0; i < expectedTasks; i++ {
+		wg.Add(1)
+
+		go func(taskNum int) {
+			defer wg.Done()
+
+			taskID := fmt.Sprintf("task%d", taskNum)
+
+			complete, trackErr := tracker.TrackTaskCompletion(ctx, taskID, blockNum, network, processor, mode)
+			assert.NoError(t, trackErr)
+
+			if complete {
+				mu.Lock()
+
+				completionCount++
+
+				mu.Unlock()
+			}
+		}(i)
+	}
+
+	wg.Wait()
+
+	// Exactly one goroutine should have seen the completion
+	assert.Equal(t, 1, completionCount, "exactly one task should report completion")
+
+	// Verify final state
+	completed, expected, _, err := tracker.GetBlockStatus(ctx, blockNum, network, processor, mode)
+	require.NoError(t, err)
+	assert.Equal(t, int64(expectedTasks), completed)
+	assert.Equal(t, int64(expectedTasks), expected)
+}
+
+func TestBlockCompletionTracker_PendingBlocksKey_WithPrefix(t *testing.T) {
+	mr, err := miniredis.Run()
+	require.NoError(t, err)
+
+	defer mr.Close()
+
+	client := redis.NewClient(&redis.Options{Addr: mr.Addr()})
+	defer client.Close()
+
+	prefix := "myapp"
+	tracker := NewBlockCompletionTracker(
+		client, prefix, logrus.New(), &mockStateProviderForTracker{},
+		BlockCompletionTrackerConfig{},
+	)
+
+	ctx := context.Background()
+	network := testNetwork
+	processor := testProcessor
+	mode := FORWARDS_MODE
+	blockNum := uint64(800)
+
+	err = tracker.RegisterBlock(ctx, blockNum, 1, network, processor, mode, "test_queue")
+	require.NoError(t, err)
+
+	// Verify the key includes the prefix
+	pendingKey := fmt.Sprintf("%s:pending_blocks:%s:%s:%s", prefix, processor, network, mode)
+	score, err := client.ZScore(ctx, pendingKey, "800").Result()
+	require.NoError(t, err)
+	assert.Greater(t, score, float64(0))
+}
diff --git a/pkg/processor/tracker/queues_test.go b/pkg/processor/tracker/queues_test.go
index b9dc614..7d5b1ef 100644
--- a/pkg/processor/tracker/queues_test.go
+++ b/pkg/processor/tracker/queues_test.go
@@ -6,6 +6,8 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+const testProcessorName = "test_processor"
+
 func TestProcessReprocessForwardsQueue(t *testing.T) {
 	tests := []struct {
 		name string
@@ -212,7 +214,7 @@ func TestPrefixedProcessReprocessBackwardsQueue_EmptyPrefix(t *testing.T) {
 func TestQueueConsistency(t *testing.T) {
 	// Ensure all queue functions follow consistent naming conventions
-	processorName := "test_processor"
+	processorName := testProcessorName
 	prefix := "prefix"
 
 	forwardsUnprefixed := ProcessForwardsQueue(processorName)
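
Reviewer note: the sketch below is not part of the change. It is a minimal illustration of how the reworked tracking flow is intended to be driven end to end, assuming a tracker constructed the same way as in the tests above (Redis client, optional key prefix, logger, state provider, config). The block number, queue name, task IDs, and the `exampleBlockLifecycle` helper are hypothetical.

```go
package tracker

import (
	"context"
	"fmt"
)

// exampleBlockLifecycle is a hypothetical helper showing the intended call
// sequence: register a block, record task completions via the Lua-backed
// TrackTaskCompletion, and periodically sweep stale blocks in bulk.
func exampleBlockLifecycle(ctx context.Context, tracker *BlockCompletionTracker, network, processor, mode string) error {
	const blockNum uint64 = 12345 // illustrative block number

	// Expect 3 tasks for this block; "example_queue" is illustrative.
	if err := tracker.RegisterBlock(ctx, blockNum, 3, network, processor, mode, "example_queue"); err != nil {
		return fmt.Errorf("register block: %w", err)
	}

	// Workers report their tasks; the Lua script makes SADD + SCARD + GET atomic,
	// so exactly one caller observes done == true.
	for _, taskID := range []string{"task-a", "task-b", "task-c"} {
		done, err := tracker.TrackTaskCompletion(ctx, taskID, blockNum, network, processor, mode)
		if err != nil {
			return fmt.Errorf("track completion: %w", err)
		}

		if done {
			// Deletes the tracking keys and removes the block from pending_blocks.
			if err := tracker.MarkBlockComplete(ctx, blockNum, network, processor, mode); err != nil {
				return fmt.Errorf("mark complete: %w", err)
			}
		}
	}

	// Blocks older than config.StaleThreshold are found via ZRANGEBYSCORE and
	// removed in a single pipeline.
	cleared, err := tracker.ClearStaleBlocks(ctx, network, processor, mode)
	if err != nil {
		return fmt.Errorf("clear stale blocks: %w", err)
	}

	_ = cleared // e.g. export as a metric

	return nil
}
```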