From 37bb1f3f88af4def1482afe3af3f5eca6aa7e514 Mon Sep 17 00:00:00 2001 From: Yuta Kasai Date: Wed, 23 Jul 2025 13:08:42 +0900 Subject: [PATCH 1/6] NO-ISSUE Support yaml partially --- centraldogma/build.gradle | 1 + .../CentralDogmaPropertySupplier.java | 76 ++-- .../DecatonPropertyFileFormat.java | 41 +++ .../decaton/centraldogma/JsonFormat.java | 23 ++ .../decaton/centraldogma/YamlFormat.java | 46 +++ ...lDogmaPropertySupplierIntegrationTest.java | 324 +++++++++++++++--- .../CentralDogmaPropertySupplierTest.java | 135 +++++--- docs/dynamic-property-configuration.adoc | 47 +++ 8 files changed, 571 insertions(+), 122 deletions(-) create mode 100644 centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java create mode 100644 centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java create mode 100644 centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java diff --git a/centraldogma/build.gradle b/centraldogma/build.gradle index 53a1e08a..f2aedae5 100644 --- a/centraldogma/build.gradle +++ b/centraldogma/build.gradle @@ -13,6 +13,7 @@ dependencies { implementation "org.slf4j:slf4j-api:$slf4jVersion" api "com.linecorp.centraldogma:centraldogma-client:$centralDogmaVersion" implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion" + implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion" testImplementation "org.hamcrest:hamcrest:$hamcrestVersion" testImplementation "com.linecorp.centraldogma:centraldogma-testing-junit:$centralDogmaVersion" diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index 80f71180..9879fc3b 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -16,6 +16,8 @@ package com.linecorp.decaton.centraldogma; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -38,7 +40,6 @@ import com.linecorp.centraldogma.common.Change; import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.PathPattern; -import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.Revision; import com.linecorp.decaton.processor.runtime.DynamicProperty; import com.linecorp.decaton.processor.runtime.ProcessorProperties; @@ -48,12 +49,15 @@ /** * A {@link PropertySupplier} implementation with Central Dogma backend. - * + *

* This implementation maps property's {@link PropertyDefinition#name()} as the absolute field name in the file * on Central Dogma. - * + *

+ * You can use json or yaml format for the property file. + * You cannot nest keys in both formats. Keys must be top-level fields. + *

* An example JSON format would be look like: - * {@code + *

{@code
  * {
  *     "decaton.partition.concurrency": 10,
  *     "decaton.ignore.keys": [
@@ -62,7 +66,16 @@
  *     ],
  *     "decaton.processing.rate.per.partition": 50
  * }
- * }
+ * }
+ * + * An example YAML format would be look like: + *
{@code
+ * decaton.partition.concurrency: 10
+ * decaton.ignore.keys:
+ *  - "123456"
+ *  - "79797979"
+ * decaton.processing.rate.per.partition: 50
+ * }
*/ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(CentralDogmaPropertySupplier.class); @@ -73,7 +86,6 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose private static final ObjectMapper objectMapper = new ObjectMapper(); private final Watcher rootWatcher; - private final ConcurrentMap> cachedProperties = new ConcurrentHashMap<>(); /** @@ -94,7 +106,9 @@ public CentralDogmaPropertySupplier(CentralDogma centralDogma, String projectNam * @param fileName the name of the file containing properties as top-level fields. */ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepository, String fileName) { - rootWatcher = centralDogmaRepository.watcher(Query.ofJsonPath(fileName)).start(); + DecatonPropertyFileFormat configFile = DecatonPropertyFileFormat.of(fileName); + this.rootWatcher = configFile.createWatcher(centralDogmaRepository, fileName); + try { rootWatcher.awaitInitialValue(INITIAL_VALUE_TIMEOUT_SECS, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -103,6 +117,21 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor } catch (TimeoutException e) { throw new RuntimeException(e); } + + rootWatcher.watch(node -> { + node.fields().forEachRemaining(entry -> { + DynamicProperty p = cachedProperties.get(entry.getKey()); + if (p != null) { + try { + setValue(p, entry.getValue()); + } catch (Exception e) { + // Catching Exception instead of RuntimeException, since + // Kotlin-implemented DynamicProperty would throw checked exceptions + logger.warn("Failed to set initial value from CentralDogma for {}", entry.getKey(), e); + } + } + }); + }); } // visible for testing @@ -129,25 +158,13 @@ public Optional> getProperty(PropertyDefinition definition) { // for most use cases though, this cache is only filled/read once. final DynamicProperty cachedProp = cachedProperties.computeIfAbsent(definition.name(), name -> { DynamicProperty prop = new DynamicProperty<>(definition); - Watcher child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name())); - child.watch(node -> { - try { - setValue(prop, node); - } catch (Exception e) { - // Catching Exception instead of RuntimeException, since - // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e); - } - }); try { - JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher - setValue(prop, node); + setValue(prop, rootWatcher.latestValue().get(definition.name())); } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); } - return prop; }); @@ -175,8 +192,7 @@ public void close() { public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project, String repository, String filename) { final CentralDogmaRepository centralDogmaRepository = centralDogma.forRepo(project, repository); - createPropertyFile(centralDogmaRepository, filename, ProcessorProperties.defaultProperties()); - return new CentralDogmaPropertySupplier(centralDogmaRepository, filename); + return register(centralDogmaRepository, filename); } /** @@ -215,6 +231,7 @@ public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, S public static CentralDogmaPropertySupplier register(CentralDogmaRepository centralDogmaRepository, String filename, PropertySupplier supplier) { + List> properties = ProcessorProperties.defaultProperties().stream().map(defaultProperty -> { Optional> prop = supplier.getProperty(defaultProperty.definition()); if (prop.isPresent()) { @@ -230,18 +247,31 @@ public static CentralDogmaPropertySupplier register(CentralDogmaRepository centr private static void createPropertyFile(CentralDogmaRepository centralDogmaRepository, String fileName, List> properties) { + // show given properties + logger.info("Creating CentralDogma property file: {} with properties: {}", + fileName, + properties.stream() + .map(p -> String.format("%s=%s", p.definition().name(), p.value())) + .collect(Collectors.joining(", "))); + Revision baseRevision = normalizeRevision(centralDogmaRepository, Revision.HEAD); boolean fileExists = fileExists(centralDogmaRepository, fileName, baseRevision); long startedTime = System.currentTimeMillis(); long remainingTime = remainingTime(PROPERTY_CREATION_TIMEOUT_MILLIS, startedTime); JsonNode jsonNodeProperties = convertPropertyListToJsonNode(properties); + Change upsert; + try { + upsert = DecatonPropertyFileFormat.of(fileName).createUpsertChange(fileName, jsonNodeProperties); + } catch (IOException e) { + throw new UncheckedIOException(e); + } while (!fileExists && remainingTime > 0) { try { centralDogmaRepository .commit(String.format("[CentralDogmaPropertySupplier] Property file created: %s", fileName), - Change.ofJsonUpsert(fileName, jsonNodeProperties)) + upsert) .push(baseRevision) .get(remainingTime, TimeUnit.MILLISECONDS); logger.info("New property file {} registered on Central Dogma", fileName); diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java new file mode 100644 index 00000000..04cde27a --- /dev/null +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 … + */ + +package com.linecorp.decaton.centraldogma; + +import java.io.IOException; +import java.util.Locale; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.Watcher; +import com.linecorp.centraldogma.common.Change; + +/** + * Encapsulates Central Dogma–specific concerns for reading and writing + * configuration files in various text formats (JSON, YAML, ...). + *

+ * Implementations convert between raw file contents managed by Central Dogma + * and {@link JsonNode} values consumed by {@link CentralDogmaPropertySupplier}. + */ +public interface DecatonPropertyFileFormat { + /** + * Create and start a Watcher that emits {@link JsonNode} for each file update. + */ + Watcher createWatcher(CentralDogmaRepository repo, String fileName); + + /** + * Serialize the given node and wrap it as Central Dogma {@link Change} for initial file creation. + */ + Change createUpsertChange(String fileName, JsonNode initialNode) throws IOException; + + static DecatonPropertyFileFormat of(String fileName) { + String lower = fileName.toLowerCase(Locale.ROOT); + return (lower.endsWith(".yml") || lower.endsWith(".yaml")) + ? new YamlFormat() + : new JsonFormat(); + } +} diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java new file mode 100644 index 00000000..c6b50fd7 --- /dev/null +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java @@ -0,0 +1,23 @@ +/* + * Copyright 2025 LINE Corporation + */ + +package com.linecorp.decaton.centraldogma; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.Watcher; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Query; + +public class JsonFormat implements DecatonPropertyFileFormat { + @Override + public Watcher createWatcher(CentralDogmaRepository repo, String fileName) { + return repo.watcher(Query.ofJsonPath(fileName)).start(); + } + + @Override + public Change createUpsertChange(String fileName, JsonNode initialNode) { + return Change.ofJsonUpsert(fileName, initialNode); + } +} diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java new file mode 100644 index 00000000..ed7e2e64 --- /dev/null +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java @@ -0,0 +1,46 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 … + */ + +package com.linecorp.decaton.centraldogma; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.Watcher; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Query; + +import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; + +public class YamlFormat implements DecatonPropertyFileFormat { + private static final ObjectMapper YAML_MAPPER = new ObjectMapper( + new YAMLFactory() + .disable(WRITE_DOC_START_MARKER) + ); + + @Override + public Watcher createWatcher(CentralDogmaRepository repo, String fileName) { + return repo.watcher(Query.ofText(fileName)) + .map(text -> { + try { + return YAML_MAPPER.readTree(text); + } catch (IOException e) { + throw new UncheckedIOException("Failed to parse YAML from " + fileName, e); + } + }) + .start(); + } + + @Override + public Change createUpsertChange(String fileName, JsonNode initialNode) throws IOException { + String yaml = YAML_MAPPER.writeValueAsString(initialNode); + return Change.ofTextUpsert(fileName, yaml); + } +} diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java index 9fa135f0..d001e768 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java @@ -16,12 +16,9 @@ package com.linecorp.decaton.centraldogma; +import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertSame; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -33,8 +30,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; @@ -54,7 +57,10 @@ import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +@Slf4j public class CentralDogmaPropertySupplierIntegrationTest { @RegisterExtension final CentralDogmaExtension extension = new CentralDogmaExtension() { @@ -73,7 +79,6 @@ protected void configureHttpClient(WebClientBuilder builder) { private static final String PROJECT_NAME = "unit-test"; private static final String REPOSITORY_NAME = "repo"; - private static final String FILENAME = "/subscription.json"; private JsonNode defaultProperties() { return CentralDogmaPropertySupplier.convertPropertyListToJsonNode( @@ -82,7 +87,8 @@ private JsonNode defaultProperties() { @Test @Timeout(50) - public void testCDIntegration() throws InterruptedException { + public void testCDIntegrationJson() throws InterruptedException { + final String FILENAME = "/subscription.json"; CentralDogma client = extension.client(); final String ORIGINAL = @@ -140,32 +146,86 @@ public void testCDIntegration() throws InterruptedException { } @Test - public void testFileExist() { + @Timeout(50) + void testCDIntegrationYaml() throws Exception { + final String FILE = "/subscription.yaml"; CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); - CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME) - .join(); + CentralDogmaRepository repo = + client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); - centralDogmaRepository - .commit("test", Change.ofJsonUpsert(FILENAME, "{}")) - .push() - .join(); - assertTrue(CentralDogmaPropertySupplier - .fileExists(centralDogmaRepository, FILENAME, Revision.HEAD)); - } + final String ORIGINAL_YAML = + "# processor properties\n" + + "decaton.partition.concurrency: 10\n" + + "\n" + + "# keys to ignore\n" + + "decaton.ignore.keys:\n" + + " - \"123456\" # hi\n" + + " - \"79797979\" # hello\n" + + "\n" + + "decaton.processing.rate.per.partition: 50\n"; - @Test - public void testFileNonExistent() { - CentralDogma client = extension.client(); - client.createProject(PROJECT_NAME).join(); - CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); - assertFalse(CentralDogmaPropertySupplier - .fileExists(centralDogmaRepository, FILENAME, Revision.HEAD)); + repo.commit("init-yaml", Change.ofTextUpsert(FILE, ORIGINAL_YAML)) + .push().join(); + + CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE); + + Property concurrency = + supplier.getProperty(CONFIG_PARTITION_CONCURRENCY).get(); + Property> ignoreKeys = + supplier.getProperty(ProcessorProperties.CONFIG_IGNORE_KEYS).get(); + + assertEquals(10, concurrency.value()); + assertEquals(java.util.Arrays.asList("123456", "79797979"), ignoreKeys.value()); + + CountDownLatch latch = new CountDownLatch(2); + concurrency.listen((o, n) -> latch.countDown()); + + AtomicBoolean firstCall = new AtomicBoolean(true); + + ignoreKeys.listen((oldVal, newVal) -> { + // null to list is allowed + if (firstCall.getAndSet(false)) { + return; + } + fail("ignoreKeys should not be updated after the first call"); + }); + + final String UPDATED_YAML = + "# processor properties\n" + + "decaton.partition.concurrency: 20\n" // This is changed + + "\n" + + "# keys to ignore\n" + + "decaton.ignore.keys:\n" + + " - \"123456\" # hi\n" + + " - \"79797979\" # hello\n" + + "\n" + + "decaton.processing.rate.per.partition: 50\n"; + + repo.commit("patch-yaml", Change.ofTextPatch(FILE, ORIGINAL_YAML, UPDATED_YAML)) + .push().join(); + + latch.await(); + assertEquals(20, concurrency.value()); + assertEquals(java.util.Arrays.asList("123456", "79797979"), ignoreKeys.value()); + + assertEquals(20, + IntStream.range(0, 10_000) + .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) + .map(supplier::getProperty) + .reduce((l, r) -> { + assertSame(l.get(), r.get()); + return l; + }) + .orElseThrow() + .get().value().intValue()); } @Test @Timeout(10) - public void testCDRegisterSuccess() { + public void testCDRegisterSuccessJson() { + final String FILENAME = "/subscription.json"; CentralDogma client = extension.client(); client.createProject(PROJECT_NAME).join(); CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); @@ -173,40 +233,47 @@ public void testCDRegisterSuccess() { CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); Entry prop = centralDogmaRepository.file(Query.ofJson(FILENAME)).get().join(); - assertEquals(defaultProperties().asText(), - prop.content().asText()); + JsonNode expected = defaultProperties(); + JsonNode actual = prop.content(); + assertEquals(expected.toString(), actual.toString(), + () -> "\nexpected: " + expected.toPrettyString() + + "\nactual: " + actual.toPrettyString()); } @Test @Timeout(10) - public void testCDRegisterNonExistentProject() { - assertThrows(RuntimeException.class, () -> { - CentralDogmaPropertySupplier.register(extension.client(), - "non-existent-project", REPOSITORY_NAME, FILENAME); - }); - } - - @Test - @Timeout(15) - public void testCDRegisterTimeout() { + public void testCDRegisterSuccessYaml() { + String yamlFile = "/subscription.yaml"; CentralDogma client = extension.client(); client.createProject(PROJECT_NAME).join(); - CentralDogmaRepository centralDogmaRepository = spy(client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join()); - - doReturn(CompletableFuture.completedFuture(new Revision(1))) - .when(centralDogmaRepository) - .normalize(any()); - - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); + CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); - assertThrows(RuntimeException.class, () -> { - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); - }); + CentralDogmaPropertySupplier.register(centralDogmaRepository, yamlFile); + + String expectedYaml = "decaton.ignore.keys: []\n" + + "decaton.processing.rate.per.partition: -1\n" + + "decaton.partition.concurrency: 1\n" + + "decaton.max.pending.records: 10000\n" + + "decaton.commit.interval.ms: 1000\n" + + "decaton.group.rebalance.timeout.ms: 1000\n" + + "decaton.processing.shutdown.timeout.ms: 0\n" + + "decaton.logging.mdc.enabled: true\n" + + "decaton.client.metrics.micrometer.bound: false\n" + + "decaton.deferred.complete.timeout.ms: -1\n" + + "decaton.processor.threads.termination.timeout.ms: 9223372036854775807\n" + + "decaton.per.key.quota.processing.rate: -1\n" + + "decaton.retry.task.in.legacy.format: false\n" + + "decaton.legacy.parse.fallback.enabled: false\n"; + + String actualText = centralDogmaRepository.file(Query.ofText(yamlFile)).get().join().content(); + assertEquals(expectedYaml, actualText); + log.info("Content of {}: {}", yamlFile, actualText); } @Test @Timeout(15) - public void testCDRegisterConflict() throws Exception { + public void testCDRegisterConflictJson() throws Exception { + final String FILENAME = "/subscription.json"; CountDownLatch userAIsRunning = new CountDownLatch(1); CountDownLatch userBIsRunning = new CountDownLatch(1); @@ -246,4 +313,165 @@ public void testCDRegisterConflict() throws Exception { assertEquals(userBPush, prop.content()); } + + @Test + @Timeout(15) + void testCDRegisterConflictYaml() throws Exception { + final String FILE = "/subscription.yaml"; + CountDownLatch userAIsRunning = new CountDownLatch(1); + CountDownLatch userBIsRunning = new CountDownLatch(1); + + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + + CentralDogmaRepository userB = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + CentralDogmaRepository userA = spy(client.forRepo(PROJECT_NAME, REPOSITORY_NAME)); + + final String userBYaml = + "# pushed by user‑B (should win the race)\n" + + "foo: bar\n"; + + JsonNode userBPush = Jackson.readTree("{\"foo\":\"bar\"}"); + + String defaultYaml = new ObjectMapper(new YAMLFactory().disable(WRITE_DOC_START_MARKER)) + .writeValueAsString(defaultProperties()); + + doAnswer(inv -> { + userAIsRunning.countDown(); + userBIsRunning.await(); + return inv.callRealMethod(); + }).when(userA) + .commit(any(), eq(Change.ofTextUpsert(FILE, defaultYaml))); + + ExecutorService svc = Executors.newFixedThreadPool(2); + + svc.submit(() -> CentralDogmaPropertySupplier.register(userA, FILE)); + + svc.submit(() -> { + try { + userAIsRunning.await(); + userB.commit("userB‑push", Change.ofTextUpsert(FILE, userBYaml)) + .push().join(); + userBIsRunning.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }); + + svc.shutdown(); + svc.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + Entry entry = userA.file(Query.ofText(FILE)).get().join(); + + assertEquals(userBYaml, entry.content()); + + JsonNode actual = new ObjectMapper(new YAMLFactory()) + .readTree(entry.content()); + assertEquals(userBPush, actual); + } + + + interface FormatCase { + String file(); + + Change upsert(String body); + + String emptyBody(); + } + + private static final FormatCase JSON = new FormatCase() { + public String file() { + return "/subscription.json"; + } + + public Change upsert(String body) { + return Change.ofJsonUpsert(file(), body); + } + + public String emptyBody() { + return "{}"; + } + + @Override + public String toString() { + return "JSON"; + } + }; + + private static final FormatCase YAML = new FormatCase() { + public String file() { + return "/subscription.yaml"; + } + + public Change upsert(String body) { + return Change.ofTextUpsert(file(), body); + } + + public String emptyBody() { + return ""; + } + + @Override + public String toString() { + return "YAML"; + } + }; + + static Stream formats() { + return Stream.of(JSON, YAML); + } + + @ParameterizedTest(name = "registerTimeout‑{0}") + @MethodSource("formats") + @Timeout(15) + void testCDRegisterTimeout(FormatCase testCase) { + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository centralDogmaRepository = spy(client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join()); + + doReturn(CompletableFuture.completedFuture(new Revision(1))) + .when(centralDogmaRepository) + .normalize(any()); + + CentralDogmaPropertySupplier.register(centralDogmaRepository, testCase.file()); + + assertThrows(RuntimeException.class, () -> { + CentralDogmaPropertySupplier.register(centralDogmaRepository, testCase.file()); + }); + } + + @ParameterizedTest(name = "registerNonExistentProject‑{0}") + @MethodSource("formats") + void testCDRegisterNonExistentProject(FormatCase testCase) { + assertThrows(RuntimeException.class, () -> { + CentralDogmaPropertySupplier.register(extension.client(), + "non-existent-project", REPOSITORY_NAME, testCase.file()); + }); + } + + @ParameterizedTest(name = "fileExists‑{0}") + @MethodSource("formats") + void testFileExist(FormatCase testCase) { + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + + centralDogmaRepository + .commit("test", testCase.upsert(testCase.emptyBody())) + .push() + .join(); + assertTrue(CentralDogmaPropertySupplier + .fileExists(centralDogmaRepository, testCase.file(), Revision.HEAD)); + } + + @ParameterizedTest(name = "fileNonExistent‑{0}") + @MethodSource("formats") + void testFileNonExistent(FormatCase testCase) { + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + assertFalse(CentralDogmaPropertySupplier + .fileExists(centralDogmaRepository, testCase.file(), Revision.HEAD)); + } } diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java index d3573416..7a6b3ab4 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java @@ -16,6 +16,7 @@ package com.linecorp.decaton.centraldogma; +import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertSame; @@ -23,22 +24,21 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -47,6 +47,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.linecorp.centraldogma.client.CentralDogmaRepository; import com.linecorp.centraldogma.client.CommitRequest; @@ -69,8 +70,10 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class CentralDogmaPropertySupplierTest { private final ObjectMapper objectMapper = new ObjectMapper(); - - private static final String FILENAME = "/subscription.json"; + private static final ObjectMapper YAML_MAPPER = new ObjectMapper( + new YAMLFactory() + .disable(WRITE_DOC_START_MARKER) + ); private static final PropertyDefinition LONG_PROPERTY = PropertyDefinition.define("num.property", Long.class, 0L, @@ -89,37 +92,44 @@ public class CentralDogmaPropertySupplierTest { @Mock Watcher rootWatcher; - private CentralDogmaPropertySupplier supplier; + private static Stream fileParams() { + return Stream.of( + Arguments.of("/subscription.json"), + Arguments.of("/subscription.yaml") + ); + } - @BeforeEach - public void setUp() { - when(centralDogmaRepository.watcher(Query.ofJsonPath(FILENAME))).thenReturn(watcherRequest); + @SuppressWarnings("unchecked") + private CentralDogmaPropertySupplier setup(String fileName) { + when(centralDogmaRepository.watcher(any(Query.class))) + .thenReturn(watcherRequest); + + // yaml mode + if (fileName.endsWith(".yaml")) { + doReturn(watcherRequest) + .when(watcherRequest) + .map(any()); + } when(watcherRequest.start()).thenReturn(rootWatcher); - supplier = new CentralDogmaPropertySupplier(centralDogmaRepository, FILENAME); + return new CentralDogmaPropertySupplier(centralDogmaRepository, fileName); } - @Test - @SuppressWarnings("unchecked") - public void testWatcherSetup() { + @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @MethodSource("fileParams") + public void testWatcherSetup(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + when(rootWatcher.latestValue()).thenReturn( objectMapper.createObjectNode().put(LONG_PROPERTY.name(), 123L)); - Watcher longPropertyWatcher = mock(Watcher.class); - Watcher listPropertyWatcher = mock(Watcher.class); - - when(rootWatcher.newChild((Query) any())) - .thenReturn(longPropertyWatcher) - .thenReturn(listPropertyWatcher) - .thenReturn(null); - assertTrue(supplier.getProperty(LONG_PROPERTY).isPresent()); - - verify(rootWatcher).newChild(any()); - verify(longPropertyWatcher).watch(any(Consumer.class)); } - @Test - public void testConvertValue() { + @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @MethodSource("fileParams") + public void testConvertValue(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + JsonNodeFactory factory = objectMapper.getNodeFactory(); Object convertedLong = supplier.convertNodeToValue( @@ -133,8 +143,11 @@ public void testConvertValue() { assertEquals(Arrays.asList("foo", "bar"), convertedList); } - @Test - public void testSetValue() { + @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @MethodSource("fileParams") + public void testSetValue(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + JsonNodeFactory factory = objectMapper.getNodeFactory(); DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); @@ -142,25 +155,40 @@ public void testSetValue() { verify(prop).checkingSet(10L); } - @Test - public void testSetNullValue() { - JsonNodeFactory factory = objectMapper.getNodeFactory(); - - DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); - supplier.setValue(prop, factory.nullNode()); - verify(prop).checkingSet(LONG_PROPERTY.defaultValue()); - } + @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @MethodSource("fileParams") + public void testGetPropertyAbsentName(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); - @Test - public void testGetPropertyAbsentName() { when(rootWatcher.latestValue()).thenReturn(objectMapper.createObjectNode()); PropertyDefinition missingProperty = PropertyDefinition.define("absent.value", Long.class); assertFalse(supplier.getProperty(missingProperty).isPresent()); } - @Test - public void testRegisterWithDefaultSettings() { +// @Test +// public void testSetNullValue() { +// JsonNodeFactory factory = objectMapper.getNodeFactory(); +// +// DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); +// supplier.setValue(prop, factory.nullNode()); +// verify(prop).checkingSet(LONG_PROPERTY.defaultValue()); +// } + + + private Change expectedChange(String fileName, JsonNode node) throws Exception { + if (fileName.endsWith(".yaml") || fileName.endsWith(".yml")) { + return Change.ofTextUpsert(fileName, YAML_MAPPER.writeValueAsString(node)); + } else { + return Change.ofJsonUpsert(fileName, node); + } + } + + @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @MethodSource("fileParams") + public void testRegisterWithDefaultSettings(String fileName) throws Exception { + setup(fileName); + when(centralDogmaRepository.normalize(Revision.HEAD)) .thenReturn(CompletableFuture.completedFuture(Revision.HEAD)); @@ -169,19 +197,23 @@ public void testRegisterWithDefaultSettings() { when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap())); final CommitRequest commitRequest = mock(CommitRequest.class); - when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode())))).thenReturn(commitRequest); + final Change upsert = expectedChange(fileName, defaultPropertiesAsJsonNode()); + when(centralDogmaRepository.commit(anyString(), eq(upsert))).thenReturn(commitRequest); when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, 1))); - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); + CentralDogmaPropertySupplier.register(centralDogmaRepository, fileName); verify(centralDogmaRepository).commit( anyString(), - eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode())) + eq(upsert) ); } - @Test - public void testRegisterWithCustomizedSettings() { + @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @MethodSource("fileParams") + public void testRegisterWithCustomizedSettings(String fileName) throws Exception { + setup(fileName); + final int settingForPartitionConcurrency = 188; final int settingForMaxPendingRecords = 121212; final int whenCentralDogmaPushed = 111111; @@ -219,15 +251,16 @@ public void testRegisterWithCustomizedSettings() { when(centralDogmaRepository.file(any(PathPattern.class))).thenReturn(filesRequest); when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap())); + final Change upsert = expectedChange(fileName, jsonNodeProperties); final CommitRequest commitRequest = mock(CommitRequest.class); - when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest); + when(centralDogmaRepository.commit(anyString(), eq(upsert))).thenReturn(commitRequest); when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, whenCentralDogmaPushed))); - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME, supplier); + CentralDogmaPropertySupplier.register(centralDogmaRepository, fileName, supplier); verify(centralDogmaRepository).commit( anyString(), - eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)) + eq(upsert) ); } diff --git a/docs/dynamic-property-configuration.adoc b/docs/dynamic-property-configuration.adoc index 3e2609fc..edff9984 100644 --- a/docs/dynamic-property-configuration.adoc +++ b/docs/dynamic-property-configuration.adoc @@ -80,6 +80,53 @@ public class CentralDogmaSupplierMain { } ---- +== YAML Support + +You can store the property file in YAML as well as JSON. +Nothing changes in your code except the file‐name extension. + +Note you cannot use YAML's tag, anchor, or alias features. Just you can add comment in the file. + +[source,java] +---- +CentralDogmaPropertySupplier supplier = + CentralDogmaPropertySupplier.register( + centralDogma, + "project", + "repository", + "/properties.yaml"); // <1> +---- +<1> Use `.yaml` (or `.yml`) instead of `.json`. +All other APIs and behaviours remain exactly the same. + +=== Authoring the YAML file + +You may keep the flat, dot‑separated keys. + +[source,yaml] +---- +# You should use the flat, dot-separated keys like JSON. +decaton.partition.concurrency: 8 +decaton.processing.rate.per.partition: 50 +---- + +As with JSON, you cannot use nested structures in YAML. +Therefore, the following is not allowed: + +[source,yaml] +---- +# This is not supported. +decaton: + partition: + concurrency: 8 + processing: + rate: + per: + partition: 50 +---- + +Comments (`# like this`) are allowed and ignored by decaton. + == Multiple Property Suppliers You can specify multiple property suppliers including one provides hardcoded properties. From 3ec161896c3176f10a1fff47e3dc58aac7f8bf7a Mon Sep 17 00:00:00 2001 From: Yuta Kasai Date: Sun, 7 Sep 2025 15:21:18 +0900 Subject: [PATCH 2/6] NO-ISSUE fix test --- .../CentralDogmaPropertySupplier.java | 23 +++---- ...lDogmaPropertySupplierIntegrationTest.java | 64 ++++++++----------- .../CentralDogmaPropertySupplierTest.java | 40 ++++++------ 3 files changed, 59 insertions(+), 68 deletions(-) diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index 9879fc3b..1769bed5 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -47,6 +47,8 @@ import com.linecorp.decaton.processor.runtime.PropertyDefinition; import com.linecorp.decaton.processor.runtime.PropertySupplier; +import static com.linecorp.decaton.processor.runtime.ProcessorProperties.PROPERTY_DEFINITIONS; + /** * A {@link PropertySupplier} implementation with Central Dogma backend. *

@@ -119,18 +121,18 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor } rootWatcher.watch(node -> { - node.fields().forEachRemaining(entry -> { - DynamicProperty p = cachedProperties.get(entry.getKey()); - if (p != null) { + for (PropertyDefinition definition : PROPERTY_DEFINITIONS) { + DynamicProperty p = cachedProperties.get(definition.name()); + if (p != null && node.has(definition.name())) { try { - setValue(p, entry.getValue()); + setValue(p, node.get(definition.name())); } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set initial value from CentralDogma for {}", entry.getKey(), e); + logger.warn("Failed to set value from CentralDogma for {}", definition.name(), e); } } - }); + } }); } @@ -163,7 +165,7 @@ public Optional> getProperty(PropertyDefinition definition) { } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); + logger.warn("Failed to set value from CentralDogma for {}", definition.name(), e); } return prop; }); @@ -247,13 +249,6 @@ public static CentralDogmaPropertySupplier register(CentralDogmaRepository centr private static void createPropertyFile(CentralDogmaRepository centralDogmaRepository, String fileName, List> properties) { - // show given properties - logger.info("Creating CentralDogma property file: {} with properties: {}", - fileName, - properties.stream() - .map(p -> String.format("%s=%s", p.definition().name(), p.value())) - .collect(Collectors.joining(", "))); - Revision baseRevision = normalizeRevision(centralDogmaRepository, Revision.HEAD); boolean fileExists = fileExists(centralDogmaRepository, fileName, baseRevision); long startedTime = System.currentTimeMillis(); diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java index d001e768..91337bf4 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java @@ -18,7 +18,12 @@ import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -36,8 +41,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; @@ -60,7 +63,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -@Slf4j public class CentralDogmaPropertySupplierIntegrationTest { @RegisterExtension final CentralDogmaExtension extension = new CentralDogmaExtension() { @@ -210,16 +212,14 @@ void testCDIntegrationYaml() throws Exception { assertEquals(20, concurrency.value()); assertEquals(java.util.Arrays.asList("123456", "79797979"), ignoreKeys.value()); - assertEquals(20, - IntStream.range(0, 10_000) - .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) - .map(supplier::getProperty) - .reduce((l, r) -> { - assertSame(l.get(), r.get()); - return l; - }) - .orElseThrow() - .get().value().intValue()); + assertEquals(20, IntStream + .range(0, 10_000) + .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) + .map(supplier::getProperty) + .reduce((l, r) -> { + assertSame(l.get(), r.get()); + return l; + }).get().get().value().intValue()); } @Test @@ -242,7 +242,7 @@ public void testCDRegisterSuccessJson() { @Test @Timeout(10) - public void testCDRegisterSuccessYaml() { + public void testCDRegisterSuccessYaml() throws Exception { String yamlFile = "/subscription.yaml"; CentralDogma client = extension.client(); client.createProject(PROJECT_NAME).join(); @@ -250,24 +250,18 @@ public void testCDRegisterSuccessYaml() { CentralDogmaPropertySupplier.register(centralDogmaRepository, yamlFile); - String expectedYaml = "decaton.ignore.keys: []\n" - + "decaton.processing.rate.per.partition: -1\n" - + "decaton.partition.concurrency: 1\n" - + "decaton.max.pending.records: 10000\n" - + "decaton.commit.interval.ms: 1000\n" - + "decaton.group.rebalance.timeout.ms: 1000\n" - + "decaton.processing.shutdown.timeout.ms: 0\n" - + "decaton.logging.mdc.enabled: true\n" - + "decaton.client.metrics.micrometer.bound: false\n" - + "decaton.deferred.complete.timeout.ms: -1\n" - + "decaton.processor.threads.termination.timeout.ms: 9223372036854775807\n" - + "decaton.per.key.quota.processing.rate: -1\n" - + "decaton.retry.task.in.legacy.format: false\n" - + "decaton.legacy.parse.fallback.enabled: false\n"; - String actualText = centralDogmaRepository.file(Query.ofText(yamlFile)).get().join().content(); - assertEquals(expectedYaml, actualText); - log.info("Content of {}: {}", yamlFile, actualText); + + ObjectMapper yaml = new ObjectMapper(new YAMLFactory()); + JsonNode actual = yaml.readTree(actualText); + JsonNode expected = defaultProperties(); + + assertEquals(expected.toString(), actual.toString(), + () -> "\nexpected: " + expected.toPrettyString() + + "\nactual: " + actual.toPrettyString()); + + assertFalse(actualText.startsWith("---"), "YAML should not include doc start marker"); + assertFalse(actualText.trim().startsWith("{"), "YAML should not be JSON text"); } @Test @@ -331,7 +325,7 @@ void testCDRegisterConflictYaml() throws Exception { "# pushed by user‑B (should win the race)\n" + "foo: bar\n"; - JsonNode userBPush = Jackson.readTree("{\"foo\":\"bar\"}"); + JsonNode userBYamlAsJsonNode = Jackson.readTree("{\"foo\":\"bar\"}"); String defaultYaml = new ObjectMapper(new YAMLFactory().disable(WRITE_DOC_START_MARKER)) .writeValueAsString(defaultProperties()); @@ -344,9 +338,7 @@ void testCDRegisterConflictYaml() throws Exception { .commit(any(), eq(Change.ofTextUpsert(FILE, defaultYaml))); ExecutorService svc = Executors.newFixedThreadPool(2); - svc.submit(() -> CentralDogmaPropertySupplier.register(userA, FILE)); - svc.submit(() -> { try { userAIsRunning.await(); @@ -368,7 +360,7 @@ void testCDRegisterConflictYaml() throws Exception { JsonNode actual = new ObjectMapper(new YAMLFactory()) .readTree(entry.content()); - assertEquals(userBPush, actual); + assertEquals(userBYamlAsJsonNode, actual); } diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java index 7a6b3ab4..f2074266 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java @@ -24,7 +24,11 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.Collections; @@ -34,7 +38,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -114,7 +117,7 @@ private CentralDogmaPropertySupplier setup(String fileName) { return new CentralDogmaPropertySupplier(centralDogmaRepository, fileName); } - @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @ParameterizedTest() @MethodSource("fileParams") public void testWatcherSetup(String fileName) { CentralDogmaPropertySupplier supplier = setup(fileName); @@ -125,7 +128,7 @@ public void testWatcherSetup(String fileName) { assertTrue(supplier.getProperty(LONG_PROPERTY).isPresent()); } - @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @ParameterizedTest() @MethodSource("fileParams") public void testConvertValue(String fileName) { CentralDogmaPropertySupplier supplier = setup(fileName); @@ -143,7 +146,7 @@ public void testConvertValue(String fileName) { assertEquals(Arrays.asList("foo", "bar"), convertedList); } - @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @ParameterizedTest() @MethodSource("fileParams") public void testSetValue(String fileName) { CentralDogmaPropertySupplier supplier = setup(fileName); @@ -155,7 +158,18 @@ public void testSetValue(String fileName) { verify(prop).checkingSet(10L); } - @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @ParameterizedTest() + @MethodSource("fileParams") + public void testSetNullValue(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + JsonNodeFactory factory = objectMapper.getNodeFactory(); + + DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); + supplier.setValue(prop, factory.nullNode()); + verify(prop).checkingSet(LONG_PROPERTY.defaultValue()); + } + + @ParameterizedTest() @MethodSource("fileParams") public void testGetPropertyAbsentName(String fileName) { CentralDogmaPropertySupplier supplier = setup(fileName); @@ -166,16 +180,6 @@ public void testGetPropertyAbsentName(String fileName) { assertFalse(supplier.getProperty(missingProperty).isPresent()); } -// @Test -// public void testSetNullValue() { -// JsonNodeFactory factory = objectMapper.getNodeFactory(); -// -// DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); -// supplier.setValue(prop, factory.nullNode()); -// verify(prop).checkingSet(LONG_PROPERTY.defaultValue()); -// } - - private Change expectedChange(String fileName, JsonNode node) throws Exception { if (fileName.endsWith(".yaml") || fileName.endsWith(".yml")) { return Change.ofTextUpsert(fileName, YAML_MAPPER.writeValueAsString(node)); @@ -184,7 +188,7 @@ private Change expectedChange(String fileName, JsonNode node) throws Exceptio } } - @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @ParameterizedTest() @MethodSource("fileParams") public void testRegisterWithDefaultSettings(String fileName) throws Exception { setup(fileName); @@ -209,7 +213,7 @@ public void testRegisterWithDefaultSettings(String fileName) throws Exception { ); } - @ParameterizedTest(name = "watcherSetup[{index}] → {0}") + @ParameterizedTest() @MethodSource("fileParams") public void testRegisterWithCustomizedSettings(String fileName) throws Exception { setup(fileName); From 0c1da8a3d67f3a88f77c9845953dc3b045ee1536 Mon Sep 17 00:00:00 2001 From: Yuta Kasai Date: Mon, 15 Sep 2025 00:30:57 +0900 Subject: [PATCH 3/6] NO-ISSUE Fix which keys are used to update dynamic properties --- .../CentralDogmaPropertySupplier.java | 11 +- ...lDogmaPropertySupplierIntegrationTest.java | 101 +++++++++++++++++- docs/dynamic-property-configuration.adoc | 26 +++-- 3 files changed, 121 insertions(+), 17 deletions(-) diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index 1769bed5..f5d558d5 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -121,15 +121,14 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor } rootWatcher.watch(node -> { - for (PropertyDefinition definition : PROPERTY_DEFINITIONS) { - DynamicProperty p = cachedProperties.get(definition.name()); - if (p != null && node.has(definition.name())) { + for(ConcurrentHashMap.Entry> cachedProperty : cachedProperties.entrySet()) { + if (node.has(cachedProperty.getKey())) { try { - setValue(p, node.get(definition.name())); + setValue(cachedProperty.getValue(), node.get(cachedProperty.getKey())); } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set value from CentralDogma for {}", definition.name(), e); + logger.warn("Failed to set value updatedfrom CentralDogma for {}", cachedProperty.getKey(), e); } } } @@ -165,7 +164,7 @@ public Optional> getProperty(PropertyDefinition definition) { } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set value from CentralDogma for {}", definition.name(), e); + logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); } return prop; }); diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java index 91337bf4..1916e07f 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java @@ -41,6 +41,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.linecorp.decaton.processor.runtime.PropertyDefinition; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; @@ -222,6 +223,98 @@ void testCDIntegrationYaml() throws Exception { }).get().get().value().intValue()); } + @Test + @Timeout(50) + public void testCDIntegrationDynamicPropertyJson() throws InterruptedException { + final String FILE = "/subscription.json"; + CentralDogma client = extension.client(); + + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository repo = + client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + + final String topic = "orders"; + final String dynamicName = + "decaton.shaping.topic.processing.rate.per.partition." + topic; + assertFalse(ProcessorProperties.defaultProperties().stream() + .anyMatch(p -> p.definition().name().equals(dynamicName))); + + final String ORIGINAL = + "{\n" + + " \"" + dynamicName + "\": 7,\n" + + " \"decaton.partition.concurrency\": 10\n" + + "}\n"; + + repo.commit("init-json", Change.ofJsonUpsert(FILE, ORIGINAL)).push().join(); + + CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE); + + PropertyDefinition DYNAMIC_INT = + PropertyDefinition.define(dynamicName, Integer.class, 0, v -> v instanceof Integer); + + Property prop = supplier.getProperty(DYNAMIC_INT).get(); + assertEquals(7, prop.value().intValue()); + + final String UPDATED = + "{\n" + + " \"" + dynamicName + "\": 11,\n" // This is changed + + " \"decaton.partition.concurrency\": 10\n" + + "}\n"; + + CountDownLatch latch = new CountDownLatch(2); + prop.listen((oldV, newV) -> latch.countDown()); + + repo.commit("patch-json", Change.ofJsonPatch(FILE, ORIGINAL, UPDATED)).push().join(); + + latch.await(); + assertEquals(11, prop.value().intValue()); + } + + @Test + @Timeout(50) + public void testCDIntegrationDynamicPropertyYaml() throws Exception { + final String FILE = "/subscription.yaml"; + CentralDogma client = extension.client(); + + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository repo = + client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + + final String topic = "payments"; + final String dynamicName = + "decaton.shaping.topic.processing.rate.per.partition." + topic; + assertFalse(ProcessorProperties.defaultProperties().stream() + .anyMatch(p -> p.definition().name().equals(dynamicName))); + + final String ORIGINAL_YAML = + "decaton.partition.concurrency: 10\n" + + dynamicName + ": 3\n"; + + repo.commit("init-yaml", Change.ofTextUpsert(FILE, ORIGINAL_YAML)) + .push().join(); + + CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE); + + PropertyDefinition DYNAMIC_INT = + PropertyDefinition.define(dynamicName, Integer.class, 0, v -> v instanceof Integer); + + Property prop = supplier.getProperty(DYNAMIC_INT).get(); + assertEquals(3, prop.value().intValue()); + + final String UPDATED_YAML = + "decaton.partition.concurrency: 10\n" + + dynamicName + ": 9\n"; // This is changed + + CountDownLatch latch = new CountDownLatch(2); + prop.listen((o, n) -> latch.countDown()); + + repo.commit("patch-yaml", Change.ofTextPatch(FILE, ORIGINAL_YAML, UPDATED_YAML)) + .push().join(); + + latch.await(); + assertEquals(9, prop.value().intValue()); + } + @Test @Timeout(10) public void testCDRegisterSuccessJson() { @@ -414,7 +507,7 @@ static Stream formats() { return Stream.of(JSON, YAML); } - @ParameterizedTest(name = "registerTimeout‑{0}") + @ParameterizedTest() @MethodSource("formats") @Timeout(15) void testCDRegisterTimeout(FormatCase testCase) { @@ -433,7 +526,7 @@ void testCDRegisterTimeout(FormatCase testCase) { }); } - @ParameterizedTest(name = "registerNonExistentProject‑{0}") + @ParameterizedTest() @MethodSource("formats") void testCDRegisterNonExistentProject(FormatCase testCase) { assertThrows(RuntimeException.class, () -> { @@ -442,7 +535,7 @@ void testCDRegisterNonExistentProject(FormatCase testCase) { }); } - @ParameterizedTest(name = "fileExists‑{0}") + @ParameterizedTest() @MethodSource("formats") void testFileExist(FormatCase testCase) { CentralDogma client = extension.client(); @@ -457,7 +550,7 @@ void testFileExist(FormatCase testCase) { .fileExists(centralDogmaRepository, testCase.file(), Revision.HEAD)); } - @ParameterizedTest(name = "fileNonExistent‑{0}") + @ParameterizedTest() @MethodSource("formats") void testFileNonExistent(FormatCase testCase) { CentralDogma client = extension.client(); diff --git a/docs/dynamic-property-configuration.adoc b/docs/dynamic-property-configuration.adoc index 6382f23c..7da6ae90 100644 --- a/docs/dynamic-property-configuration.adoc +++ b/docs/dynamic-property-configuration.adoc @@ -1,6 +1,7 @@ = Dynamic Property Configuration :base_version: 9.0.0 :modules: centraldogma,processor +:toc: == Property Supplier Decaton provides some properties for you to configure how it processes tasks. These properties don't need to be hard-coded. Decaton lets you configure some of the properties so they can be loaded dynamically. @@ -79,7 +80,7 @@ public class CentralDogmaSupplierMain { } ---- -== YAML Support +== Use YAML instead of JSON You can store the property file in YAML as well as JSON. Nothing changes in your code except the file‐name extension. @@ -98,8 +99,6 @@ CentralDogmaPropertySupplier supplier = <1> Use `.yaml` (or `.yml`) instead of `.json`. All other APIs and behaviours remain exactly the same. -=== Authoring the YAML file - You may keep the flat, dot‑separated keys. [source,yaml] @@ -110,11 +109,11 @@ decaton.processing.rate.per.partition: 50 ---- As with JSON, you cannot use nested structures in YAML. -Therefore, the following is not allowed: +Therefore, the following is NOT allowed: [source,yaml] ---- -# This is not supported. +# This style is not supported. decaton: partition: concurrency: 8 @@ -143,7 +142,7 @@ Hence you should put supplier with higher priority before others. ---- -== JSON Schema +== Validate your configuration file with JSON Schema Decaton ships a set of https://json-schema.org/[JSON Schema] files that precisely describe every key available in `CentralDogmaPropertySupplier` including each key’s type and default value. Leveraging these schemas in your configuration files gives you two immediate benefits: @@ -157,6 +156,7 @@ Leveraging these schemas in your configuration files gives you two immediate ben [source,json] +.decaton-processor-dynamic-configuration.json ---- { "$schema": "https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json", @@ -165,7 +165,19 @@ Leveraging these schemas in your configuration files gives you two immediate ben ... } ---- -For example, you can use JSON Schema by adding a `$schema` directive at the top of the file as shown above.follows. + +[source,yaml] +.decaton-processor-dynamic-configuration.yaml +---- +# $schema: https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json[https://raw.githubusercontent.com/Yang-33/decaton/support-decaton-processor-property-jsonschema-9-1-2-test/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_2019_09-allow-additional-properties.json] +# yaml-language-server: $schema=https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json +decaton.partition.concurrency: 10000 +decaton.processing.rate.per.partition: -1 +... +---- + + +For example, you can use JSON Schema by adding a `$schema` directive at the top of the file as shown above follows. Of course, there may be other ways to use it. Replace `vX.Y.Z` with the exact Decaton version your application depends on. If you prefer living at HEAD, you can also reference `master`, but pinning to a release tag guarantees repeatable builds. From e56aa28f4e9dd8cb88cfb0f4ca010e75de598525 Mon Sep 17 00:00:00 2001 From: Yuta Kasai Date: Fri, 10 Oct 2025 09:04:45 +0900 Subject: [PATCH 4/6] NO-ISSUE Move internal classes into internal package --- .../decaton/centraldogma/CentralDogmaPropertySupplier.java | 3 +-- .../centraldogma/{ => internal}/DecatonPropertyFileFormat.java | 3 ++- .../decaton/centraldogma/{ => internal}/JsonFormat.java | 2 +- .../decaton/centraldogma/{ => internal}/YamlFormat.java | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) rename centraldogma/src/main/java/com/linecorp/decaton/centraldogma/{ => internal}/DecatonPropertyFileFormat.java (91%) rename centraldogma/src/main/java/com/linecorp/decaton/centraldogma/{ => internal}/JsonFormat.java (93%) rename centraldogma/src/main/java/com/linecorp/decaton/centraldogma/{ => internal}/YamlFormat.java (96%) diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index f5d558d5..4df814b9 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -41,14 +41,13 @@ import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.PathPattern; import com.linecorp.centraldogma.common.Revision; +import com.linecorp.decaton.centraldogma.internal.DecatonPropertyFileFormat; import com.linecorp.decaton.processor.runtime.DynamicProperty; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; import com.linecorp.decaton.processor.runtime.PropertyDefinition; import com.linecorp.decaton.processor.runtime.PropertySupplier; -import static com.linecorp.decaton.processor.runtime.ProcessorProperties.PROPERTY_DEFINITIONS; - /** * A {@link PropertySupplier} implementation with Central Dogma backend. *

diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/DecatonPropertyFileFormat.java similarity index 91% rename from centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java rename to centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/DecatonPropertyFileFormat.java index 04cde27a..ad7cf36a 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/DecatonPropertyFileFormat.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/DecatonPropertyFileFormat.java @@ -4,7 +4,7 @@ * Licensed under the Apache License, Version 2.0 … */ -package com.linecorp.decaton.centraldogma; +package com.linecorp.decaton.centraldogma.internal; import java.io.IOException; import java.util.Locale; @@ -13,6 +13,7 @@ import com.linecorp.centraldogma.client.CentralDogmaRepository; import com.linecorp.centraldogma.client.Watcher; import com.linecorp.centraldogma.common.Change; +import com.linecorp.decaton.centraldogma.CentralDogmaPropertySupplier; /** * Encapsulates Central Dogma–specific concerns for reading and writing diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/JsonFormat.java similarity index 93% rename from centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java rename to centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/JsonFormat.java index c6b50fd7..7d3725ae 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/JsonFormat.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/JsonFormat.java @@ -2,7 +2,7 @@ * Copyright 2025 LINE Corporation */ -package com.linecorp.decaton.centraldogma; +package com.linecorp.decaton.centraldogma.internal; import com.fasterxml.jackson.databind.JsonNode; import com.linecorp.centraldogma.client.CentralDogmaRepository; diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/YamlFormat.java similarity index 96% rename from centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java rename to centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/YamlFormat.java index ed7e2e64..c99852e0 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/YamlFormat.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/YamlFormat.java @@ -4,7 +4,7 @@ * Licensed under the Apache License, Version 2.0 … */ -package com.linecorp.decaton.centraldogma; +package com.linecorp.decaton.centraldogma.internal; import java.io.IOException; import java.io.UncheckedIOException; From f7661b0835018b8f1306dcf4c1d32ecead67eaf8 Mon Sep 17 00:00:00 2001 From: Yuta Kasai Date: Fri, 10 Oct 2025 09:10:06 +0900 Subject: [PATCH 5/6] NO-ISSUE Fix docs --- docs/dynamic-property-configuration.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/dynamic-property-configuration.adoc b/docs/dynamic-property-configuration.adoc index 7da6ae90..5aae2fab 100644 --- a/docs/dynamic-property-configuration.adoc +++ b/docs/dynamic-property-configuration.adoc @@ -82,7 +82,7 @@ public class CentralDogmaSupplierMain { == Use YAML instead of JSON -You can store the property file in YAML as well as JSON. +From decaton v10, you can store the property file in YAML as well as JSON. Nothing changes in your code except the file‐name extension. Note you cannot use YAML's tag, anchor, or alias features. Just you can add comment in the file. @@ -169,7 +169,7 @@ Leveraging these schemas in your configuration files gives you two immediate ben [source,yaml] .decaton-processor-dynamic-configuration.yaml ---- -# $schema: https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json[https://raw.githubusercontent.com/Yang-33/decaton/support-decaton-processor-property-jsonschema-9-1-2-test/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_2019_09-allow-additional-properties.json] +# $schema: https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json # yaml-language-server: $schema=https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json decaton.partition.concurrency: 10000 decaton.processing.rate.per.partition: -1 From 925f8f0ea3245bba682896848528aebade20ad4b Mon Sep 17 00:00:00 2001 From: Yuta Kasai Date: Fri, 10 Oct 2025 10:14:41 +0900 Subject: [PATCH 6/6] NO-ISSUE Fix version in doc --- docs/dynamic-property-configuration.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/dynamic-property-configuration.adoc b/docs/dynamic-property-configuration.adoc index 5aae2fab..5e795742 100644 --- a/docs/dynamic-property-configuration.adoc +++ b/docs/dynamic-property-configuration.adoc @@ -82,7 +82,7 @@ public class CentralDogmaSupplierMain { == Use YAML instead of JSON -From decaton v10, you can store the property file in YAML as well as JSON. +From decaton v9.4.0, you can store the property file in YAML as well as JSON. Nothing changes in your code except the file‐name extension. Note you cannot use YAML's tag, anchor, or alias features. Just you can add comment in the file.