Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/browser-sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@reflag/browser-sdk",
"version": "1.4.4",
"version": "1.4.6",
"packageManager": "yarn@4.1.1",
"license": "MIT",
"repository": {
Expand Down
234 changes: 17 additions & 217 deletions packages/browser-sdk/src/bulkQueue.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
import { logResponseError } from "./utils/responseError";
import {
BULK_QUEUE_FLUSH_DELAY_MS,
BULK_QUEUE_MAX_SIZE,
BULK_QUEUE_RETRY_BASE_DELAY_MS,
BULK_QUEUE_RETRY_MAX_DELAY_MS,
} from "./config";
import { BULK_QUEUE_FLUSH_DELAY_MS, BULK_QUEUE_MAX_SIZE } from "./config";
import { Logger } from "./logger";

const BULK_QUEUE_STORAGE_KEY = "__reflag_bulk_queue_v1";
const WARN_AFTER_CONSECUTIVE_FAILURES = 10;
const WARN_AFTER_FAILURE_MS = 5 * 60 * 1000;
const WARN_THROTTLE_MS = 15 * 60 * 1000;
const DROP_ERROR_THROTTLE_MS = 15 * 60 * 1000;

type PayloadContext = {
Expand Down Expand Up @@ -61,84 +52,19 @@ export type BulkEvent =
export type BulkQueueOptions = {
flushDelayMs?: number;
maxSize?: number;
retryBaseDelayMs?: number;
retryMaxDelayMs?: number;
storageKey?: string;
logger?: Logger;
};

function getSessionStorage(): Storage | null {
try {
if (typeof sessionStorage === "undefined") {
return null;
}
return sessionStorage;
} catch {
return null;
}
}

function isObject(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}

function isBulkEvent(value: unknown): value is BulkEvent {
if (!isObject(value) || typeof value.type !== "string") {
return false;
}

if (value.type === "user") {
return typeof value.userId === "string";
}

if (value.type === "company") {
return typeof value.companyId === "string";
}

if (value.type === "event") {
return typeof value.userId === "string" && typeof value.event === "string";
}

if (value.type === "feature-flag-event") {
return (
typeof value.key === "string" &&
(value.action === "check-is-enabled" || value.action === "check-config")
);
}

if (value.type === "prompt-event") {
return (
typeof value.featureId === "string" &&
typeof value.promptId === "string" &&
typeof value.userId === "string" &&
typeof value.promptedQuestion === "string" &&
(value.action === "received" ||
value.action === "shown" ||
value.action === "dismissed")
);
}

return false;
}

export class BulkQueue {
private readonly flushDelayMs: number;
private readonly maxSize: number;
private readonly retryBaseDelayMs: number;
private readonly retryMaxDelayMs: number;
private readonly storageKey: string;
private readonly storage: Storage | null;
private readonly logger?: Logger;
private readonly sendBulk: (events: BulkEvent[]) => Promise<Response>;

private queue: BulkEvent[] = [];
private timer: ReturnType<typeof setTimeout> | null = null;
private inFlightBatch: BulkEvent[] | null = null;
private inFlightPromise: Promise<number | null> | null = null;
private retryCount = 0;
private consecutiveFailures = 0;
private firstFailureAt: number | null = null;
private lastWarnAt: number | null = null;
private inFlightPromise: Promise<void> | null = null;
private lastDropErrorAt: number | null = null;
private totalDroppedEvents = 0;
private droppedSinceLastError = 0;
Expand All @@ -150,24 +76,12 @@ export class BulkQueue {
this.sendBulk = sendBulk;
this.flushDelayMs = opts.flushDelayMs ?? BULK_QUEUE_FLUSH_DELAY_MS;
this.maxSize = opts.maxSize ?? BULK_QUEUE_MAX_SIZE;
this.retryBaseDelayMs =
opts.retryBaseDelayMs ?? BULK_QUEUE_RETRY_BASE_DELAY_MS;
this.retryMaxDelayMs =
opts.retryMaxDelayMs ?? BULK_QUEUE_RETRY_MAX_DELAY_MS;
this.storageKey = opts.storageKey ?? BULK_QUEUE_STORAGE_KEY;
this.storage = getSessionStorage();
this.logger = opts.logger;

this.restoreQueueFromStorage();
if (this.queue.length > 0) {
this.schedule(this.flushDelayMs);
}
}

async enqueue(event: BulkEvent) {
this.queue.push(event);
this.trimPendingQueueToCapacity();
this.persistQueueToStorage();

const maxPending = Math.max(0, this.maxSize - this.getInFlightBatchSize());
if (this.queue.length > 0 && this.queue.length >= maxPending) {
Expand Down Expand Up @@ -198,35 +112,24 @@ export class BulkQueue {

const sendPromise = this.sendBatch(batch);
this.inFlightPromise = sendPromise;
let nextDelayMs: number | null = null;
try {
nextDelayMs = await sendPromise;
await sendPromise;
} finally {
if (this.inFlightPromise === sendPromise) {
this.inFlightPromise = null;
}
this.inFlightBatch = null;
this.persistQueueToStorage();
}

if (this.queue.length > 0 && !this.timer && nextDelayMs !== null) {
this.schedule(nextDelayMs);
if (this.queue.length > 0 && !this.timer) {
this.schedule(this.flushDelayMs);
}
}

async size() {
return this.queue.length + this.getInFlightBatchSize();
}

private getRetryDelay() {
const maxExponent = 6;
const exponent = Math.min(this.retryCount - 1, maxExponent);
return Math.min(
this.retryBaseDelayMs * 2 ** exponent,
this.retryMaxDelayMs,
);
}

private schedule(delayMs: number) {
if (this.timer || this.inFlightPromise || this.queue.length === 0) {
return;
Expand All @@ -244,130 +147,27 @@ export class BulkQueue {
}

private async sendBatch(batch: BulkEvent[]) {
let nextDelayMs: number | null = null;

let res: Response;
try {
const res = await this.sendBulk(batch);
if (!res.ok) {
if (res.status >= 400 && res.status < 500) {
this.retryCount = 0;
this.firstFailureAt = null;
this.consecutiveFailures = 0;
this.lastWarnAt = null;
if (this.logger) {
await logResponseError({
logger: this.logger,
res,
message:
"bulk request failed with non-retriable status; dropping batch",
});
}
nextDelayMs = this.flushDelayMs;
} else {
throw new Error(`unexpected status ${res.status}`);
}
} else {
this.retryCount = 0;
if (this.firstFailureAt !== null && this.consecutiveFailures > 0) {
this.logger?.info("bulk delivery recovered", {
outageMs: Date.now() - this.firstFailureAt,
failedAttempts: this.consecutiveFailures,
});
}
this.firstFailureAt = null;
this.consecutiveFailures = 0;
this.lastWarnAt = null;
nextDelayMs = this.flushDelayMs;
}
res = await this.sendBulk(batch);
} catch (error) {
this.queue = batch.concat(this.queue);

const now = Date.now();
if (this.firstFailureAt === null) {
this.firstFailureAt = now;
}
this.consecutiveFailures += 1;
this.retryCount += 1;
const retryInMs = this.getRetryDelay();
nextDelayMs = retryInMs;
this.logger?.info("bulk retry scheduled", {
retryInMs,
queueSize: this.queue.length + this.getInFlightBatchSize(),
consecutiveFailures: this.consecutiveFailures,
this.logger?.error("bulk request failed; dropping batch", {
error,
batchSize: batch.length,
});

const outageMs = now - this.firstFailureAt;
const shouldWarn =
this.consecutiveFailures >= WARN_AFTER_CONSECUTIVE_FAILURES ||
outageMs >= WARN_AFTER_FAILURE_MS;
const canWarnNow =
this.lastWarnAt === null || now - this.lastWarnAt >= WARN_THROTTLE_MS;
if (shouldWarn && canWarnNow) {
this.logger?.warn("bulk delivery degraded", {
consecutiveFailures: this.consecutiveFailures,
outageMs,
queueSize: this.queue.length + this.getInFlightBatchSize(),
retryInMs,
error,
});
this.lastWarnAt = now;
}
}

return nextDelayMs;
}

private getPersistedQueue() {
const inFlight = this.inFlightBatch ?? [];
return inFlight.concat(this.queue).slice(-this.maxSize);
}

private persistQueueToStorage() {
if (!this.storage) {
return;
}

try {
const persisted = this.getPersistedQueue();
if (persisted.length === 0) {
this.storage.removeItem(this.storageKey);
return;
if (!res.ok) {
if (this.logger) {
await logResponseError({
logger: this.logger,
res,
message: "bulk request failed; dropping batch",
});
}

this.storage.setItem(this.storageKey, JSON.stringify(persisted));
} catch {
// ignore persistence failures
}
}

private restoreQueueFromStorage() {
if (!this.storage) {
return;
}

try {
const raw = this.storage.getItem(this.storageKey);
if (!raw) {
return;
}

const parsed: unknown = JSON.parse(raw);
if (!Array.isArray(parsed)) {
throw new Error("invalid stored bulk queue");
}

this.queue = parsed.filter(isBulkEvent).slice(-this.maxSize);
if (this.queue.length === 0) {
this.storage.removeItem(this.storageKey);
}
} catch {
this.queue = [];
try {
this.storage.removeItem(this.storageKey);
} catch {
// ignore cleanup failures
}
}
}

private getInFlightBatchSize() {
Expand Down
34 changes: 24 additions & 10 deletions packages/browser-sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,7 @@ export type InitOptions = ReflagDeprecatedContext & {
/**
* Queue settings for tracking updates sent to `/bulk`.
* Applies to user/company updates, check events, and prompt events.
* Queue data is persisted in `sessionStorage` and restored on reloads
* within the same browser tab.
* Events are buffered in memory and flushed in the background.
*/
trackingQueue?: {
/**
Expand All @@ -329,14 +328,12 @@ export type InitOptions = ReflagDeprecatedContext & {
maxSize?: number;

/**
* Base retry delay in milliseconds after a failed bulk request.
* Defaults to 5000ms.
* Deprecated: retries are no longer performed for bulk delivery.
*/
retryBaseDelayMs?: number;

/**
* Maximum retry delay in milliseconds after repeated failures.
* Defaults to 60000ms.
* Deprecated: retries are no longer performed for bulk delivery.
*/
retryMaxDelayMs?: number;
};
Expand Down Expand Up @@ -432,6 +429,7 @@ export class ReflagClient {
private autoFeedbackInit: Promise<void> | undefined;
private readonly flagsClient: FlagsClient;
private readonly bulkQueue: BulkQueue | undefined;
private readonly handleBeforeUnload?: () => void;

public readonly logger: Logger;

Expand Down Expand Up @@ -476,17 +474,27 @@ export class ReflagClient {
});
if (!this.config.offline && this.config.enableTracking) {
this.bulkQueue = new BulkQueue(
(events) => this.httpClient.post({ path: "/bulk", body: events }),
(events) =>
this.httpClient.post({
path: "/bulk",
body: events,
keepalive: true,
}),
{
flushDelayMs: opts.trackingQueue?.flushDelayMs,
maxSize: opts.trackingQueue?.maxSize,
retryBaseDelayMs: opts.trackingQueue?.retryBaseDelayMs,
retryMaxDelayMs: opts.trackingQueue?.retryMaxDelayMs,
storageKey: `__reflag_bulk_queue_v1:${this.config.apiBaseUrl}:${this.publishableKey}`,
logger: this.logger,
},
);
}
if (this.bulkQueue && !IS_SERVER) {
this.handleBeforeUnload = () => {
void this.bulkQueue?.flush();
};
window.addEventListener("beforeunload", this.handleBeforeUnload, {
capture: true,
});
}

const bulkQueue = this.bulkQueue;

Expand Down Expand Up @@ -597,6 +605,12 @@ export class ReflagClient {
*
**/
async stop() {
if (this.handleBeforeUnload && !IS_SERVER) {
window.removeEventListener("beforeunload", this.handleBeforeUnload, {
capture: true,
});
}

if (this.bulkQueue) {
await this.bulkQueue.flush();
let remaining = await this.bulkQueue.size();
Expand Down
Loading
Loading