From 0aede81917597da9d5def785f2917ec542095853 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Wed, 11 Mar 2026 14:12:33 +0100 Subject: [PATCH 1/4] refactor(flagd): replace Thread.sleep with ScheduledExecutorService in SyncStreamQueueSource - Add ScheduledExecutorService field (flagd-sync-retry-scheduler daemon thread) - Replace Thread.sleep(maxBackoffMs) throttle with scheduler.schedule() - When throttle is needed, schedule the next observeSyncStream() invocation and return from the current one (non-blocking) - Use retryScheduler.execute() in init() instead of a raw daemon thread - Shut down retryScheduler in shutdown() via shutdownNow() - Handle RejectedExecutionException on schedule (race with shutdown) - Restore Thread.currentThread().interrupt() on InterruptedException Closes #1659 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Simon Schrottner --- .../connector/sync/SyncStreamQueueSource.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 5d245a764..d2968950c 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -21,7 +21,10 @@ import io.grpc.stub.StreamObserver; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -48,6 +51,11 @@ public class SyncStreamQueueSource implements QueueSource { private final FlagdOptions options; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final List fatalStatusCodes; + private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "flagd-sync-retry-scheduler"); + t.setDaemon(true); + return t; + }); private volatile GrpcComponents grpcComponents; /** @@ -143,9 +151,7 @@ public synchronized void reinitializeChannelComponents() { /** Initialize sync stream connector. */ public void init() throws Exception { - Thread listener = new Thread(this::observeSyncStream); - listener.setDaemon(true); - listener.start(); + retryScheduler.execute(this::observeSyncStream); } /** Get blocking queue to obtain payloads exposed by this connector. */ @@ -167,6 +173,7 @@ public void shutdown() throws InterruptedException { return; } + retryScheduler.shutdownNow(); grpcComponents.channelConnector.shutdown(); } @@ -181,12 +188,12 @@ private void observeSyncStream() { try { if (shouldThrottle.getAndSet(false)) { log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); - Thread.sleep(this.maxBackoffMs); - - // Check shutdown again after sleep to avoid unnecessary work - if (shutdown.get()) { - break; + try { + retryScheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + log.debug("Retry scheduling rejected, most likely shutdown was invoked", e); } + return; } log.debug("Initializing sync stream request"); @@ -228,7 +235,8 @@ private void observeSyncStream() { shouldThrottle.set(true); } } catch (InterruptedException ie) { - log.debug("Stream loop interrupted, most likely shutdown was invoked", ie); + log.debug("Stream observer interrupted, most likely shutdown was invoked", ie); + Thread.currentThread().interrupt(); } } From 194eaf2762e7ec7f4af2833946ec3d027afe8666 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Wed, 11 Mar 2026 14:58:11 +0100 Subject: [PATCH 2/4] refactor(flagd): await scheduler termination before channel shutdown Add retryScheduler.awaitTermination(deadline, MILLISECONDS) after shutdownNow() to ensure the scheduler thread has fully stopped before the gRPC channel is torn down, preventing race conditions during shutdown. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Signed-off-by: Simon Schrottner --- .../providers/flagd/resolver/process/InProcessResolver.java | 1 + .../process/storage/connector/sync/SyncStreamQueueSource.java | 1 + 2 files changed, 2 insertions(+) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index c0a914052..feca49baf 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -185,4 +185,5 @@ static QueueSource getQueueSource(final FlagdOptions options) { ? new FileQueueSource(options.getOfflineFlagSourcePath(), options.getOfflinePollIntervalMs()) : new SyncStreamQueueSource(options); } + } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index d2968950c..982e556a1 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -174,6 +174,7 @@ public void shutdown() throws InterruptedException { } retryScheduler.shutdownNow(); + retryScheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS); grpcComponents.channelConnector.shutdown(); } From 19602d760004e31c010edf8612e59a4447fed230 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Thu, 12 Mar 2026 17:45:35 +0100 Subject: [PATCH 3/4] fix(flagd): enqueue SHUTDOWN before calling shutdown() to prevent race When observeSyncStream() (running on the retryScheduler thread) detects a fatal gRPC status code, it calls shutdown() which in turn calls retryScheduler.shutdownNow(). This interrupts the current thread. The subsequent retryScheduler.awaitTermination() call then throws InterruptedException immediately (thread is already interrupted), causing the enqueue(QueuePayload.SHUTDOWN) call that followed to be skipped. Without the SHUTDOWN payload in the queue, the FlagStore never emits StorageState.ERROR and the provider never transitions to FATAL state, causing the 'Provider forbidden' test to see ERROR instead of FATAL. Fix: enqueue QueuePayload.SHUTDOWN *before* calling shutdown() in both fatal-status-code paths so the payload is guaranteed to reach the queue regardless of whether shutdown() subsequently throws. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../process/storage/connector/sync/SyncStreamQueueSource.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 982e556a1..474be5c2f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -206,8 +206,8 @@ private void observeSyncStream() { log.info( "Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); - shutdown(); enqueue(QueuePayload.SHUTDOWN); + shutdown(); } else { // retry for other status codes String message = metaEx.getMessage(); @@ -226,8 +226,8 @@ private void observeSyncStream() { log.info( "Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); - shutdown(); enqueue(QueuePayload.SHUTDOWN); + shutdown(); } else { // retry for other status codes log.error("Unexpected sync stream exception, will restart.", ex); From 5eec9b98aca9b75b7a81e930feb187c4e8f860f5 Mon Sep 17 00:00:00 2001 From: Simon Schrottner Date: Thu, 12 Mar 2026 18:27:08 +0100 Subject: [PATCH 4/4] style(flagd): apply spotless formatting Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../resolver/process/InProcessResolver.java | 1 - .../main/resources/flagd/schemas/flags.json | 205 ++++++++++++------ 2 files changed, 142 insertions(+), 64 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java index feca49baf..c0a914052 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java @@ -185,5 +185,4 @@ static QueueSource getQueueSource(final FlagdOptions options) { ? new FileQueueSource(options.getOfflineFlagSourcePath(), options.getOfflinePollIntervalMs()) : new SyncStreamQueueSource(options); } - } diff --git a/tools/flagd-core/src/main/resources/flagd/schemas/flags.json b/tools/flagd-core/src/main/resources/flagd/schemas/flags.json index 6e045b654..cff1aab81 100644 --- a/tools/flagd-core/src/main/resources/flagd/schemas/flags.json +++ b/tools/flagd-core/src/main/resources/flagd/schemas/flags.json @@ -1,11 +1,9 @@ { "$id": "https://flagd.dev/schema/v0/flags.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "flagd Flag Configuration", - "description": "Defines flags for use in flagd, including typed variants and rules.", - "type": "object", - "properties": { - "flags": { + "$ref": "#/definitions/providerConfig", + "definitions": { + "flagsMap": { "title": "Flags", "description": "Top-level flags object. All flags are defined here.", "type": "object", @@ -13,61 +11,113 @@ "additionalProperties": false, "patternProperties": { "^.{1,}$": { - "oneOf": [ - { - "title": "Boolean flag", - "description": "A flag having boolean values.", - "$ref": "#/definitions/booleanFlag" - }, - { - "title": "String flag", - "description": "A flag having string values.", - "$ref": "#/definitions/stringFlag" + "$ref": "#/definitions/anyFlag" + } + } + }, + "flagsArray": { + "title": "Flags", + "description": "Top-level flags array. All flags are defined here.", + "type": "array", + "items": { + "allOf": [ + { + "$ref": "#/definitions/anyFlag" + }, + { + "type": "object", + "properties": { + "key": { + "description": "Key of the flag: uniquely identifies this flag within it's flagSet", + "type": "string", + "minLength": 1 + } }, - { - "title": "Numeric flag", - "description": "A flag having numeric values.", - "$ref": "#/definitions/numberFlag" + "required": [ + "key" + ] + } + ] + } + }, + "baseConfig": { + "title": "flagd Flag Configuration", + "description": "Defines flags for use in flagd providers, including typed variants and rules.", + "type": "object", + "properties": { + "$evaluators": { + "title": "Evaluators", + "description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^.{1,}$": { + "$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path", + "$ref": "./targeting.json" + } + } + }, + "metadata": { + "title": "Flag Set Metadata", + "description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.", + "properties": { + "flagSetId": { + "description": "The unique identifier for the flag set.", + "type": "string" }, - { - "title": "Object flag", - "description": "A flag having arbitrary object values.", - "$ref": "#/definitions/objectFlag" + "version": { + "description": "The version of the flag set.", + "type": "string" } - ] + }, + "$ref": "#/definitions/metadata" } } }, - "$evaluators": { - "title": "Evaluators", - "description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.", + "providerConfig": { + "description": "Defines flags for use in providers (not flagd), including typed variants and rules.", "type": "object", - "additionalProperties": false, - "patternProperties": { - "^.{1,}$": { - "$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path", - "$ref": "./targeting.json" + "allOf": [ + { + "$ref": "#/definitions/baseConfig" } - } - }, - "metadata": { - "title": "Flag Set Metadata", - "description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.", + ], "properties": { - "flagSetId": { - "description": "The unique identifier for the flag set.", - "type": "string" - }, - "version": { - "description": "The version of the flag set.", - "type": "string" + "flags": { + "$ref": "#/definitions/flagsMap" } }, - "$ref": "#/definitions/metadata" - } - }, - "definitions": { - "flag": { + "required": [ + "flags" + ] + }, + "flagdConfig": { + "description": "Defines flags for use in the flagd daemon (a superset of what's available in providers), including typed variants and rules. Flags can be defined as an array or an object.", + "type": "object", + "allOf": [ + { + "$ref": "#/definitions/baseConfig" + }, + { + "properties": { + "flags": { + "oneOf": [ + { + "$ref": "#/definitions/flagsMap" + }, + { + "$ref": "#/definitions/flagsArray" + } + ] + } + } + } + ], + "required": [ + "flags" + ] + }, + "baseFlag": { "$comment": "base flag object; no title/description here, allows for better UX, keep it in the overrides", "type": "object", "properties": { @@ -82,8 +132,11 @@ }, "defaultVariant": { "title": "Default Variant", - "description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null).", - "type": "string" + "description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null). Set to null to use code-defined default.", + "type": [ + "string", + "null" + ] }, "targeting": { "$ref": "./targeting.json" @@ -92,11 +145,19 @@ "title": "Flag Metadata", "description": "Metadata about an individual feature flag, with keys of type string, and values of type boolean, string, or number.", "$ref": "#/definitions/metadata" + }, + "variants": { + "type": "object", + "minProperties": 1, + "additionalProperties": false, + "patternProperties": { + "^.{1,}$": {} + } } }, "required": [ "state", - "defaultVariant" + "variants" ] }, "booleanVariants": { @@ -109,10 +170,6 @@ "^.{1,}$": { "type": "boolean" } - }, - "default": { - "true": true, - "false": false } } } @@ -159,11 +216,29 @@ } } }, + "anyFlag": { + "anyOf": [ + { + "$ref": "#/definitions/booleanFlag" + }, + { + "$ref": "#/definitions/numberFlag" + }, + { + "$ref": "#/definitions/stringFlag" + }, + { + "$ref": "#/definitions/objectFlag" + } + ] + }, "booleanFlag": { "$comment": "merge the variants with the base flag to build our typed flags", + "title": "Boolean flag", + "description": "A flag having boolean values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/booleanVariants" @@ -171,9 +246,11 @@ ] }, "stringFlag": { + "title": "String flag", + "description": "A flag having string values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/stringVariants" @@ -181,9 +258,11 @@ ] }, "numberFlag": { + "title": "Numeric flag", + "description": "A flag having numeric values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/numberVariants" @@ -191,9 +270,11 @@ ] }, "objectFlag": { + "title": "Object flag", + "description": "A flag having arbitrary object values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/objectVariants" @@ -203,14 +284,12 @@ "metadata": { "type": "object", "additionalProperties": { - "description": "Any additional key/value pair with value of type boolean, string, or number.", "type": [ "string", "number", "boolean" ] - }, - "required": [] + } } } }