diff --git a/centraldogma/build.gradle b/centraldogma/build.gradle index d702cf38..19e26549 100644 --- a/centraldogma/build.gradle +++ b/centraldogma/build.gradle @@ -28,6 +28,7 @@ dependencies { implementation "org.slf4j:slf4j-api:$slf4jVersion" api "com.linecorp.centraldogma:centraldogma-client:$centralDogmaVersion" implementation "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion" + implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:$jacksonVersion" testImplementation "org.hamcrest:hamcrest:$hamcrestVersion" testImplementation "com.linecorp.centraldogma:centraldogma-testing-junit:$centralDogmaVersion" diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java index 80f71180..4df814b9 100644 --- a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplier.java @@ -16,6 +16,8 @@ package com.linecorp.decaton.centraldogma; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -38,8 +40,8 @@ import com.linecorp.centraldogma.common.Change; import com.linecorp.centraldogma.common.ChangeConflictException; import com.linecorp.centraldogma.common.PathPattern; -import com.linecorp.centraldogma.common.Query; import com.linecorp.centraldogma.common.Revision; +import com.linecorp.decaton.centraldogma.internal.DecatonPropertyFileFormat; import com.linecorp.decaton.processor.runtime.DynamicProperty; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; @@ -48,12 +50,15 @@ /** * A {@link PropertySupplier} implementation with Central Dogma backend. - * + *

* This implementation maps property's {@link PropertyDefinition#name()} as the absolute field name in the file * on Central Dogma. - * + *

+ * You can use json or yaml format for the property file. + * You cannot nest keys in both formats. Keys must be top-level fields. + *

* An example JSON format would be look like: - * {@code + *

{@code
  * {
  *     "decaton.partition.concurrency": 10,
  *     "decaton.ignore.keys": [
@@ -62,7 +67,16 @@
  *     ],
  *     "decaton.processing.rate.per.partition": 50
  * }
- * }
+ * }
+ * + * An example YAML format would be look like: + *
{@code
+ * decaton.partition.concurrency: 10
+ * decaton.ignore.keys:
+ *  - "123456"
+ *  - "79797979"
+ * decaton.processing.rate.per.partition: 50
+ * }
*/ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(CentralDogmaPropertySupplier.class); @@ -73,7 +87,6 @@ public class CentralDogmaPropertySupplier implements PropertySupplier, AutoClose private static final ObjectMapper objectMapper = new ObjectMapper(); private final Watcher rootWatcher; - private final ConcurrentMap> cachedProperties = new ConcurrentHashMap<>(); /** @@ -94,7 +107,9 @@ public CentralDogmaPropertySupplier(CentralDogma centralDogma, String projectNam * @param fileName the name of the file containing properties as top-level fields. */ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepository, String fileName) { - rootWatcher = centralDogmaRepository.watcher(Query.ofJsonPath(fileName)).start(); + DecatonPropertyFileFormat configFile = DecatonPropertyFileFormat.of(fileName); + this.rootWatcher = configFile.createWatcher(centralDogmaRepository, fileName); + try { rootWatcher.awaitInitialValue(INITIAL_VALUE_TIMEOUT_SECS, TimeUnit.SECONDS); } catch (InterruptedException e) { @@ -103,6 +118,20 @@ public CentralDogmaPropertySupplier(CentralDogmaRepository centralDogmaRepositor } catch (TimeoutException e) { throw new RuntimeException(e); } + + rootWatcher.watch(node -> { + for(ConcurrentHashMap.Entry> cachedProperty : cachedProperties.entrySet()) { + if (node.has(cachedProperty.getKey())) { + try { + setValue(cachedProperty.getValue(), node.get(cachedProperty.getKey())); + } catch (Exception e) { + // Catching Exception instead of RuntimeException, since + // Kotlin-implemented DynamicProperty would throw checked exceptions + logger.warn("Failed to set value updatedfrom CentralDogma for {}", cachedProperty.getKey(), e); + } + } + } + }); } // visible for testing @@ -129,25 +158,13 @@ public Optional> getProperty(PropertyDefinition definition) { // for most use cases though, this cache is only filled/read once. final DynamicProperty cachedProp = cachedProperties.computeIfAbsent(definition.name(), name -> { DynamicProperty prop = new DynamicProperty<>(definition); - Watcher child = rootWatcher.newChild(jsonNode -> jsonNode.path(definition.name())); - child.watch(node -> { - try { - setValue(prop, node); - } catch (Exception e) { - // Catching Exception instead of RuntimeException, since - // Kotlin-implemented DynamicProperty would throw checked exceptions - logger.warn("Failed to set value updated from CentralDogma for {}", definition.name(), e); - } - }); try { - JsonNode node = child.initialValueFuture().join().value(); //doesn't fail since it's a child watcher - setValue(prop, node); + setValue(prop, rootWatcher.latestValue().get(definition.name())); } catch (Exception e) { // Catching Exception instead of RuntimeException, since // Kotlin-implemented DynamicProperty would throw checked exceptions logger.warn("Failed to set initial value from CentralDogma for {}", definition.name(), e); } - return prop; }); @@ -175,8 +192,7 @@ public void close() { public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, String project, String repository, String filename) { final CentralDogmaRepository centralDogmaRepository = centralDogma.forRepo(project, repository); - createPropertyFile(centralDogmaRepository, filename, ProcessorProperties.defaultProperties()); - return new CentralDogmaPropertySupplier(centralDogmaRepository, filename); + return register(centralDogmaRepository, filename); } /** @@ -215,6 +231,7 @@ public static CentralDogmaPropertySupplier register(CentralDogma centralDogma, S public static CentralDogmaPropertySupplier register(CentralDogmaRepository centralDogmaRepository, String filename, PropertySupplier supplier) { + List> properties = ProcessorProperties.defaultProperties().stream().map(defaultProperty -> { Optional> prop = supplier.getProperty(defaultProperty.definition()); if (prop.isPresent()) { @@ -236,12 +253,18 @@ private static void createPropertyFile(CentralDogmaRepository centralDogmaReposi long remainingTime = remainingTime(PROPERTY_CREATION_TIMEOUT_MILLIS, startedTime); JsonNode jsonNodeProperties = convertPropertyListToJsonNode(properties); + Change upsert; + try { + upsert = DecatonPropertyFileFormat.of(fileName).createUpsertChange(fileName, jsonNodeProperties); + } catch (IOException e) { + throw new UncheckedIOException(e); + } while (!fileExists && remainingTime > 0) { try { centralDogmaRepository .commit(String.format("[CentralDogmaPropertySupplier] Property file created: %s", fileName), - Change.ofJsonUpsert(fileName, jsonNodeProperties)) + upsert) .push(baseRevision) .get(remainingTime, TimeUnit.MILLISECONDS); logger.info("New property file {} registered on Central Dogma", fileName); diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/DecatonPropertyFileFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/DecatonPropertyFileFormat.java new file mode 100644 index 00000000..ad7cf36a --- /dev/null +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/DecatonPropertyFileFormat.java @@ -0,0 +1,42 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 … + */ + +package com.linecorp.decaton.centraldogma.internal; + +import java.io.IOException; +import java.util.Locale; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.Watcher; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.decaton.centraldogma.CentralDogmaPropertySupplier; + +/** + * Encapsulates Central Dogma–specific concerns for reading and writing + * configuration files in various text formats (JSON, YAML, ...). + *

+ * Implementations convert between raw file contents managed by Central Dogma + * and {@link JsonNode} values consumed by {@link CentralDogmaPropertySupplier}. + */ +public interface DecatonPropertyFileFormat { + /** + * Create and start a Watcher that emits {@link JsonNode} for each file update. + */ + Watcher createWatcher(CentralDogmaRepository repo, String fileName); + + /** + * Serialize the given node and wrap it as Central Dogma {@link Change} for initial file creation. + */ + Change createUpsertChange(String fileName, JsonNode initialNode) throws IOException; + + static DecatonPropertyFileFormat of(String fileName) { + String lower = fileName.toLowerCase(Locale.ROOT); + return (lower.endsWith(".yml") || lower.endsWith(".yaml")) + ? new YamlFormat() + : new JsonFormat(); + } +} diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/JsonFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/JsonFormat.java new file mode 100644 index 00000000..7d3725ae --- /dev/null +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/JsonFormat.java @@ -0,0 +1,23 @@ +/* + * Copyright 2025 LINE Corporation + */ + +package com.linecorp.decaton.centraldogma.internal; + +import com.fasterxml.jackson.databind.JsonNode; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.Watcher; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Query; + +public class JsonFormat implements DecatonPropertyFileFormat { + @Override + public Watcher createWatcher(CentralDogmaRepository repo, String fileName) { + return repo.watcher(Query.ofJsonPath(fileName)).start(); + } + + @Override + public Change createUpsertChange(String fileName, JsonNode initialNode) { + return Change.ofJsonUpsert(fileName, initialNode); + } +} diff --git a/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/YamlFormat.java b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/YamlFormat.java new file mode 100644 index 00000000..c99852e0 --- /dev/null +++ b/centraldogma/src/main/java/com/linecorp/decaton/centraldogma/internal/YamlFormat.java @@ -0,0 +1,46 @@ +/* + * Copyright 2025 LINE Corporation + * + * Licensed under the Apache License, Version 2.0 … + */ + +package com.linecorp.decaton.centraldogma.internal; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.linecorp.centraldogma.client.CentralDogmaRepository; +import com.linecorp.centraldogma.client.Watcher; +import com.linecorp.centraldogma.common.Change; +import com.linecorp.centraldogma.common.Query; + +import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; + +public class YamlFormat implements DecatonPropertyFileFormat { + private static final ObjectMapper YAML_MAPPER = new ObjectMapper( + new YAMLFactory() + .disable(WRITE_DOC_START_MARKER) + ); + + @Override + public Watcher createWatcher(CentralDogmaRepository repo, String fileName) { + return repo.watcher(Query.ofText(fileName)) + .map(text -> { + try { + return YAML_MAPPER.readTree(text); + } catch (IOException e) { + throw new UncheckedIOException("Failed to parse YAML from " + fileName, e); + } + }) + .start(); + } + + @Override + public Change createUpsertChange(String fileName, JsonNode initialNode) throws IOException { + String yaml = YAML_MAPPER.writeValueAsString(initialNode); + return Change.ofTextUpsert(fileName, yaml); + } +} diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java index 9fa135f0..1916e07f 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierIntegrationTest.java @@ -16,12 +16,14 @@ package com.linecorp.decaton.centraldogma; +import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; import static com.linecorp.decaton.processor.runtime.ProcessorProperties.CONFIG_PARTITION_CONCURRENCY; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -33,8 +35,13 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import java.util.stream.Stream; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.linecorp.decaton.processor.runtime.PropertyDefinition; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; @@ -54,6 +61,8 @@ import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; public class CentralDogmaPropertySupplierIntegrationTest { @RegisterExtension @@ -73,7 +82,6 @@ protected void configureHttpClient(WebClientBuilder builder) { private static final String PROJECT_NAME = "unit-test"; private static final String REPOSITORY_NAME = "repo"; - private static final String FILENAME = "/subscription.json"; private JsonNode defaultProperties() { return CentralDogmaPropertySupplier.convertPropertyListToJsonNode( @@ -82,7 +90,8 @@ private JsonNode defaultProperties() { @Test @Timeout(50) - public void testCDIntegration() throws InterruptedException { + public void testCDIntegrationJson() throws InterruptedException { + final String FILENAME = "/subscription.json"; CentralDogma client = extension.client(); final String ORIGINAL = @@ -140,32 +149,176 @@ public void testCDIntegration() throws InterruptedException { } @Test - public void testFileExist() { + @Timeout(50) + void testCDIntegrationYaml() throws Exception { + final String FILE = "/subscription.yaml"; CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); - CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME) - .join(); + CentralDogmaRepository repo = + client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); - centralDogmaRepository - .commit("test", Change.ofJsonUpsert(FILENAME, "{}")) - .push() - .join(); - assertTrue(CentralDogmaPropertySupplier - .fileExists(centralDogmaRepository, FILENAME, Revision.HEAD)); + final String ORIGINAL_YAML = + "# processor properties\n" + + "decaton.partition.concurrency: 10\n" + + "\n" + + "# keys to ignore\n" + + "decaton.ignore.keys:\n" + + " - \"123456\" # hi\n" + + " - \"79797979\" # hello\n" + + "\n" + + "decaton.processing.rate.per.partition: 50\n"; + + repo.commit("init-yaml", Change.ofTextUpsert(FILE, ORIGINAL_YAML)) + .push().join(); + + CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE); + + Property concurrency = + supplier.getProperty(CONFIG_PARTITION_CONCURRENCY).get(); + Property> ignoreKeys = + supplier.getProperty(ProcessorProperties.CONFIG_IGNORE_KEYS).get(); + + assertEquals(10, concurrency.value()); + assertEquals(java.util.Arrays.asList("123456", "79797979"), ignoreKeys.value()); + + CountDownLatch latch = new CountDownLatch(2); + concurrency.listen((o, n) -> latch.countDown()); + + AtomicBoolean firstCall = new AtomicBoolean(true); + + ignoreKeys.listen((oldVal, newVal) -> { + // null to list is allowed + if (firstCall.getAndSet(false)) { + return; + } + fail("ignoreKeys should not be updated after the first call"); + }); + + final String UPDATED_YAML = + "# processor properties\n" + + "decaton.partition.concurrency: 20\n" // This is changed + + "\n" + + "# keys to ignore\n" + + "decaton.ignore.keys:\n" + + " - \"123456\" # hi\n" + + " - \"79797979\" # hello\n" + + "\n" + + "decaton.processing.rate.per.partition: 50\n"; + + repo.commit("patch-yaml", Change.ofTextPatch(FILE, ORIGINAL_YAML, UPDATED_YAML)) + .push().join(); + + latch.await(); + assertEquals(20, concurrency.value()); + assertEquals(java.util.Arrays.asList("123456", "79797979"), ignoreKeys.value()); + + assertEquals(20, IntStream + .range(0, 10_000) + .mapToObj(i -> CONFIG_PARTITION_CONCURRENCY) + .map(supplier::getProperty) + .reduce((l, r) -> { + assertSame(l.get(), r.get()); + return l; + }).get().get().value().intValue()); } @Test - public void testFileNonExistent() { + @Timeout(50) + public void testCDIntegrationDynamicPropertyJson() throws InterruptedException { + final String FILE = "/subscription.json"; CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); - CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); - assertFalse(CentralDogmaPropertySupplier - .fileExists(centralDogmaRepository, FILENAME, Revision.HEAD)); + CentralDogmaRepository repo = + client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + + final String topic = "orders"; + final String dynamicName = + "decaton.shaping.topic.processing.rate.per.partition." + topic; + assertFalse(ProcessorProperties.defaultProperties().stream() + .anyMatch(p -> p.definition().name().equals(dynamicName))); + + final String ORIGINAL = + "{\n" + + " \"" + dynamicName + "\": 7,\n" + + " \"decaton.partition.concurrency\": 10\n" + + "}\n"; + + repo.commit("init-json", Change.ofJsonUpsert(FILE, ORIGINAL)).push().join(); + + CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE); + + PropertyDefinition DYNAMIC_INT = + PropertyDefinition.define(dynamicName, Integer.class, 0, v -> v instanceof Integer); + + Property prop = supplier.getProperty(DYNAMIC_INT).get(); + assertEquals(7, prop.value().intValue()); + + final String UPDATED = + "{\n" + + " \"" + dynamicName + "\": 11,\n" // This is changed + + " \"decaton.partition.concurrency\": 10\n" + + "}\n"; + + CountDownLatch latch = new CountDownLatch(2); + prop.listen((oldV, newV) -> latch.countDown()); + + repo.commit("patch-json", Change.ofJsonPatch(FILE, ORIGINAL, UPDATED)).push().join(); + + latch.await(); + assertEquals(11, prop.value().intValue()); + } + + @Test + @Timeout(50) + public void testCDIntegrationDynamicPropertyYaml() throws Exception { + final String FILE = "/subscription.yaml"; + CentralDogma client = extension.client(); + + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository repo = + client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + + final String topic = "payments"; + final String dynamicName = + "decaton.shaping.topic.processing.rate.per.partition." + topic; + assertFalse(ProcessorProperties.defaultProperties().stream() + .anyMatch(p -> p.definition().name().equals(dynamicName))); + + final String ORIGINAL_YAML = + "decaton.partition.concurrency: 10\n" + + dynamicName + ": 3\n"; + + repo.commit("init-yaml", Change.ofTextUpsert(FILE, ORIGINAL_YAML)) + .push().join(); + + CentralDogmaPropertySupplier supplier = new CentralDogmaPropertySupplier(repo, FILE); + + PropertyDefinition DYNAMIC_INT = + PropertyDefinition.define(dynamicName, Integer.class, 0, v -> v instanceof Integer); + + Property prop = supplier.getProperty(DYNAMIC_INT).get(); + assertEquals(3, prop.value().intValue()); + + final String UPDATED_YAML = + "decaton.partition.concurrency: 10\n" + + dynamicName + ": 9\n"; // This is changed + + CountDownLatch latch = new CountDownLatch(2); + prop.listen((o, n) -> latch.countDown()); + + repo.commit("patch-yaml", Change.ofTextPatch(FILE, ORIGINAL_YAML, UPDATED_YAML)) + .push().join(); + + latch.await(); + assertEquals(9, prop.value().intValue()); } @Test @Timeout(10) - public void testCDRegisterSuccess() { + public void testCDRegisterSuccessJson() { + final String FILENAME = "/subscription.json"; CentralDogma client = extension.client(); client.createProject(PROJECT_NAME).join(); CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); @@ -173,40 +326,41 @@ public void testCDRegisterSuccess() { CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); Entry prop = centralDogmaRepository.file(Query.ofJson(FILENAME)).get().join(); - assertEquals(defaultProperties().asText(), - prop.content().asText()); + JsonNode expected = defaultProperties(); + JsonNode actual = prop.content(); + assertEquals(expected.toString(), actual.toString(), + () -> "\nexpected: " + expected.toPrettyString() + + "\nactual: " + actual.toPrettyString()); } @Test @Timeout(10) - public void testCDRegisterNonExistentProject() { - assertThrows(RuntimeException.class, () -> { - CentralDogmaPropertySupplier.register(extension.client(), - "non-existent-project", REPOSITORY_NAME, FILENAME); - }); - } - - @Test - @Timeout(15) - public void testCDRegisterTimeout() { + public void testCDRegisterSuccessYaml() throws Exception { + String yamlFile = "/subscription.yaml"; CentralDogma client = extension.client(); client.createProject(PROJECT_NAME).join(); - CentralDogmaRepository centralDogmaRepository = spy(client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join()); + CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); - doReturn(CompletableFuture.completedFuture(new Revision(1))) - .when(centralDogmaRepository) - .normalize(any()); + CentralDogmaPropertySupplier.register(centralDogmaRepository, yamlFile); - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); + String actualText = centralDogmaRepository.file(Query.ofText(yamlFile)).get().join().content(); - assertThrows(RuntimeException.class, () -> { - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); - }); + ObjectMapper yaml = new ObjectMapper(new YAMLFactory()); + JsonNode actual = yaml.readTree(actualText); + JsonNode expected = defaultProperties(); + + assertEquals(expected.toString(), actual.toString(), + () -> "\nexpected: " + expected.toPrettyString() + + "\nactual: " + actual.toPrettyString()); + + assertFalse(actualText.startsWith("---"), "YAML should not include doc start marker"); + assertFalse(actualText.trim().startsWith("{"), "YAML should not be JSON text"); } @Test @Timeout(15) - public void testCDRegisterConflict() throws Exception { + public void testCDRegisterConflictJson() throws Exception { + final String FILENAME = "/subscription.json"; CountDownLatch userAIsRunning = new CountDownLatch(1); CountDownLatch userBIsRunning = new CountDownLatch(1); @@ -246,4 +400,163 @@ public void testCDRegisterConflict() throws Exception { assertEquals(userBPush, prop.content()); } + + @Test + @Timeout(15) + void testCDRegisterConflictYaml() throws Exception { + final String FILE = "/subscription.yaml"; + CountDownLatch userAIsRunning = new CountDownLatch(1); + CountDownLatch userBIsRunning = new CountDownLatch(1); + + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + + CentralDogmaRepository userB = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + CentralDogmaRepository userA = spy(client.forRepo(PROJECT_NAME, REPOSITORY_NAME)); + + final String userBYaml = + "# pushed by user‑B (should win the race)\n" + + "foo: bar\n"; + + JsonNode userBYamlAsJsonNode = Jackson.readTree("{\"foo\":\"bar\"}"); + + String defaultYaml = new ObjectMapper(new YAMLFactory().disable(WRITE_DOC_START_MARKER)) + .writeValueAsString(defaultProperties()); + + doAnswer(inv -> { + userAIsRunning.countDown(); + userBIsRunning.await(); + return inv.callRealMethod(); + }).when(userA) + .commit(any(), eq(Change.ofTextUpsert(FILE, defaultYaml))); + + ExecutorService svc = Executors.newFixedThreadPool(2); + svc.submit(() -> CentralDogmaPropertySupplier.register(userA, FILE)); + svc.submit(() -> { + try { + userAIsRunning.await(); + userB.commit("userB‑push", Change.ofTextUpsert(FILE, userBYaml)) + .push().join(); + userBIsRunning.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }); + + svc.shutdown(); + svc.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + + Entry entry = userA.file(Query.ofText(FILE)).get().join(); + + assertEquals(userBYaml, entry.content()); + + JsonNode actual = new ObjectMapper(new YAMLFactory()) + .readTree(entry.content()); + assertEquals(userBYamlAsJsonNode, actual); + } + + + interface FormatCase { + String file(); + + Change upsert(String body); + + String emptyBody(); + } + + private static final FormatCase JSON = new FormatCase() { + public String file() { + return "/subscription.json"; + } + + public Change upsert(String body) { + return Change.ofJsonUpsert(file(), body); + } + + public String emptyBody() { + return "{}"; + } + + @Override + public String toString() { + return "JSON"; + } + }; + + private static final FormatCase YAML = new FormatCase() { + public String file() { + return "/subscription.yaml"; + } + + public Change upsert(String body) { + return Change.ofTextUpsert(file(), body); + } + + public String emptyBody() { + return ""; + } + + @Override + public String toString() { + return "YAML"; + } + }; + + static Stream formats() { + return Stream.of(JSON, YAML); + } + + @ParameterizedTest() + @MethodSource("formats") + @Timeout(15) + void testCDRegisterTimeout(FormatCase testCase) { + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository centralDogmaRepository = spy(client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join()); + + doReturn(CompletableFuture.completedFuture(new Revision(1))) + .when(centralDogmaRepository) + .normalize(any()); + + CentralDogmaPropertySupplier.register(centralDogmaRepository, testCase.file()); + + assertThrows(RuntimeException.class, () -> { + CentralDogmaPropertySupplier.register(centralDogmaRepository, testCase.file()); + }); + } + + @ParameterizedTest() + @MethodSource("formats") + void testCDRegisterNonExistentProject(FormatCase testCase) { + assertThrows(RuntimeException.class, () -> { + CentralDogmaPropertySupplier.register(extension.client(), + "non-existent-project", REPOSITORY_NAME, testCase.file()); + }); + } + + @ParameterizedTest() + @MethodSource("formats") + void testFileExist(FormatCase testCase) { + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + + centralDogmaRepository + .commit("test", testCase.upsert(testCase.emptyBody())) + .push() + .join(); + assertTrue(CentralDogmaPropertySupplier + .fileExists(centralDogmaRepository, testCase.file(), Revision.HEAD)); + } + + @ParameterizedTest() + @MethodSource("formats") + void testFileNonExistent(FormatCase testCase) { + CentralDogma client = extension.client(); + client.createProject(PROJECT_NAME).join(); + CentralDogmaRepository centralDogmaRepository = client.createRepository(PROJECT_NAME, REPOSITORY_NAME).join(); + assertFalse(CentralDogmaPropertySupplier + .fileExists(centralDogmaRepository, testCase.file(), Revision.HEAD)); + } } diff --git a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java index d3573416..f2074266 100644 --- a/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java +++ b/centraldogma/src/test/java/com/linecorp/decaton/centraldogma/CentralDogmaPropertySupplierTest.java @@ -16,6 +16,7 @@ package com.linecorp.decaton.centraldogma; +import static com.fasterxml.jackson.dataformat.yaml.YAMLGenerator.Feature.WRITE_DOC_START_MARKER; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertSame; @@ -23,6 +24,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -33,12 +35,13 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -47,6 +50,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.linecorp.centraldogma.client.CentralDogmaRepository; import com.linecorp.centraldogma.client.CommitRequest; @@ -69,8 +73,10 @@ @MockitoSettings(strictness = Strictness.STRICT_STUBS) public class CentralDogmaPropertySupplierTest { private final ObjectMapper objectMapper = new ObjectMapper(); - - private static final String FILENAME = "/subscription.json"; + private static final ObjectMapper YAML_MAPPER = new ObjectMapper( + new YAMLFactory() + .disable(WRITE_DOC_START_MARKER) + ); private static final PropertyDefinition LONG_PROPERTY = PropertyDefinition.define("num.property", Long.class, 0L, @@ -89,37 +95,44 @@ public class CentralDogmaPropertySupplierTest { @Mock Watcher rootWatcher; - private CentralDogmaPropertySupplier supplier; + private static Stream fileParams() { + return Stream.of( + Arguments.of("/subscription.json"), + Arguments.of("/subscription.yaml") + ); + } - @BeforeEach - public void setUp() { - when(centralDogmaRepository.watcher(Query.ofJsonPath(FILENAME))).thenReturn(watcherRequest); + @SuppressWarnings("unchecked") + private CentralDogmaPropertySupplier setup(String fileName) { + when(centralDogmaRepository.watcher(any(Query.class))) + .thenReturn(watcherRequest); + + // yaml mode + if (fileName.endsWith(".yaml")) { + doReturn(watcherRequest) + .when(watcherRequest) + .map(any()); + } when(watcherRequest.start()).thenReturn(rootWatcher); - supplier = new CentralDogmaPropertySupplier(centralDogmaRepository, FILENAME); + return new CentralDogmaPropertySupplier(centralDogmaRepository, fileName); } - @Test - @SuppressWarnings("unchecked") - public void testWatcherSetup() { + @ParameterizedTest() + @MethodSource("fileParams") + public void testWatcherSetup(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + when(rootWatcher.latestValue()).thenReturn( objectMapper.createObjectNode().put(LONG_PROPERTY.name(), 123L)); - Watcher longPropertyWatcher = mock(Watcher.class); - Watcher listPropertyWatcher = mock(Watcher.class); - - when(rootWatcher.newChild((Query) any())) - .thenReturn(longPropertyWatcher) - .thenReturn(listPropertyWatcher) - .thenReturn(null); - assertTrue(supplier.getProperty(LONG_PROPERTY).isPresent()); - - verify(rootWatcher).newChild(any()); - verify(longPropertyWatcher).watch(any(Consumer.class)); } - @Test - public void testConvertValue() { + @ParameterizedTest() + @MethodSource("fileParams") + public void testConvertValue(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + JsonNodeFactory factory = objectMapper.getNodeFactory(); Object convertedLong = supplier.convertNodeToValue( @@ -133,8 +146,11 @@ public void testConvertValue() { assertEquals(Arrays.asList("foo", "bar"), convertedList); } - @Test - public void testSetValue() { + @ParameterizedTest() + @MethodSource("fileParams") + public void testSetValue(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + JsonNodeFactory factory = objectMapper.getNodeFactory(); DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); @@ -142,8 +158,10 @@ public void testSetValue() { verify(prop).checkingSet(10L); } - @Test - public void testSetNullValue() { + @ParameterizedTest() + @MethodSource("fileParams") + public void testSetNullValue(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); JsonNodeFactory factory = objectMapper.getNodeFactory(); DynamicProperty prop = spy(new DynamicProperty<>(LONG_PROPERTY)); @@ -151,16 +169,30 @@ public void testSetNullValue() { verify(prop).checkingSet(LONG_PROPERTY.defaultValue()); } - @Test - public void testGetPropertyAbsentName() { + @ParameterizedTest() + @MethodSource("fileParams") + public void testGetPropertyAbsentName(String fileName) { + CentralDogmaPropertySupplier supplier = setup(fileName); + when(rootWatcher.latestValue()).thenReturn(objectMapper.createObjectNode()); PropertyDefinition missingProperty = PropertyDefinition.define("absent.value", Long.class); assertFalse(supplier.getProperty(missingProperty).isPresent()); } - @Test - public void testRegisterWithDefaultSettings() { + private Change expectedChange(String fileName, JsonNode node) throws Exception { + if (fileName.endsWith(".yaml") || fileName.endsWith(".yml")) { + return Change.ofTextUpsert(fileName, YAML_MAPPER.writeValueAsString(node)); + } else { + return Change.ofJsonUpsert(fileName, node); + } + } + + @ParameterizedTest() + @MethodSource("fileParams") + public void testRegisterWithDefaultSettings(String fileName) throws Exception { + setup(fileName); + when(centralDogmaRepository.normalize(Revision.HEAD)) .thenReturn(CompletableFuture.completedFuture(Revision.HEAD)); @@ -169,19 +201,23 @@ public void testRegisterWithDefaultSettings() { when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap())); final CommitRequest commitRequest = mock(CommitRequest.class); - when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode())))).thenReturn(commitRequest); + final Change upsert = expectedChange(fileName, defaultPropertiesAsJsonNode()); + when(centralDogmaRepository.commit(anyString(), eq(upsert))).thenReturn(commitRequest); when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, 1))); - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME); + CentralDogmaPropertySupplier.register(centralDogmaRepository, fileName); verify(centralDogmaRepository).commit( anyString(), - eq(Change.ofJsonUpsert(FILENAME, defaultPropertiesAsJsonNode())) + eq(upsert) ); } - @Test - public void testRegisterWithCustomizedSettings() { + @ParameterizedTest() + @MethodSource("fileParams") + public void testRegisterWithCustomizedSettings(String fileName) throws Exception { + setup(fileName); + final int settingForPartitionConcurrency = 188; final int settingForMaxPendingRecords = 121212; final int whenCentralDogmaPushed = 111111; @@ -219,15 +255,16 @@ public void testRegisterWithCustomizedSettings() { when(centralDogmaRepository.file(any(PathPattern.class))).thenReturn(filesRequest); when(filesRequest.list(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(Collections.emptyMap())); + final Change upsert = expectedChange(fileName, jsonNodeProperties); final CommitRequest commitRequest = mock(CommitRequest.class); - when(centralDogmaRepository.commit(anyString(), eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)))).thenReturn(commitRequest); + when(centralDogmaRepository.commit(anyString(), eq(upsert))).thenReturn(commitRequest); when(commitRequest.push(Revision.HEAD)).thenReturn(CompletableFuture.completedFuture(new PushResult(Revision.HEAD, whenCentralDogmaPushed))); - CentralDogmaPropertySupplier.register(centralDogmaRepository, FILENAME, supplier); + CentralDogmaPropertySupplier.register(centralDogmaRepository, fileName, supplier); verify(centralDogmaRepository).commit( anyString(), - eq(Change.ofJsonUpsert(FILENAME, jsonNodeProperties)) + eq(upsert) ); } diff --git a/docs/dynamic-property-configuration.adoc b/docs/dynamic-property-configuration.adoc index 949af88c..5e795742 100644 --- a/docs/dynamic-property-configuration.adoc +++ b/docs/dynamic-property-configuration.adoc @@ -1,6 +1,7 @@ = Dynamic Property Configuration :base_version: 9.0.0 :modules: centraldogma,processor +:toc: == Property Supplier Decaton provides some properties for you to configure how it processes tasks. These properties don't need to be hard-coded. Decaton lets you configure some of the properties so they can be loaded dynamically. @@ -79,6 +80,51 @@ public class CentralDogmaSupplierMain { } ---- +== Use YAML instead of JSON + +From decaton v9.4.0, you can store the property file in YAML as well as JSON. +Nothing changes in your code except the file‐name extension. + +Note you cannot use YAML's tag, anchor, or alias features. Just you can add comment in the file. + +[source,java] +---- +CentralDogmaPropertySupplier supplier = + CentralDogmaPropertySupplier.register( + centralDogma, + "project", + "repository", + "/properties.yaml"); // <1> +---- +<1> Use `.yaml` (or `.yml`) instead of `.json`. +All other APIs and behaviours remain exactly the same. + +You may keep the flat, dot‑separated keys. + +[source,yaml] +---- +# You should use the flat, dot-separated keys like JSON. +decaton.partition.concurrency: 8 +decaton.processing.rate.per.partition: 50 +---- + +As with JSON, you cannot use nested structures in YAML. +Therefore, the following is NOT allowed: + +[source,yaml] +---- +# This style is not supported. +decaton: + partition: + concurrency: 8 + processing: + rate: + per: + partition: 50 +---- + +Comments (`# like this`) are allowed and ignored by decaton. + == Multiple Property Suppliers You can specify multiple property suppliers including one provides hardcoded properties. @@ -96,7 +142,7 @@ Hence you should put supplier with higher priority before others. ---- -== JSON Schema +== Validate your configuration file with JSON Schema Decaton ships a set of https://json-schema.org/[JSON Schema] files that precisely describe every key available in `CentralDogmaPropertySupplier` including each key’s type and default value. Leveraging these schemas in your configuration files gives you two immediate benefits: @@ -110,6 +156,7 @@ Leveraging these schemas in your configuration files gives you two immediate ben [source,json] +.decaton-processor-dynamic-configuration.json ---- { "$schema": "https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json", @@ -118,7 +165,19 @@ Leveraging these schemas in your configuration files gives you two immediate ben ... } ---- -For example, you can use JSON Schema by adding a `$schema` directive at the top of the file as shown above.follows. + +[source,yaml] +.decaton-processor-dynamic-configuration.yaml +---- +# $schema: https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json +# yaml-language-server: $schema=https://raw.githubusercontent.com/line/decaton/vX.Y.Z/centraldogma/src/jsonschema/dist/decaton-processor-properties-central-dogma-schema-draft_7.json +decaton.partition.concurrency: 10000 +decaton.processing.rate.per.partition: -1 +... +---- + + +For example, you can use JSON Schema by adding a `$schema` directive at the top of the file as shown above follows. Of course, there may be other ways to use it. Replace `vX.Y.Z` with the exact Decaton version your application depends on. If you prefer living at HEAD, you can also reference `master`, but pinning to a release tag guarantees repeatable builds.