Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 133 additions & 73 deletions pkg/processor/tracker/block_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"strconv"
"strings"
"time"

"github.com/redis/go-redis/v9"
Expand All @@ -20,6 +19,30 @@ const (
DefaultStaleThreshold = 5 * time.Minute
)

// trackTaskCompletionScript is a Lua script for atomic task completion tracking.
// It adds a task to the completed set and returns both the completed count and expected count
// in a single round trip, ensuring atomicity.
//
//nolint:gochecknoglobals // Lua scripts are safely shared across goroutines
var trackTaskCompletionScript = redis.NewScript(`
local completedKey = KEYS[1]
local expectedKey = KEYS[2]
local taskID = ARGV[1]
local ttlSeconds = tonumber(ARGV[2])

redis.call('SADD', completedKey, taskID)
redis.call('EXPIRE', completedKey, ttlSeconds)

local completedCount = redis.call('SCARD', completedKey)
local expectedStr = redis.call('GET', expectedKey)

if expectedStr == false then
return {completedCount, -1}
end

return {completedCount, tonumber(expectedStr)}
`)

// BlockCompletionTrackerConfig holds configuration for the BlockCompletionTracker.
type BlockCompletionTrackerConfig struct {
// StaleThreshold is the time after which a block is considered stale.
Expand Down Expand Up @@ -95,13 +118,14 @@ func (t *BlockCompletionTracker) metaKey(network, processor, mode string, blockN
return fmt.Sprintf("%s:block_meta:%s:%s:%s:%d", t.prefix, processor, network, mode, blockNum)
}

// metaKeyPattern returns a pattern for scanning block_meta keys.
func (t *BlockCompletionTracker) metaKeyPattern(network, processor, mode string) string {
// pendingBlocksKey returns the key for the sorted set of pending blocks.
// The sorted set uses enqueue timestamps as scores for O(log N) stale detection.
func (t *BlockCompletionTracker) pendingBlocksKey(network, processor, mode string) string {
if t.prefix == "" {
return fmt.Sprintf("block_meta:%s:%s:%s:*", processor, network, mode)
return fmt.Sprintf("pending_blocks:%s:%s:%s", processor, network, mode)
}

return fmt.Sprintf("%s:block_meta:%s:%s:%s:*", t.prefix, processor, network, mode)
return fmt.Sprintf("%s:pending_blocks:%s:%s:%s", t.prefix, processor, network, mode)
}

// RegisterBlock initializes tracking for a new block.
Expand All @@ -116,16 +140,24 @@ func (t *BlockCompletionTracker) RegisterBlock(
completedKey := t.completedKey(network, processor, mode, blockNum)
expectedKey := t.expectedKey(network, processor, mode, blockNum)
metaKey := t.metaKey(network, processor, mode, blockNum)
pendingKey := t.pendingBlocksKey(network, processor, mode)

now := time.Now()

pipe := t.redis.Pipeline()
pipe.Del(ctx, completedKey) // Clear old completions
pipe.Set(ctx, expectedKey, expectedCount, DefaultBlockMetaTTL) // Set expected count
pipe.HSet(ctx, metaKey, map[string]any{
"enqueued_at": time.Now().Unix(),
"enqueued_at": now.Unix(),
"queue": queue,
"expected": expectedCount,
})
pipe.Expire(ctx, metaKey, DefaultBlockMetaTTL)
// Add to pending blocks sorted set with timestamp as score for O(log N) stale detection
pipe.ZAdd(ctx, pendingKey, redis.Z{
Score: float64(now.Unix()),
Member: blockNum,
})

_, err := pipe.Exec(ctx)
if err != nil {
Expand All @@ -146,6 +178,7 @@ func (t *BlockCompletionTracker) RegisterBlock(

// TrackTaskCompletion records a task completion and checks if block is done.
// Returns true if all tasks are now complete.
// Uses a Lua script for atomic completion tracking in a single round trip.
func (t *BlockCompletionTracker) TrackTaskCompletion(
ctx context.Context,
taskID string,
Expand All @@ -155,25 +188,31 @@ func (t *BlockCompletionTracker) TrackTaskCompletion(
completedKey := t.completedKey(network, processor, mode, blockNum)
expectedKey := t.expectedKey(network, processor, mode, blockNum)

// Add to completed set (idempotent - same task completing twice is fine)
// Set TTL to ensure cleanup if block never completes
pipe := t.redis.Pipeline()
pipe.SAdd(ctx, completedKey, taskID)
pipe.Expire(ctx, completedKey, DefaultBlockMetaTTL)
// Execute Lua script for atomic task completion tracking
result, err := trackTaskCompletionScript.Run(ctx, t.redis,
[]string{completedKey, expectedKey},
taskID, int(DefaultBlockMetaTTL.Seconds()),
).Slice()
if err != nil {
return false, fmt.Errorf("failed to track task completion: %w", err)
}

if _, err := pipe.Exec(ctx); err != nil {
return false, fmt.Errorf("failed to add task to completed set: %w", err)
if len(result) != 2 {
return false, fmt.Errorf("unexpected result length from Lua script: %d", len(result))
}

// Get counts
completedCount, err := t.redis.SCard(ctx, completedKey).Result()
if err != nil {
return false, fmt.Errorf("failed to get completed count: %w", err)
completedCount, ok := result[0].(int64)
if !ok {
return false, fmt.Errorf("failed to parse completed count from Lua script result")
}

expectedStr, err := t.redis.Get(ctx, expectedKey).Result()
if err == redis.Nil {
// Block not registered - might be old task from before retry, or already cleaned up
expected, ok := result[1].(int64)
if !ok {
return false, fmt.Errorf("failed to parse expected count from Lua script result")
}

// -1 indicates block not registered (expected key doesn't exist)
if expected == -1 {
t.log.WithFields(logrus.Fields{
"block_number": blockNum,
"task_id": taskID,
Expand All @@ -185,15 +224,6 @@ func (t *BlockCompletionTracker) TrackTaskCompletion(
return false, nil
}

if err != nil {
return false, fmt.Errorf("failed to get expected count: %w", err)
}

expected, err := strconv.ParseInt(expectedStr, 10, 64)
if err != nil {
return false, fmt.Errorf("failed to parse expected count: %w", err)
}

t.log.WithFields(logrus.Fields{
"block_number": blockNum,
"task_id": taskID,
Expand Down Expand Up @@ -222,8 +252,13 @@ func (t *BlockCompletionTracker) MarkBlockComplete(
completedKey := t.completedKey(network, processor, mode, blockNum)
expectedKey := t.expectedKey(network, processor, mode, blockNum)
metaKey := t.metaKey(network, processor, mode, blockNum)
pendingKey := t.pendingBlocksKey(network, processor, mode)

if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil {
pipe := t.redis.Pipeline()
pipe.Del(ctx, completedKey, expectedKey, metaKey)
pipe.ZRem(ctx, pendingKey, blockNum)

if _, err := pipe.Exec(ctx); err != nil {
// Log but don't fail - keys will expire anyway
t.log.WithError(err).WithFields(logrus.Fields{
"block_number": blockNum,
Expand All @@ -244,45 +279,85 @@ func (t *BlockCompletionTracker) MarkBlockComplete(
}

// GetStaleBlocks returns blocks that have been processing longer than the stale threshold.
// Uses ZRANGEBYSCORE on the pending blocks sorted set for O(log N + M) complexity.
func (t *BlockCompletionTracker) GetStaleBlocks(
ctx context.Context,
network, processor, mode string,
) ([]uint64, error) {
pattern := t.metaKeyPattern(network, processor, mode)
staleBlocks := make([]uint64, 0)
pendingKey := t.pendingBlocksKey(network, processor, mode)
staleThreshold := time.Now().Add(-t.config.StaleThreshold).Unix()

iter := t.redis.Scan(ctx, 0, pattern, 100).Iterator()
for iter.Next(ctx) {
key := iter.Val()
members, err := t.redis.ZRangeByScore(ctx, pendingKey, &redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%d", staleThreshold),
}).Result()
if err != nil {
return nil, fmt.Errorf("failed to get stale blocks: %w", err)
}

staleBlocks := make([]uint64, 0, len(members))

enqueuedAtStr, err := t.redis.HGet(ctx, key, "enqueued_at").Result()
for _, member := range members {
blockNum, err := strconv.ParseUint(member, 10, 64)
if err != nil {
t.log.WithError(err).WithField("key", key).Debug("Failed to get enqueued_at")
t.log.WithError(err).WithField("member", member).Debug("Failed to parse block number")

continue
}

enqueuedAt, err := strconv.ParseInt(enqueuedAtStr, 10, 64)
if err != nil {
t.log.WithError(err).WithField("key", key).Debug("Failed to parse enqueued_at")
staleBlocks = append(staleBlocks, blockNum)
}

continue
}
return staleBlocks, nil
}

if time.Since(time.Unix(enqueuedAt, 0)) > t.config.StaleThreshold {
// Extract block number from key
blockNum := extractBlockNumFromKey(key)
if blockNum != 0 {
staleBlocks = append(staleBlocks, blockNum)
}
}
// ClearStaleBlocks removes all stale blocks and their associated tracking data.
// Uses ZRANGEBYSCORE to identify stale blocks and a pipeline to efficiently delete all related keys.
// Returns the number of blocks cleared.
func (t *BlockCompletionTracker) ClearStaleBlocks(
ctx context.Context,
network, processor, mode string,
) (int, error) {
staleBlocks, err := t.GetStaleBlocks(ctx, network, processor, mode)
if err != nil {
return 0, fmt.Errorf("failed to get stale blocks: %w", err)
}

if err := iter.Err(); err != nil {
return nil, fmt.Errorf("failed to scan for stale blocks: %w", err)
if len(staleBlocks) == 0 {
return 0, nil
}

return staleBlocks, nil
pendingKey := t.pendingBlocksKey(network, processor, mode)
pipe := t.redis.Pipeline()

// Collect all keys to delete and members to remove from sorted set
members := make([]any, 0, len(staleBlocks))

for _, blockNum := range staleBlocks {
completedKey := t.completedKey(network, processor, mode, blockNum)
expectedKey := t.expectedKey(network, processor, mode, blockNum)
metaKey := t.metaKey(network, processor, mode, blockNum)

pipe.Del(ctx, completedKey, expectedKey, metaKey)

members = append(members, blockNum)
}

// Remove all stale blocks from the sorted set in one operation
pipe.ZRem(ctx, pendingKey, members...)

if _, err := pipe.Exec(ctx); err != nil {
return 0, fmt.Errorf("failed to clear stale blocks: %w", err)
}

t.log.WithFields(logrus.Fields{
"count": len(staleBlocks),
"network": network,
"processor": processor,
"mode": mode,
}).Debug("Cleared stale blocks")

return len(staleBlocks), nil
}

// GetBlockStatus returns the completion status of a block.
Expand Down Expand Up @@ -330,8 +405,13 @@ func (t *BlockCompletionTracker) ClearBlock(
completedKey := t.completedKey(network, processor, mode, blockNum)
expectedKey := t.expectedKey(network, processor, mode, blockNum)
metaKey := t.metaKey(network, processor, mode, blockNum)
pendingKey := t.pendingBlocksKey(network, processor, mode)

if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil {
pipe := t.redis.Pipeline()
pipe.Del(ctx, completedKey, expectedKey, metaKey)
pipe.ZRem(ctx, pendingKey, blockNum)

if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("failed to clear block tracking: %w", err)
}

Expand All @@ -345,26 +425,6 @@ func (t *BlockCompletionTracker) ClearBlock(
return nil
}

// extractBlockNumFromKey extracts the block number from a Redis key.
// Key format: block_meta:{prefix}:{processor}:{network}:{mode}:{blockNum}
// or: block_meta:{processor}:{network}:{mode}:{blockNum}
func extractBlockNumFromKey(key string) uint64 {
parts := strings.Split(key, ":")
if len(parts) < 2 {
return 0
}

// Block number is always the last part
blockNumStr := parts[len(parts)-1]

blockNum, err := strconv.ParseUint(blockNumStr, 10, 64)
if err != nil {
return 0
}

return blockNum
}

// HasBlockTracking checks if a block has Redis tracking data.
// Returns true if block_meta key exists (block is being tracked).
// Used to detect orphaned blocks that are in ClickHouse (complete=0) but have no Redis tracking.
Expand Down
Loading