Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -48,6 +51,11 @@ public class SyncStreamQueueSource implements QueueSource {
private final FlagdOptions options;
private final BlockingQueue<QueuePayload> outgoingQueue = new LinkedBlockingQueue<>(QUEUE_SIZE);
private final List<String> fatalStatusCodes;
private final ScheduledExecutorService retryScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "flagd-sync-retry-scheduler");
t.setDaemon(true);
return t;
});
private volatile GrpcComponents grpcComponents;

/**
Expand Down Expand Up @@ -143,9 +151,7 @@ public synchronized void reinitializeChannelComponents() {

/** Initialize sync stream connector. */
public void init() throws Exception {
Thread listener = new Thread(this::observeSyncStream);
listener.setDaemon(true);
listener.start();
retryScheduler.execute(this::observeSyncStream);
}

/** Get blocking queue to obtain payloads exposed by this connector. */
Expand All @@ -167,6 +173,8 @@ public void shutdown() throws InterruptedException {
return;
}

retryScheduler.shutdownNow();
retryScheduler.awaitTermination(deadline, TimeUnit.MILLISECONDS);
grpcComponents.channelConnector.shutdown();
}

Expand All @@ -181,12 +189,12 @@ private void observeSyncStream() {
try {
if (shouldThrottle.getAndSet(false)) {
log.debug("Previous stream ended with error, waiting {} ms before retry", this.maxBackoffMs);
Thread.sleep(this.maxBackoffMs);

// Check shutdown again after sleep to avoid unnecessary work
if (shutdown.get()) {
break;
try {
retryScheduler.schedule(this::observeSyncStream, this.maxBackoffMs, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
log.debug("Retry scheduling rejected, most likely shutdown was invoked", e);
}
return;
}

log.debug("Initializing sync stream request");
Expand All @@ -198,8 +206,8 @@ private void observeSyncStream() {
log.info(
"Fatal status code for metadata request: {}, not retrying",
metaEx.getStatus().getCode());
shutdown();
enqueue(QueuePayload.SHUTDOWN);
shutdown();
} else {
// retry for other status codes
String message = metaEx.getMessage();
Expand All @@ -218,8 +226,8 @@ private void observeSyncStream() {
log.info(
"Fatal status code during sync stream: {}, not retrying",
ex.getStatus().getCode());
shutdown();
enqueue(QueuePayload.SHUTDOWN);
shutdown();
} else {
// retry for other status codes
log.error("Unexpected sync stream exception, will restart.", ex);
Expand All @@ -228,7 +236,8 @@ private void observeSyncStream() {
shouldThrottle.set(true);
}
} catch (InterruptedException ie) {
log.debug("Stream loop interrupted, most likely shutdown was invoked", ie);
log.debug("Stream observer interrupted, most likely shutdown was invoked", ie);
Thread.currentThread().interrupt();
}
}

Expand Down
205 changes: 142 additions & 63 deletions tools/flagd-core/src/main/resources/flagd/schemas/flags.json
Original file line number Diff line number Diff line change
@@ -1,73 +1,123 @@
{
"$id": "https://flagd.dev/schema/v0/flags.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "flagd Flag Configuration",
"description": "Defines flags for use in flagd, including typed variants and rules.",
"type": "object",
"properties": {
"flags": {
"$ref": "#/definitions/providerConfig",
"definitions": {
"flagsMap": {
"title": "Flags",
"description": "Top-level flags object. All flags are defined here.",
"type": "object",
"$comment": "flag objects are one of the 4 flag types defined in definitions",
"additionalProperties": false,
"patternProperties": {
"^.{1,}$": {
"oneOf": [
{
"title": "Boolean flag",
"description": "A flag having boolean values.",
"$ref": "#/definitions/booleanFlag"
},
{
"title": "String flag",
"description": "A flag having string values.",
"$ref": "#/definitions/stringFlag"
"$ref": "#/definitions/anyFlag"
}
}
},
"flagsArray": {
"title": "Flags",
"description": "Top-level flags array. All flags are defined here.",
"type": "array",
"items": {
"allOf": [
{
"$ref": "#/definitions/anyFlag"
},
{
"type": "object",
"properties": {
"key": {
"description": "Key of the flag: uniquely identifies this flag within it's flagSet",
"type": "string",
"minLength": 1
}
},
{
"title": "Numeric flag",
"description": "A flag having numeric values.",
"$ref": "#/definitions/numberFlag"
"required": [
"key"
]
}
]
}
},
"baseConfig": {
"title": "flagd Flag Configuration",
"description": "Defines flags for use in flagd providers, including typed variants and rules.",
"type": "object",
"properties": {
"$evaluators": {
"title": "Evaluators",
"description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.",
"type": "object",
"additionalProperties": false,
"patternProperties": {
"^.{1,}$": {
"$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path",
"$ref": "./targeting.json"
}
}
},
"metadata": {
"title": "Flag Set Metadata",
"description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.",
"properties": {
"flagSetId": {
"description": "The unique identifier for the flag set.",
"type": "string"
},
{
"title": "Object flag",
"description": "A flag having arbitrary object values.",
"$ref": "#/definitions/objectFlag"
"version": {
"description": "The version of the flag set.",
"type": "string"
}
]
},
"$ref": "#/definitions/metadata"
}
}
},
"$evaluators": {
"title": "Evaluators",
"description": "Reusable targeting rules that can be referenced with \"$ref\": \"myRule\" in multiple flags.",
"providerConfig": {
"description": "Defines flags for use in providers (not flagd), including typed variants and rules.",
"type": "object",
"additionalProperties": false,
"patternProperties": {
"^.{1,}$": {
"$comment": "this relative ref means that targeting.json MUST be in the same dir, or available on the same HTTP path",
"$ref": "./targeting.json"
"allOf": [
{
"$ref": "#/definitions/baseConfig"
}
}
},
"metadata": {
"title": "Flag Set Metadata",
"description": "Metadata about the flag set, with keys of type string, and values of type boolean, string, or number.",
],
"properties": {
"flagSetId": {
"description": "The unique identifier for the flag set.",
"type": "string"
},
"version": {
"description": "The version of the flag set.",
"type": "string"
"flags": {
"$ref": "#/definitions/flagsMap"
}
},
"$ref": "#/definitions/metadata"
}
},
"definitions": {
"flag": {
"required": [
"flags"
]
},
"flagdConfig": {
"description": "Defines flags for use in the flagd daemon (a superset of what's available in providers), including typed variants and rules. Flags can be defined as an array or an object.",
"type": "object",
"allOf": [
{
"$ref": "#/definitions/baseConfig"
},
{
"properties": {
"flags": {
"oneOf": [
{
"$ref": "#/definitions/flagsMap"
},
{
"$ref": "#/definitions/flagsArray"
}
]
}
}
}
],
"required": [
"flags"
]
},
"baseFlag": {
"$comment": "base flag object; no title/description here, allows for better UX, keep it in the overrides",
"type": "object",
"properties": {
Expand All @@ -82,8 +132,11 @@
},
"defaultVariant": {
"title": "Default Variant",
"description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null).",
"type": "string"
"description": "The variant to serve if no dynamic targeting applies (including if the targeting returns null). Set to null to use code-defined default.",
"type": [
"string",
"null"
]
},
"targeting": {
"$ref": "./targeting.json"
Expand All @@ -92,11 +145,19 @@
"title": "Flag Metadata",
"description": "Metadata about an individual feature flag, with keys of type string, and values of type boolean, string, or number.",
"$ref": "#/definitions/metadata"
},
"variants": {
"type": "object",
"minProperties": 1,
"additionalProperties": false,
"patternProperties": {
"^.{1,}$": {}
}
}
},
"required": [
"state",
"defaultVariant"
"variants"
]
},
"booleanVariants": {
Expand All @@ -109,10 +170,6 @@
"^.{1,}$": {
"type": "boolean"
}
},
"default": {
"true": true,
"false": false
}
}
}
Expand Down Expand Up @@ -159,41 +216,65 @@
}
}
},
"anyFlag": {
"anyOf": [
{
"$ref": "#/definitions/booleanFlag"
},
{
"$ref": "#/definitions/numberFlag"
},
{
"$ref": "#/definitions/stringFlag"
},
{
"$ref": "#/definitions/objectFlag"
}
]
},
"booleanFlag": {
"$comment": "merge the variants with the base flag to build our typed flags",
"title": "Boolean flag",
"description": "A flag having boolean values.",
"allOf": [
{
"$ref": "#/definitions/flag"
"$ref": "#/definitions/baseFlag"
},
{
"$ref": "#/definitions/booleanVariants"
}
]
},
"stringFlag": {
"title": "String flag",
"description": "A flag having string values.",
"allOf": [
{
"$ref": "#/definitions/flag"
"$ref": "#/definitions/baseFlag"
},
{
"$ref": "#/definitions/stringVariants"
}
]
},
"numberFlag": {
"title": "Numeric flag",
"description": "A flag having numeric values.",
"allOf": [
{
"$ref": "#/definitions/flag"
"$ref": "#/definitions/baseFlag"
},
{
"$ref": "#/definitions/numberVariants"
}
]
},
"objectFlag": {
"title": "Object flag",
"description": "A flag having arbitrary object values.",
"allOf": [
{
"$ref": "#/definitions/flag"
"$ref": "#/definitions/baseFlag"
},
{
"$ref": "#/definitions/objectVariants"
Expand All @@ -203,14 +284,12 @@
"metadata": {
"type": "object",
"additionalProperties": {
"description": "Any additional key/value pair with value of type boolean, string, or number.",
"type": [
"string",
"number",
"boolean"
]
},
"required": []
}
}
}
}
Loading