Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions apps/sim/lib/core/idempotency/cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt } from 'drizzle-orm'
import { and, count, inArray, like, lt, max, min, sql } from 'drizzle-orm'

const logger = createLogger('IdempotencyCleanup')

Expand All @@ -19,7 +19,8 @@ export interface CleanupOptions {
batchSize?: number

/**
* Specific namespace to clean up, or undefined to clean all namespaces
* Specific namespace prefix to clean up (e.g., 'webhook', 'polling')
* Keys are prefixed with namespace, so this filters by key prefix
*/
namespace?: string
}
Expand Down Expand Up @@ -53,13 +54,17 @@ export async function cleanupExpiredIdempotencyKeys(

while (hasMore) {
try {
// Build where condition - filter by cutoff date and optionally by namespace prefix
const whereCondition = namespace
? and(lt(idempotencyKey.createdAt, cutoffDate), eq(idempotencyKey.namespace, namespace))
? and(
lt(idempotencyKey.createdAt, cutoffDate),
like(idempotencyKey.key, `${namespace}:%`)
)
: lt(idempotencyKey.createdAt, cutoffDate)

// First, find IDs to delete with limit
// Find keys to delete with limit
const toDelete = await db
.select({ key: idempotencyKey.key, namespace: idempotencyKey.namespace })
.select({ key: idempotencyKey.key })
.from(idempotencyKey)
.where(whereCondition)
.limit(batchSize)
Expand All @@ -68,14 +73,13 @@ export async function cleanupExpiredIdempotencyKeys(
break
}

// Delete the found records
// Delete the found records by key
const deleteResult = await db
.delete(idempotencyKey)
.where(
and(
...toDelete.map((item) =>
and(eq(idempotencyKey.key, item.key), eq(idempotencyKey.namespace, item.namespace))
)
inArray(
idempotencyKey.key,
toDelete.map((item) => item.key)
)
)
.returning({ key: idempotencyKey.key })
Expand Down Expand Up @@ -126,6 +130,7 @@ export async function cleanupExpiredIdempotencyKeys(

/**
* Get statistics about idempotency key usage
* Uses SQL aggregations to avoid loading all keys into memory
*/
export async function getIdempotencyKeyStats(): Promise<{
totalKeys: number
Expand All @@ -134,34 +139,35 @@ export async function getIdempotencyKeyStats(): Promise<{
newestKey: Date | null
}> {
try {
const allKeys = await db
// Get total count and date range in a single query
const [statsResult] = await db
.select({
namespace: idempotencyKey.namespace,
createdAt: idempotencyKey.createdAt,
totalKeys: count(),
oldestKey: min(idempotencyKey.createdAt),
newestKey: max(idempotencyKey.createdAt),
})
.from(idempotencyKey)

const totalKeys = allKeys.length
const keysByNamespace: Record<string, number> = {}
let oldestKey: Date | null = null
let newestKey: Date | null = null

for (const key of allKeys) {
keysByNamespace[key.namespace] = (keysByNamespace[key.namespace] || 0) + 1
// Get counts by namespace prefix using SQL substring
// Extracts everything before the first ':' as the namespace
const namespaceStats = await db
.select({
namespace: sql<string>`split_part(${idempotencyKey.key}, ':', 1)`.as('namespace'),
count: count(),
})
.from(idempotencyKey)
.groupBy(sql`split_part(${idempotencyKey.key}, ':', 1)`)

if (!oldestKey || key.createdAt < oldestKey) {
oldestKey = key.createdAt
}
if (!newestKey || key.createdAt > newestKey) {
newestKey = key.createdAt
}
const keysByNamespace: Record<string, number> = {}
for (const row of namespaceStats) {
keysByNamespace[row.namespace || 'unknown'] = row.count
}

return {
totalKeys,
totalKeys: statsResult?.totalKeys ?? 0,
keysByNamespace,
oldestKey,
newestKey,
oldestKey: statsResult?.oldestKey ?? null,
newestKey: statsResult?.newestKey ?? null,
}
} catch (error) {
logger.error('Failed to get idempotency key stats:', error)
Expand Down
31 changes: 8 additions & 23 deletions apps/sim/lib/core/idempotency/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { randomUUID } from 'crypto'
import { db } from '@sim/db'
import { idempotencyKey } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { eq } from 'drizzle-orm'
import { getRedisClient } from '@/lib/core/config/redis'
import { getStorageMethod, type StorageMethod } from '@/lib/core/storage'
import { extractProviderIdentifierFromBody } from '@/lib/webhooks/provider-utils'
Expand Down Expand Up @@ -124,12 +124,7 @@ export class IdempotencyService {
const existing = await db
.select({ result: idempotencyKey.result, createdAt: idempotencyKey.createdAt })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
)
.where(eq(idempotencyKey.key, normalizedKey))
.limit(1)

if (existing.length > 0) {
Expand Down Expand Up @@ -224,11 +219,12 @@ export class IdempotencyService {
.insert(idempotencyKey)
.values({
key: normalizedKey,
namespace: this.config.namespace,
result: inProgressResult,
createdAt: new Date(),
})
.onConflictDoNothing()
.onConflictDoNothing({
target: [idempotencyKey.key],
})
.returning({ key: idempotencyKey.key })

if (insertResult.length > 0) {
Expand All @@ -243,12 +239,7 @@ export class IdempotencyService {
const existing = await db
.select({ result: idempotencyKey.result })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
)
.where(eq(idempotencyKey.key, normalizedKey))
.limit(1)

const existingResult =
Expand Down Expand Up @@ -280,12 +271,7 @@ export class IdempotencyService {
const existing = await db
.select({ result: idempotencyKey.result })
.from(idempotencyKey)
.where(
and(
eq(idempotencyKey.key, normalizedKey),
eq(idempotencyKey.namespace, this.config.namespace)
)
)
.where(eq(idempotencyKey.key, normalizedKey))
.limit(1)
currentResult = existing.length > 0 ? (existing[0].result as ProcessingResult) : null
}
Expand Down Expand Up @@ -339,12 +325,11 @@ export class IdempotencyService {
.insert(idempotencyKey)
.values({
key: normalizedKey,
namespace: this.config.namespace,
result: result,
createdAt: new Date(),
})
.onConflictDoUpdate({
target: [idempotencyKey.key, idempotencyKey.namespace],
target: [idempotencyKey.key],
set: {
result: result,
createdAt: new Date(),
Expand Down
4 changes: 4 additions & 0 deletions packages/db/migrations/0147_rare_firebrand.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
DROP INDEX "idempotency_key_namespace_unique";--> statement-breakpoint
DROP INDEX "idempotency_key_namespace_idx";--> statement-breakpoint
ALTER TABLE "idempotency_key" ADD PRIMARY KEY ("key");--> statement-breakpoint
ALTER TABLE "idempotency_key" DROP COLUMN "namespace";
Loading