diff --git a/.changeset/dull-candies-cheer.md b/.changeset/dull-candies-cheer.md new file mode 100644 index 000000000..17843b348 --- /dev/null +++ b/.changeset/dull-candies-cheer.md @@ -0,0 +1,5 @@ +--- +"@exactly/server": patch +--- + +✨ add maturity check and worker diff --git a/server/index.ts b/server/index.ts index b08cfa558..3470f97b7 100644 --- a/server/index.ts +++ b/server/index.ts @@ -18,6 +18,11 @@ import bridge from "./hooks/bridge"; import manteca from "./hooks/manteca"; import panda from "./hooks/panda"; import persona from "./hooks/persona"; +import { + close as closeMaturityQueue, + initializeWorker as initializeMaturityWorker, + scheduleMaturityChecks, +} from "./queues/maturityQueue"; import androidFingerprints from "./utils/android/fingerprints"; import appOrigin from "./utils/appOrigin"; import { closeQueue as closeAccountQueue } from "./utils/createCredential"; @@ -323,7 +328,13 @@ const server = serve(app); export async function close() { return new Promise((resolve, reject) => { server.close((error) => { - Promise.allSettled([closeSentry(), closeSegment(), database.$client.end(), closeAccountQueue()]) + Promise.allSettled([ + closeSentry(), + closeSegment(), + database.$client.end(), + closeMaturityQueue(), + closeAccountQueue(), + ]) .then(async (results) => { await closeRedis(); if (error) reject(error); @@ -336,6 +347,11 @@ export async function close() { } if (!process.env.VITEST) { + initializeMaturityWorker(); + scheduleMaturityChecks().catch((error: unknown) => { + captureException(error, { level: "error", tags: { unhandled: true } }); + }); + ["SIGINT", "SIGTERM"].map((code) => { process.on(code, () => { close() diff --git a/server/queues/maturityQueue.ts b/server/queues/maturityQueue.ts new file mode 100644 index 000000000..42b389966 --- /dev/null +++ b/server/queues/maturityQueue.ts @@ -0,0 +1,236 @@ +import { SPAN_STATUS_ERROR } from "@sentry/core"; +import { addBreadcrumb, captureException, startSpan, type Span } from "@sentry/node"; +import { Queue, Worker, type Job } from "bullmq"; +import * as v from "valibot"; + +import { previewerAbi, previewerAddress } from "@exactly/common/generated/chain"; +import { MATURITY_INTERVAL } from "@exactly/lib"; + +import database, { credentials } from "../database"; +import { sendPushNotification } from "../utils/onesignal"; +import publicClient from "../utils/publicClient"; +import { queue as redis } from "../utils/redis"; + +const QUEUE_NAME = "maturity-notifications"; + +export const MaturityJob = { + CHECK_DEBTS: "check-debts", +} as const; + +let _maturityQueue: Queue | undefined; + +export function getMaturityQueue(): Queue { + _maturityQueue ??= new Queue(QUEUE_NAME, { + connection: redis, + defaultJobOptions: { + attempts: 3, + backoff: { type: "exponential", delay: 1000 }, + removeOnComplete: true, + removeOnFail: true, + }, + }); + return _maturityQueue; +} + +type DebtCheckResult = { + accounts: { account: string; hasDebt: boolean }[]; + contractCalls: number; +}; + +const checkDebtsSchema = v.object({ + maturity: v.number(), + window: v.picklist(["1h", "24h"]), +}); + +export type CheckDebtsData = v.InferOutput; + +export async function processor(job: Job) { + return startSpan( + { name: "maturity.processor", op: "queue.process", attributes: { job: job.name, ...job.data } }, + async (span: Span) => { + switch (job.name) { + case MaturityJob.CHECK_DEBTS: { + const parseResult = v.safeParse(checkDebtsSchema, job.data); + if (!parseResult.success) { + captureException(parseResult.issues, { extra: { jobData: job.data } }); + return; + } + const { maturity, window } = parseResult.output; + try { + const CHUNK_SIZE = 50; + let totalContractCalls = 0; + let totalAccountsProcessed = 0; + + for (let offset = 0; ; offset += CHUNK_SIZE) { + const chunk = await database + .select({ account: credentials.account }) + .from(credentials) + .orderBy(credentials.account) + .limit(CHUNK_SIZE) + .offset(offset); + if (chunk.length === 0) break; + totalAccountsProcessed += chunk.length; + try { + const results = await checkDebts(chunk, maturity); + + totalContractCalls += results.contractCalls; + const notifications: Promise[] = []; + for (const { account, hasDebt } of results.accounts) { + if (hasDebt) { + notifications.push( + redis + .set( + `notification:sent:${account}:${maturity}:${window}`, + String(Date.now()), + "EX", + 86_400, + "NX", + ) + .then((r) => { + if (r === "OK") { + return sendPushNotification({ + userId: account, + headings: { en: "Debt Maturity Alert" }, + contents: { + en: `Your debt is due in ${window === "24h" ? "24 hours" : "1 hour"}. Repay now to avoid liquidation.`, + }, + }); + } + }), + ); + } + } + + for (const result of await Promise.allSettled(notifications)) { + if (result.status === "rejected") captureException(result.reason, { level: "error" }); + } + } catch (error: unknown) { + captureException(error); + } + if (chunk.length < CHUNK_SIZE) break; + } + addBreadcrumb({ + category: "maturity-queue", + message: `processed ${String(totalAccountsProcessed)} accounts`, + level: "info", + data: { + totalContractCalls, + accountsProcessed: totalAccountsProcessed, + callsPerAccount: totalAccountsProcessed > 0 ? totalContractCalls / totalAccountsProcessed : 0, + }, + }); + } finally { + if (window === "1h") { + await scheduleMaturityChecks(maturity); + } + } + break; + } + default: { + const message = `Unknown job name: ${job.name}`; + span.setStatus({ code: SPAN_STATUS_ERROR, message }); + throw new Error(message); + } + } + }, + ); +} + +async function checkDebts(chunk: { account: string }[], maturity: number): Promise { + const promises = chunk.map(({ account }) => + publicClient.readContract({ + address: previewerAddress, + abi: previewerAbi, + functionName: "exactly", + args: [account as `0x${string}`], + }), + ); + + const results = await Promise.allSettled(promises); + const accounts: DebtCheckResult["accounts"] = []; + + for (const [index, result] of results.entries()) { + const entry = chunk[index]; + if (!entry) continue; + const { account } = entry; + if (result.status === "rejected") { + captureException(result.reason, { extra: { account } }); + continue; + } + const hasDebt = result.value.some((market) => + market.fixedBorrowPositions.some((p) => p.maturity === BigInt(maturity) && p.position.principal > 0n), + ); + accounts.push({ account, hasDebt }); + } + + return { accounts, contractCalls: chunk.length }; +} + +let maturityWorker: undefined | Worker; + +export function initializeWorker(): void { + if (maturityWorker) return; + + try { + maturityWorker = new Worker(QUEUE_NAME, processor, { connection: redis }); + } catch (error) { + captureException(error, { level: "error", tags: { queue: QUEUE_NAME, phase: "initialization" } }); + return; + } + + maturityWorker + .on("failed", (job: Job | undefined, error: Error) => { + captureException(error, { level: "error", extra: { job: job?.data } }); + }) + .on("completed", (job: Job) => { + addBreadcrumb({ + category: "queue", + message: `Job ${job.id} completed`, + level: "info", + data: { job: job.data }, + }); + }) + .on("active", (job: Job) => { + addBreadcrumb({ + category: "queue", + message: `Job ${job.id} active`, + level: "info", + data: { job: job.data }, + }); + }) + .on("error", (error: Error) => { + captureException(error, { tags: { queue: QUEUE_NAME } }); + }); +} + +export async function close() { + await Promise.all([maturityWorker?.close() ?? Promise.resolve(), _maturityQueue?.close() ?? Promise.resolve()]); + maturityWorker = undefined; + _maturityQueue = undefined; +} + +export async function scheduleMaturityChecks(afterMaturity?: number) { + const now = Math.floor(Date.now() / 1000); + const nextMaturity = + afterMaturity === undefined + ? now - (now % MATURITY_INTERVAL) + MATURITY_INTERVAL + : afterMaturity + MATURITY_INTERVAL; + + await getMaturityQueue().add( + MaturityJob.CHECK_DEBTS, + { maturity: nextMaturity, window: "24h" }, + { + jobId: `check-debts-${nextMaturity}-24h`, + delay: Math.max(0, (nextMaturity - 24 * 3600 - now) * 1000), + }, + ); + + await getMaturityQueue().add( + MaturityJob.CHECK_DEBTS, + { maturity: nextMaturity, window: "1h" }, + { + jobId: `check-debts-${nextMaturity}-1h`, + delay: Math.max(0, (nextMaturity - 3600 - now) * 1000), + }, + ); +} diff --git a/server/test/queues/maturityQueue.test.ts b/server/test/queues/maturityQueue.test.ts new file mode 100644 index 000000000..ba6e08c56 --- /dev/null +++ b/server/test/queues/maturityQueue.test.ts @@ -0,0 +1,232 @@ +import "../mocks/onesignal"; +import "../mocks/sentry"; + +import { Redis } from "ioredis"; +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from "vitest"; + +import { previewerAbi, previewerAddress } from "@exactly/common/generated/chain"; +import { MATURITY_INTERVAL } from "@exactly/lib"; + +import { close, getMaturityQueue, MaturityJob, processor, scheduleMaturityChecks } from "../../queues/maturityQueue"; +import * as onesignal from "../../utils/onesignal"; +import { closeRedis } from "../../utils/redis"; + +import type { CheckDebtsData } from "../../queues/maturityQueue"; +import type { Job } from "bullmq"; + +const mocks = vi.hoisted(() => ({ + select: vi.fn(), + readContract: vi.fn(), +})); + +vi.mock("../../database", () => ({ + default: { select: mocks.select }, + credentials: { account: "account" }, +})); + +vi.mock("../../utils/publicClient", () => ({ + default: { readContract: mocks.readContract }, +})); + +function mockAccounts(accounts: { account: string }[]) { + const offsetMock = vi.fn().mockResolvedValueOnce(accounts).mockResolvedValueOnce([]); + const limitMock = vi.fn().mockReturnValue({ offset: offsetMock }); + const orderByMock = vi.fn().mockReturnValue({ limit: limitMock }); + const fromMock = vi.fn().mockReturnValue({ orderBy: orderByMock }); + mocks.select.mockReturnValue({ from: fromMock }); +} + +function mockExactly( + result: { fixedBorrowPositions: { maturity: bigint; position: { fee: bigint; principal: bigint } }[] }[], +) { + mocks.readContract.mockResolvedValue(result); +} + +let testRedis: Redis; + +describe("worker", () => { + const sendPushNotification = vi.spyOn(onesignal, "sendPushNotification"); + + beforeAll(() => { + if (!process.env.REDIS_URL) throw new Error("missing REDIS_URL"); + testRedis = new Redis(process.env.REDIS_URL); + }); + + afterAll(async () => { + await close(); + await closeRedis(); + await testRedis.quit(); + }); + + beforeEach(async () => { + vi.clearAllMocks(); + await testRedis.flushdb(); + }); + + it("schedules maturity checks", async () => { + await scheduleMaturityChecks(); + const jobs = await getMaturityQueue().getJobs(["delayed", "waiting"]); + expect(jobs).toHaveLength(2); + expect(jobs.map((index) => (index.data as CheckDebtsData).window).toSorted()).toStrictEqual(["1h", "24h"]); + }); + + it("processes check-debts job with debt in single market", async () => { + const account = "0x1234567890123456789012345678901234567890"; + const maturity = 1_234_567_890; + const window = "24h"; + + mockAccounts([{ account }]); + mockExactly([{ fixedBorrowPositions: [{ maturity: BigInt(maturity), position: { principal: 100n, fee: 0n } }] }]); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity, window }, + } as unknown as Job); + + expect(mocks.readContract).toHaveBeenCalledWith({ + address: previewerAddress, + abi: previewerAbi, + functionName: "exactly", + args: [account], + }); + const key = `notification:sent:${account}:${String(maturity)}:${window}`; + const value = await testRedis.get(key); + expect(value).not.toBeNull(); + const ttl = await testRedis.ttl(key); + expect(ttl).toBeGreaterThan(0); + expect(ttl).toBeLessThanOrEqual(86_400); + expect(sendPushNotification).toHaveBeenCalledWith({ + userId: account, + headings: { en: "Debt Maturity Alert" }, + contents: { en: "Your debt is due in 24 hours. Repay now to avoid liquidation." }, + }); + }); + + it("handles duplicate notification", async () => { + const account = "0x1234567890123456789012345678901234567890"; + const maturity = 1_234_567_890; + const window = "24h"; + + mockAccounts([{ account }]); + await testRedis.set(`notification:sent:${account}:${String(maturity)}:${window}`, String(Date.now()), "EX", 86_400); + + mockExactly([{ fixedBorrowPositions: [{ maturity: BigInt(maturity), position: { principal: 100n, fee: 0n } }] }]); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity, window }, + } as unknown as Job); + + expect(sendPushNotification).not.toHaveBeenCalled(); + }); + + it("handles position with principal = 0", async () => { + const account = "0x1234567890123456789012345678901234567890"; + const maturity = 1_234_567_890; + + mockAccounts([{ account }]); + mockExactly([{ fixedBorrowPositions: [{ maturity: BigInt(maturity), position: { principal: 0n, fee: 0n } }] }]); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity, window: "24h" }, + } as unknown as Job); + + const keys = await testRedis.keys("notification:sent:*"); + expect(keys).toHaveLength(0); + expect(sendPushNotification).not.toHaveBeenCalled(); + }); + + it("handles account with no debt in any market", async () => { + const account = "0x1234567890123456789012345678901234567890"; + const maturity = 1_234_567_890; + + mockAccounts([{ account }]); + mockExactly([{ fixedBorrowPositions: [] }, { fixedBorrowPositions: [] }]); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity, window: "24h" }, + } as unknown as Job); + + const keys = await testRedis.keys("notification:sent:*"); + expect(keys).toHaveLength(0); + expect(sendPushNotification).not.toHaveBeenCalled(); + }); + + it("detects debt across any market", async () => { + const account = "0x1234567890123456789012345678901234567890"; + const maturity = 1_234_567_890; + const window = "24h"; + + mockAccounts([{ account }]); + mockExactly([ + { fixedBorrowPositions: [] }, + { fixedBorrowPositions: [] }, + { fixedBorrowPositions: [{ maturity: BigInt(maturity), position: { principal: 50n, fee: 0n } }] }, + ]); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity, window }, + } as unknown as Job); + + const key = `notification:sent:${account}:${String(maturity)}:${window}`; + const value = await testRedis.get(key); + expect(value).not.toBeNull(); + expect(sendPushNotification).toHaveBeenCalledWith({ + userId: account, + headings: { en: "Debt Maturity Alert" }, + contents: { en: "Your debt is due in 24 hours. Repay now to avoid liquidation." }, + }); + }); + + it("handles exactly() call failure", async () => { + const { captureException } = await import("@sentry/node"); + const account = "0x1234567890123456789012345678901234567890"; + const maturity = 1_234_567_890; + + mockAccounts([{ account }]); + mocks.readContract.mockRejectedValue(new Error("rpc error")); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity, window: "24h" }, + } as unknown as Job); + + expect(captureException).toHaveBeenCalledWith(expect.any(Error), { extra: { account } }); + const keys = await testRedis.keys("notification:sent:*"); + expect(keys).toHaveLength(0); + expect(sendPushNotification).not.toHaveBeenCalled(); + }); + + it("should throw an error for unknown job names", async () => { + const job = { name: "unknown", data: {} } as unknown as Job; + await expect(processor(job)).rejects.toThrow("Unknown job name: unknown"); + }); + + it("reschedules on 1h window", async () => { + const jobMaturity = MATURITY_INTERVAL * 10; + + const offsetMock = vi.fn().mockResolvedValueOnce([]); + const limitMock = vi.fn().mockReturnValue({ offset: offsetMock }); + const orderByMock = vi.fn().mockReturnValue({ limit: limitMock }); + const fromMock = vi.fn().mockReturnValue({ orderBy: orderByMock }); + mocks.select.mockReturnValue({ from: fromMock }); + + await processor({ + name: MaturityJob.CHECK_DEBTS, + data: { maturity: jobMaturity, window: "1h" }, + } as unknown as Job); + + const expectedNextMaturity = jobMaturity + MATURITY_INTERVAL; + const jobs = await getMaturityQueue().getJobs(["delayed", "waiting"]); + expect(jobs).toHaveLength(2); + const windows = jobs.map((index) => { + const data = index.data as CheckDebtsData; + return { maturity: data.maturity, window: data.window }; + }); + expect(windows).toContainEqual({ maturity: expectedNextMaturity, window: "24h" }); + expect(windows).toContainEqual({ maturity: expectedNextMaturity, window: "1h" }); + }); +});