Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions centraldogma/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation "org.slf4j:slf4j-api:$slf4jVersion"
api "com.linecorp.centraldogma:centraldogma-client:$centralDogmaVersion"
implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion"

testImplementation "org.hamcrest:hamcrest:$hamcrestVersion"
testImplementation "com.linecorp.centraldogma:centraldogma-testing-junit:$centralDogmaVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.linecorp.decaton.centraldogma;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -38,8 +40,8 @@
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.ChangeConflictException;
import com.linecorp.centraldogma.common.PathPattern;
import com.linecorp.centraldogma.common.Query;
import com.linecorp.centraldogma.common.Revision;
import com.linecorp.decaton.centraldogma.internal.DecatonPropertyFileFormat;
import com.linecorp.decaton.processor.runtime.DynamicProperty;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
Expand All @@ -48,12 +50,15 @@

/**
* A {@link PropertySupplier} implementation with Central Dogma backend.
*
* <p>
* This implementation maps property's {@link PropertyDefinition#name()} as the absolute field name in the file
* on Central Dogma.
*
* <p>
* You can use json or yaml format for the property file.
* You cannot nest keys in both formats. Keys must be top-level fields.
* <p>
* An example JSON format would be look like:
* {@code
* <pre>{@code
* {
* "decaton.partition.concurrency": 10,
* "decaton.ignore.keys": [
Expand All @@ -62,7 +67,16 @@
* ],
* "decaton.processing.rate.per.partition": 50
* }
* }
* }</pre>
*
* An example YAML format would be look like:
* <pre>{@code
* decaton.partition.concurrency: 10
* decaton.ignore.keys:
* - "123456"
* - "79797979"
* decaton.processing.rate.per.partition: 50
* }</pre>
*/
public class CentralDogmaPropertySupplier implements PropertySupplier, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(CentralDogmaPropertySupplier.class);
Expand All @@ -73,7 +87,6 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose
private static final ObjectMapper objectMapper = new ObjectMapper();

private final Watcher<JsonNode> rootWatcher;

private final ConcurrentMap<String, DynamicProperty<?>> cachedProperties = new ConcurrentHashMap<>();

/**
Expand All @@ -94,7 +107,9 @@ public CentralDogmaPropertySupplier(CentralDogma centralDogma, String projectNam
* @param fileName the name of the file containing properties as top-level fields.
*/
public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepository, String fileName) {
rootWatcher = centralDogmaRepository.watcher(Query.ofJsonPath(fileName)).start();
DecatonPropertyFileFormat configFile = DecatonPropertyFileFormat.of(fileName);
this.rootWatcher = configFile.createWatcher(centralDogmaRepository, fileName);

try {
rootWatcher.awaitInitialValue(INITIAL_VALUE_TIMEOUT_SECS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Expand All @@ -103,6 +118,20 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor
} catch (TimeoutException e) {
throw new RuntimeException(e);
}

rootWatcher.watch(node -> {
for(ConcurrentHashMap.Entry<String, DynamicProperty<?>> cachedProperty : cachedProperties.entrySet()) {
if (node.has(cachedProperty.getKey())) {
try {
setValue(cachedProperty.getValue(), node.get(cachedProperty.getKey()));
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set value updatedfrom CentralDogma for {}", cachedProperty.getKey(), e);
}
}
}
});
}

// visible for testing
Expand All @@ -129,25 +158,13 @@ public <T> Optional<Property<T>> getProperty(PropertyDefinition<T> definition) {
// for most use cases though, this cache is only filled/read once.
final DynamicProperty<?> cachedProp = cachedProperties.computeIfAbsent(definition.name(), name -> {
DynamicProperty<T> prop = new DynamicProperty<>(definition);
Watcher<JsonNode> child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name()));
child.watch(node -> {
try {
setValue(prop, node);
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e);
}
});
try {
JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher
setValue(prop, node);
setValue(prop, rootWatcher.latestValue().get(definition.name()));
} catch (Exception e) {
// Catching Exception instead of RuntimeException, since
// Kotlin-implemented DynamicProperty would throw checked exceptions
logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e);
}

return prop;
});

Expand Down Expand Up @@ -175,8 +192,7 @@ public void close() {
public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project,
String repository, String filename) {
final CentralDogmaRepository centralDogmaRepository = centralDogma.forRepo(project, repository);
createPropertyFile(centralDogmaRepository, filename, ProcessorProperties.defaultProperties());
return new CentralDogmaPropertySupplier(centralDogmaRepository, filename);
return register(centralDogmaRepository, filename);
}

/**
Expand Down Expand Up @@ -215,6 +231,7 @@ public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, S
public static CentralDogmaPropertySupplier register(CentralDogmaRepository centralDogmaRepository,
String filename,
PropertySupplier supplier) {

List<Property<?>> properties = ProcessorProperties.defaultProperties().stream().map(defaultProperty -> {
Optional<? extends Property<?>> prop = supplier.getProperty(defaultProperty.definition());
if (prop.isPresent()) {
Expand All @@ -236,12 +253,18 @@ private static void createPropertyFile(CentralDogmaRepository centralDogmaReposi
long remainingTime = remainingTime(PROPERTY_CREATION_TIMEOUT_MILLIS, startedTime);

JsonNode jsonNodeProperties = convertPropertyListToJsonNode(properties);
Change<?> upsert;
try {
upsert = DecatonPropertyFileFormat.of(fileName).createUpsertChange(fileName, jsonNodeProperties);
} catch (IOException e) {
throw new UncheckedIOException(e);
}

while (!fileExists && remainingTime > 0) {
try {
centralDogmaRepository
.commit(String.format("[CentralDogmaPropertySupplier] Property file created: %s", fileName),
Change.ofJsonUpsert(fileName, jsonNodeProperties))
upsert)
.push(baseRevision)
.get(remainingTime, TimeUnit.MILLISECONDS);
logger.info("New property file {} registered on Central Dogma", fileName);
Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class doesn't need to be exposed to users, so let's move to internal package (refs: https://github.com/line/decaton/blob/master/VERSIONING.md#public-apis)

JsonFormat, YamlFormat as well

Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2025 LINE Corporation
*
* Licensed under the Apache License, Version 2.0 …
*/

package com.linecorp.decaton.centraldogma.internal;

import java.io.IOException;
import java.util.Locale;

import com.fasterxml.jackson.databind.JsonNode;
import com.linecorp.centraldogma.client.CentralDogmaRepository;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.decaton.centraldogma.CentralDogmaPropertySupplier;

/**
* Encapsulates Central Dogma–specific concerns for reading and writing
* configuration files in various text formats (JSON, YAML, ...).
* <p>
* Implementations convert between raw file contents managed by Central Dogma
* and {@link JsonNode} values consumed by {@link CentralDogmaPropertySupplier}.
*/
public interface DecatonPropertyFileFormat {
/**
* Create and start a Watcher that emits {@link JsonNode} for each file update.
*/
Watcher<JsonNode> createWatcher(CentralDogmaRepository repo, String fileName);

/**
* Serialize the given node and wrap it as Central Dogma {@link Change} for initial file creation.
*/
Change<?> createUpsertChange(String fileName, JsonNode initialNode) throws IOException;

static DecatonPropertyFileFormat of(String fileName) {
String lower = fileName.toLowerCase(Locale.ROOT);
return (lower.endsWith(".yml") || lower.endsWith(".yaml"))
? new YamlFormat()
: new JsonFormat();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025 LINE Corporation
*/

package com.linecorp.decaton.centraldogma.internal;

import com.fasterxml.jackson.databind.JsonNode;
import com.linecorp.centraldogma.client.CentralDogmaRepository;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Query;

public class JsonFormat implements DecatonPropertyFileFormat {
@Override
public Watcher<JsonNode> createWatcher(CentralDogmaRepository repo, String fileName) {
return repo.watcher(Query.ofJsonPath(fileName)).start();
}

@Override
public Change<?> createUpsertChange(String fileName, JsonNode initialNode) {
return Change.ofJsonUpsert(fileName, initialNode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2025 LINE Corporation
*
* Licensed under the Apache License, Version 2.0 …
*/

package com.linecorp.decaton.centraldogma.internal;

import java.io.IOException;
import java.io.UncheckedIOException;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.linecorp.centraldogma.client.CentralDogmaRepository;
import com.linecorp.centraldogma.client.Watcher;
import com.linecorp.centraldogma.common.Change;
import com.linecorp.centraldogma.common.Query;

import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER;

public class YamlFormat implements DecatonPropertyFileFormat {
private static final ObjectMapper YAML_MAPPER = new ObjectMapper(
new YAMLFactory()
.disable(WRITE_DOC_START_MARKER)
);

@Override
public Watcher<JsonNode> createWatcher(CentralDogmaRepository repo, String fileName) {
return repo.watcher(Query.ofText(fileName))
.map(text -> {
try {
return YAML_MAPPER.readTree(text);
} catch (IOException e) {
throw new UncheckedIOException("Failed to parse YAML from " + fileName, e);
}
})
.start();
}

@Override
public Change<?> createUpsertChange(String fileName, JsonNode initialNode) throws IOException {
String yaml = YAML_MAPPER.writeValueAsString(initialNode);
return Change.ofTextUpsert(fileName, yaml);
}
}
Loading