diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 5d245a764..474be5c2f 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -21,7 +21,10 @@ import io.grpc.stub.StreamObserver; import java.util.List; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; @@ -48,6 +51,11 @@ public class SyncStreamQueueSource implements QueueSource { private final FlagdOptions options; private final BlockingQueue outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE); private final List fatalStatusCodes; + private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "flagd-sync-retry-scheduler"); + t.setDaemon(true); + return t; + }); private volatile GrpcComponents grpcComponents; /** @@ -143,9 +151,7 @@ public synchronized void reinitializeChannelComponents() { /** Initialize sync stream connector. */ public void init() throws Exception { - Thread listener = new Thread(this::observeSyncStream); - listener.setDaemon(true); - listener.start(); + retryScheduler.execute(this::observeSyncStream); } /** Get blocking queue to obtain payloads exposed by this connector. */ @@ -167,6 +173,8 @@ public void shutdown() throws InterruptedException { return; } + retryScheduler.shutdownNow(); + retryScheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS); grpcComponents.channelConnector.shutdown(); } @@ -181,12 +189,12 @@ private void observeSyncStream() { try { if (shouldThrottle.getAndSet(false)) { log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs); - Thread.sleep(this.maxBackoffMs); - - // Check shutdown again after sleep to avoid unnecessary work - if (shutdown.get()) { - break; + try { + retryScheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS); + } catch (RejectedExecutionException e) { + log.debug("Retry scheduling rejected, most likely shutdown was invoked", e); } + return; } log.debug("Initializing sync stream request"); @@ -198,8 +206,8 @@ private void observeSyncStream() { log.info( "Fatal status code for metadata request: {}, not retrying", metaEx.getStatus().getCode()); - shutdown(); enqueue(QueuePayload.SHUTDOWN); + shutdown(); } else { // retry for other status codes String message = metaEx.getMessage(); @@ -218,8 +226,8 @@ private void observeSyncStream() { log.info( "Fatal status code during sync stream: {}, not retrying", ex.getStatus().getCode()); - shutdown(); enqueue(QueuePayload.SHUTDOWN); + shutdown(); } else { // retry for other status codes log.error("Unexpected sync stream exception, will restart.", ex); @@ -228,7 +236,8 @@ private void observeSyncStream() { shouldThrottle.set(true); } } catch (InterruptedException ie) { - log.debug("Stream loop interrupted, most likely shutdown was invoked", ie); + log.debug("Stream observer interrupted, most likely shutdown was invoked", ie); + Thread.currentThread().interrupt(); } } diff --git a/tools/flagd-core/src/main/resources/flagd/schemas/flags.json b/tools/flagd-core/src/main/resources/flagd/schemas/flags.json index 6e045b654..cff1aab81 100644 --- a/tools/flagd-core/src/main/resources/flagd/schemas/flags.json +++ b/tools/flagd-core/src/main/resources/flagd/schemas/flags.json @@ -1,11 +1,9 @@ { "$id": "https://flagd.dev/schema/v0/flags.json", "$schema": "http://json-schema.org/draft-07/schema#", - "title": "flagd Flag Configuration", - "description": "Defines flags for use in flagd, including typed variants and rules.", - "type": "object", - "properties": { - "flags": { + "$ref": "#/definitions/providerConfig", + "definitions": { + "flagsMap": { "title": "Flags", "description": "Top-level flags object. All flags are defined here.", "type": "object", @@ -13,61 +11,113 @@ "additionalProperties": false, "patternProperties": { "^.{1,}$": { - "oneOf": [ - { - "title": "Boolean flag", - "description": "A flag having boolean values.", - "$ref": "#/definitions/booleanFlag" - }, - { - "title": "String flag", - "description": "A flag having string values.", - "$ref": "#/definitions/stringFlag" + "$ref": "#/definitions/anyFlag" + } + } + }, + "flagsArray": { + "title": "Flags", + "description": "Top-level flags array. All flags are defined here.", + "type": "array", + "items": { + "allOf": [ + { + "$ref": "#/definitions/anyFlag" + }, + { + "type": "object", + "properties": { + "key": { + "description": "Key of the flag: uniquely identifies this flag within it's flagSet", + "type": "string", + "minLength": 1 + } }, - { - "title": "Numeric flag", - "description": "A flag having numeric values.", - "$ref": "#/definitions/numberFlag" + "required": [ + "key" + ] + } + ] + } + }, + "baseConfig": { + "title": "flagd Flag Configuration", + "description": "Defines flags for use in flagd providers, including typed variants and rules.", + "type": "object", + "properties": { + "$evaluators": { + "title": "Evaluators", + "description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.", + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^.{1,}$": { + "$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path", + "$ref": "./targeting.json" + } + } + }, + "metadata": { + "title": "Flag Set Metadata", + "description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.", + "properties": { + "flagSetId": { + "description": "The unique identifier for the flag set.", + "type": "string" }, - { - "title": "Object flag", - "description": "A flag having arbitrary object values.", - "$ref": "#/definitions/objectFlag" + "version": { + "description": "The version of the flag set.", + "type": "string" } - ] + }, + "$ref": "#/definitions/metadata" } } }, - "$evaluators": { - "title": "Evaluators", - "description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.", + "providerConfig": { + "description": "Defines flags for use in providers (not flagd), including typed variants and rules.", "type": "object", - "additionalProperties": false, - "patternProperties": { - "^.{1,}$": { - "$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path", - "$ref": "./targeting.json" + "allOf": [ + { + "$ref": "#/definitions/baseConfig" } - } - }, - "metadata": { - "title": "Flag Set Metadata", - "description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.", + ], "properties": { - "flagSetId": { - "description": "The unique identifier for the flag set.", - "type": "string" - }, - "version": { - "description": "The version of the flag set.", - "type": "string" + "flags": { + "$ref": "#/definitions/flagsMap" } }, - "$ref": "#/definitions/metadata" - } - }, - "definitions": { - "flag": { + "required": [ + "flags" + ] + }, + "flagdConfig": { + "description": "Defines flags for use in the flagd daemon (a superset of what's available in providers), including typed variants and rules. Flags can be defined as an array or an object.", + "type": "object", + "allOf": [ + { + "$ref": "#/definitions/baseConfig" + }, + { + "properties": { + "flags": { + "oneOf": [ + { + "$ref": "#/definitions/flagsMap" + }, + { + "$ref": "#/definitions/flagsArray" + } + ] + } + } + } + ], + "required": [ + "flags" + ] + }, + "baseFlag": { "$comment": "base flag object; no title/description here, allows for better UX, keep it in the overrides", "type": "object", "properties": { @@ -82,8 +132,11 @@ }, "defaultVariant": { "title": "Default Variant", - "description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null).", - "type": "string" + "description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null). Set to null to use code-defined default.", + "type": [ + "string", + "null" + ] }, "targeting": { "$ref": "./targeting.json" @@ -92,11 +145,19 @@ "title": "Flag Metadata", "description": "Metadata about an individual feature flag, with keys of type string, and values of type boolean, string, or number.", "$ref": "#/definitions/metadata" + }, + "variants": { + "type": "object", + "minProperties": 1, + "additionalProperties": false, + "patternProperties": { + "^.{1,}$": {} + } } }, "required": [ "state", - "defaultVariant" + "variants" ] }, "booleanVariants": { @@ -109,10 +170,6 @@ "^.{1,}$": { "type": "boolean" } - }, - "default": { - "true": true, - "false": false } } } @@ -159,11 +216,29 @@ } } }, + "anyFlag": { + "anyOf": [ + { + "$ref": "#/definitions/booleanFlag" + }, + { + "$ref": "#/definitions/numberFlag" + }, + { + "$ref": "#/definitions/stringFlag" + }, + { + "$ref": "#/definitions/objectFlag" + } + ] + }, "booleanFlag": { "$comment": "merge the variants with the base flag to build our typed flags", + "title": "Boolean flag", + "description": "A flag having boolean values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/booleanVariants" @@ -171,9 +246,11 @@ ] }, "stringFlag": { + "title": "String flag", + "description": "A flag having string values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/stringVariants" @@ -181,9 +258,11 @@ ] }, "numberFlag": { + "title": "Numeric flag", + "description": "A flag having numeric values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/numberVariants" @@ -191,9 +270,11 @@ ] }, "objectFlag": { + "title": "Object flag", + "description": "A flag having arbitrary object values.", "allOf": [ { - "$ref": "#/definitions/flag" + "$ref": "#/definitions/baseFlag" }, { "$ref": "#/definitions/objectVariants" @@ -203,14 +284,12 @@ "metadata": { "type": "object", "additionalProperties": { - "description": "Any additional key/value pair with value of type boolean, string, or number.", "type": [ "string", "number", "boolean" ] - }, - "required": [] + } } } }