From f868c6d3b2323b905f9e5b7c68f7ba4ba1c8a39c Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Wed, 3 Sep 2025 23:11:28 +0900 Subject: [PATCH 1/5] Allow build processors using kafka-clients Deserializer --- .../decaton/processor/RetryQueueingTest.java | 47 +++++++++++++++++++ .../processor/runtime/ConsumedRecord.java | 5 ++ .../runtime/ProcessorSubscription.java | 1 + .../processor/runtime/ProcessorsBuilder.java | 27 +++++++++-- .../internal/DefaultTaskExtractor.java | 9 ++-- .../runtime/internal/ProcessPipeline.java | 1 + .../runtime/internal/Processors.java | 19 +++++++- .../internal/DefaultTaskExtractorTest.java | 23 +++++++-- .../runtime/internal/ProcessorsTest.java | 4 +- .../testing/processor/ProcessorTestSuite.java | 21 +++++++-- 10 files changed, 139 insertions(+), 18 deletions(-) diff --git a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java index d34aa9cb..8265b890 100644 --- a/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java +++ b/processor/src/it/java/com/linecorp/decaton/processor/RetryQueueingTest.java @@ -17,6 +17,7 @@ package com.linecorp.decaton.processor; import java.time.Duration; +import java.util.Base64; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -26,6 +27,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -297,4 +301,47 @@ public void testRetryQueueingMigrateToHeader() throws Exception { .build() .run(); } + + // This test verifies that Decaton works properly when + // transformation is applied to value bytes on 
kafka-clients serializer/deserializer layer + // even with retry enabled. + @Test + @Timeout(30) + public void testConsumingWithKafkaDeserializer() throws Exception { + ProcessorTestSuite + .builder(rule) + .numTasks(1000) + .producerSupplier(bootstrapServers -> TestUtils.producer( + bootstrapServers, + new ByteArraySerializer(), + (topic, bytes) -> Base64.getEncoder().encode(bytes))) + .customDeserializer(new Deserializer() { + @Override + public TestTask deserialize(String topic, byte[] data) { + throw new UnsupportedOperationException(); + } + + @Override + public TestTask deserialize(String topic, Headers headers, byte[] data) { + return new TestTask.TestTaskDeserializer() + .deserialize(Base64.getDecoder().decode(data)); + } + }) + .configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> { + if (ctx.metadata().retryCount() == 0) { + ctx.retry(); + } + })) + .retryConfig(RetryConfig.builder() + .retryTopic(retryTopic) + .backoff(Duration.ofMillis(10)) + .build()) + // If we retry tasks, there's no guarantee about ordering nor serial processing + .excludeSemantics( + GuaranteeType.PROCESS_ORDERING, + GuaranteeType.SERIAL_PROCESSING) + .customSemantics(new ProcessRetriedTask()) + .build() + .run(); + } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java index 83a5a7e3..a09c02f6 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ConsumedRecord.java @@ -29,6 +29,11 @@ @Builder @Accessors(fluent = true) public class ConsumedRecord { + /** + * The topic the record is consumed from + */ + String topic; + /** * The timestamp of the record */ diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java
b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index e89fd342..dd0be214 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -284,6 +284,7 @@ private void cleanUp() { consumeManager.close(); quotaApplier.close(); metrics.close(); + processors.close(); updateState(SubscriptionStateListener.State.TERMINATED); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index fd7af07c..e3574686 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.function.Supplier; +import com.linecorp.decaton.client.DecatonClient; import com.linecorp.decaton.common.Deserializer; import com.linecorp.decaton.processor.DecatonProcessor; import com.linecorp.decaton.processor.TaskMetadata; @@ -39,12 +40,12 @@ public class ProcessorsBuilder { @Getter private final String topic; - private final Deserializer userSuppliedDeserializer; + private final org.apache.kafka.common.serialization.Deserializer userSuppliedDeserializer; private final TaskExtractor userSuppliedTaskExtractor; private final List> suppliers; - ProcessorsBuilder(String topic, Deserializer userSuppliedDeserializer, TaskExtractor userSuppliedTaskExtractor) { + ProcessorsBuilder(String topic, org.apache.kafka.common.serialization.Deserializer userSuppliedDeserializer, TaskExtractor userSuppliedTaskExtractor) { this.topic = topic; this.userSuppliedDeserializer = userSuppliedDeserializer; this.userSuppliedTaskExtractor = userSuppliedTaskExtractor; @@ -66,12 +67,29 @@ public class ProcessorsBuilder { * @return an instance 
of {@link ProcessorsBuilder}. */ public static ProcessorsBuilder consuming(String topic, Deserializer deserializer) { + return new ProcessorsBuilder<>(topic, (t, bytes) -> deserializer.deserialize(bytes), null); + } + + /** + * Create new {@link ProcessorsBuilder} that consumes message from topic expecting tasks of type + * which can be parsed by deserializer. + *

+ * If you want to extract custom {@link TaskMetadata} (e.g. for delayed processing), you can use + * {@link #consuming(String, TaskExtractor)} instead. + * @param topic the name of topic to consume. + * @param deserializer the deserializer to instantiate task of type {@link T} from serialized bytes. + * @param the type of instantiated tasks. + * @return an instance of {@link ProcessorsBuilder}. + */ + public static ProcessorsBuilder consuming(String topic, org.apache.kafka.common.serialization.Deserializer deserializer) { return new ProcessorsBuilder<>(topic, deserializer, null); } /** * Create new {@link ProcessorsBuilder} that consumes message from topic expecting tasks of type * which can be parsed by taskExtractor. + * When this overload is used, it is the user's responsibility to extract {@link TaskMetadata} from each record, + * even for those produced by {@link DecatonClient}. * @param topic the name of topic to consume. * @param taskExtractor the extractor to extract task of type {@link T} from message bytes. * @param the type of instantiated tasks.
@@ -138,7 +156,7 @@ Processors build(DecatonProcessorSupplier retryProcessorSupplier, Pro retryTaskExtractor = new RetryTaskExtractor<>(legacyFallbackEnabledProperty, userSuppliedTaskExtractor); } - return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); + return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor, userSuppliedDeserializer); } private static class RetryTaskExtractor implements TaskExtractor { @@ -148,7 +166,7 @@ private static class RetryTaskExtractor implements TaskExtractor { RetryTaskExtractor(Property legacyFallbackEnabledProperty, TaskExtractor innerExtractor) { this.innerExtractor = innerExtractor; - this.outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes, legacyFallbackEnabledProperty); + outerExtractor = new DefaultTaskExtractor<>((topic, bytes) -> bytes, legacyFallbackEnabledProperty); } @Override @@ -160,6 +178,7 @@ public DecatonTask extract(ConsumedRecord record) { DecatonTask outerTask = outerExtractor.extract(record); ConsumedRecord inner = ConsumedRecord .builder() + .topic(record.topic()) .recordTimestampMillis(record.recordTimestampMillis()) .headers(record.headers()) .key(record.key()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index 139034cd..8f580b57 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -16,9 +16,10 @@ package com.linecorp.decaton.processor.runtime.internal; +import org.apache.kafka.common.serialization.Deserializer; + import com.google.protobuf.InvalidProtocolBufferException; -import com.linecorp.decaton.common.Deserializer; import com.linecorp.decaton.client.internal.TaskMetadataUtil; import 
com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; @@ -42,7 +43,7 @@ public DecatonTask extract(ConsumedRecord record) { byte[] taskDataBytes = record.value(); return new DecatonTask<>( TaskMetadata.fromProto(headerMeta), - taskDeserializer.deserialize(taskDataBytes), + taskDeserializer.deserialize(record.topic(), record.headers(), taskDataBytes), taskDataBytes); } else { // There are two cases where task metadata header is missing: @@ -59,13 +60,13 @@ public DecatonTask extract(ConsumedRecord record) { return new DecatonTask<>( metadata, - taskDeserializer.deserialize(taskDataBytes), + taskDeserializer.deserialize(record.topic(), record.headers(), taskDataBytes), taskDataBytes); } catch (InvalidProtocolBufferException e) { throw new IllegalArgumentException(e); } } else { - T task = taskDeserializer.deserialize(record.value()); + T task = taskDeserializer.deserialize(record.topic(), record.headers(), record.value()); return new DecatonTask<>( TaskMetadata.builder() .timestampMillis(record.recordTimestampMillis()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java index 090c48d3..b40d7355 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessPipeline.java @@ -127,6 +127,7 @@ DecatonTask extract(TaskRequest request) { final DecatonTask extracted; extracted = taskExtractor.extract( ConsumedRecord.builder() + .topic(request.topicPartition().topic()) .recordTimestampMillis(request.recordTimestamp()) .headers(request.headers()) .key(request.key()) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java 
b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java index 982d2e8f..fe8fd543 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,22 +30,25 @@ import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier; import com.linecorp.decaton.processor.runtime.TaskExtractor; -public class Processors { +public class Processors implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(Processors.class); private final List> suppliers; private final DecatonProcessorSupplier retryProcessorSupplier; private final TaskExtractor taskExtractor; private final TaskExtractor retryTaskExtractor; + private final Deserializer userSuppliedDeserializer; public Processors(List> suppliers, DecatonProcessorSupplier retryProcessorSupplier, TaskExtractor taskExtractor, - TaskExtractor retryTaskExtractor) { + TaskExtractor retryTaskExtractor, + Deserializer userSuppliedDeserializer) { this.suppliers = Collections.unmodifiableList(suppliers); this.retryProcessorSupplier = retryProcessorSupplier; this.taskExtractor = taskExtractor; this.retryTaskExtractor = retryTaskExtractor; + this.userSuppliedDeserializer = userSuppliedDeserializer; } private DecatonProcessor retryProcessor(ThreadScope scope) { @@ -125,4 +129,15 @@ public void destroyThreadScope(String subscriptionId, TopicPartition tp, int thr retryProcessorSupplier.leaveThreadScope(subscriptionId, tp, threadId); } } + + @Override + public void close() { + if (userSuppliedDeserializer != null) { + try { + userSuppliedDeserializer.close(); + } catch (Exception e) { + logger.warn("User supplied deserializer threw exception 
while closing", e); + } + } + } } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java index 3e90c517..edc3d1d2 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractorTest.java @@ -19,15 +19,20 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import java.io.UncheckedIOException; + import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Deserializer; import org.junit.jupiter.api.Test; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Parser; + import com.linecorp.decaton.processor.TaskMetadata; import com.linecorp.decaton.processor.runtime.ConsumedRecord; import com.linecorp.decaton.processor.runtime.DecatonTask; import com.linecorp.decaton.processor.runtime.ProcessorProperties; import com.linecorp.decaton.processor.runtime.Property; -import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer; import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest; import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto; import com.linecorp.decaton.protocol.Sample.HelloTask; @@ -43,11 +48,12 @@ public class DefaultTaskExtractorTest { @Test public void testExtract() { DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( - new ProtocolBuffersDeserializer<>(HelloTask.parser()), + deserializer(HelloTask.parser()), Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true)); ConsumedRecord record = ConsumedRecord .builder() + .topic("topic") .recordTimestampMillis(1561709151628L) .headers(new RecordHeaders()) 
.value(LEGACY_REQUEST.toByteArray()) @@ -64,11 +70,12 @@ public void testExtract() { @Test public void testExtractBypassLegacyFormatWhenHeaderMissing() { DefaultTaskExtractor extractor = new DefaultTaskExtractor<>( - new ProtocolBuffersDeserializer<>(HelloTask.parser()), + deserializer(HelloTask.parser()), Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, false)); ConsumedRecord record = ConsumedRecord .builder() + .topic("topic") .recordTimestampMillis(1561709151628L) .headers(new RecordHeaders()) .value(TASK.toByteArray()) @@ -84,4 +91,14 @@ public void testExtractBypassLegacyFormatWhenHeaderMissing() { assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes()); } + + private static Deserializer deserializer(Parser parser) { + return (topic, data) -> { + try { + return parser.parseFrom(data); + } catch (InvalidProtocolBufferException e) { + throw new UncheckedIOException(e); + } + }; + } } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java index f97adcc0..7cb5249e 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java @@ -70,9 +70,9 @@ public void testCleanupPartiallyInitializedProcessors() { Processors processors = new Processors<>( suppliers, null, - new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance(), + new DefaultTaskExtractor<>((topic, bytes) -> HelloTask.getDefaultInstance(), Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)), - null); + null, null); doThrow(new RuntimeException("exception")).when(suppliers.get(2)).getProcessor(any(), any(), anyInt()); diff --git a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java 
b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java index d5e3aa79..9f1bc7ba 100644 --- a/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java +++ b/testing/src/main/java/com/linecorp/decaton/testing/processor/ProcessorTestSuite.java @@ -48,6 +48,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; import com.google.protobuf.ByteString; @@ -112,6 +113,7 @@ public class ProcessorTestSuite { private final TracingProvider tracingProvider; private final Function> producerSupplier; private final TaskExtractor customTaskExtractor; + private final Deserializer customDeserializer; private final boolean produceTasksInLegacyFormat; private static final int DEFAULT_NUM_TASKS = 10000; @@ -184,8 +186,14 @@ public static class Builder { private Function> producerSupplier = TestUtils::producer; /** * Supply custom {@link TaskExtractor} to be used to extract a task. + * Exclusive with {@link #customDeserializer} */ private TaskExtractor customTaskExtractor; + /** + * Supply custom {@link Deserializer} to be used to deserialize a task. 
+ * Exclusive with {@link #customTaskExtractor} + */ + private Deserializer customDeserializer; /** * Specify whether to produce tasks in legacy DecatonTaskRequest format instead of header metadata */ @@ -215,6 +223,10 @@ private Builder(KafkaClusterExtension rule) { } public ProcessorTestSuite build() { + if (customDeserializer != null && customTaskExtractor != null) { + throw new IllegalArgumentException("customDeserializer and customTaskExtractor are exclusive"); + } + Set semantics = new HashSet<>(); for (GuaranteeType guaranteeType : defaultSemantics) { semantics.add(guaranteeType.get()); @@ -238,6 +250,7 @@ public ProcessorTestSuite build() { tracingProvider, producerSupplier, customTaskExtractor, + customDeserializer, produceTasksInLegacyFormat); } } @@ -318,10 +331,12 @@ private ProcessorSubscription newSubscription( }; final ProcessorsBuilder sourceBuilder; - if (customTaskExtractor == null) { - sourceBuilder = ProcessorsBuilder.consuming(topic, new TestTaskDeserializer()); - } else { + if (customTaskExtractor != null) { sourceBuilder = ProcessorsBuilder.consuming(topic, customTaskExtractor); + } else if (customDeserializer != null) { + sourceBuilder = ProcessorsBuilder.consuming(topic, customDeserializer); + } else { + sourceBuilder = ProcessorsBuilder.consuming(topic, new TestTaskDeserializer()); } ProcessorsBuilder processorsBuilder = configureProcessorsBuilder.apply(sourceBuilder.thenProcess(preprocessor)); From 0e639e7e29ee84ac8b2ffeda9f88269231525f0a Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Thu, 4 Sep 2025 14:34:12 +0900 Subject: [PATCH 2/5] fix --- .../runtime/ProcessorSubscription.java | 15 +++++++++-- .../processor/runtime/ProcessorsBuilder.java | 4 ++- .../runtime/SubscriptionBuilder.java | 1 + .../runtime/internal/Processors.java | 19 ++----------- .../runtime/ProcessorSubscriptionTest.java | 27 +++++++++++-------- .../runtime/internal/ProcessorsTest.java | 2 +- 6 files changed, 36 insertions(+), 32 deletions(-) diff --git 
a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index dd0be214..a2b7c7f2 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.Deserializer; import com.linecorp.decaton.processor.metrics.Metrics; import com.linecorp.decaton.processor.metrics.Metrics.SubscriptionMetrics; @@ -67,6 +68,7 @@ public class ProcessorSubscription extends Thread implements AsyncClosable { private final AssignmentManager assignManager; private final ConsumeManager consumeManager; private final QuotaApplier quotaApplier; + private final Deserializer userSuppliedDeserializer; private final CompletableFuture loopTerminateFuture; private volatile boolean started; private volatile boolean terminated; @@ -137,6 +139,7 @@ public void receive(ConsumerRecord record) { ProcessorSubscription(SubscriptionScope scope, Consumer consumer, QuotaApplier quotaApplier, + Deserializer userSuppliedDeserializer, Processors processors, ProcessorProperties props, SubscriptionStateListener stateListener, @@ -146,6 +149,7 @@ public void receive(ConsumerRecord record) { this.stateListener = stateListener; this.contexts = contexts; this.quotaApplier = quotaApplier; + this.userSuppliedDeserializer = userSuppliedDeserializer; metrics = Metrics.withTags("subscription", scope.subscriptionId()).new SubscriptionMetrics(); if (props.get(CONFIG_BIND_CLIENT_METRICS).value()) { @@ -166,10 +170,11 @@ public void receive(ConsumerRecord record) { public ProcessorSubscription(SubscriptionScope scope, Consumer consumer, 
QuotaApplier quotaApplier, + Deserializer userSuppliedDeserializer, Processors processors, ProcessorProperties props, SubscriptionStateListener stateListener) { - this(scope, consumer, quotaApplier, processors, props, stateListener, + this(scope, consumer, quotaApplier, userSuppliedDeserializer, processors, props, stateListener, new PartitionContexts(scope, processors)); } @@ -284,7 +289,13 @@ private void cleanUp() { consumeManager.close(); quotaApplier.close(); metrics.close(); - processors.close(); + if (userSuppliedDeserializer != null) { + try { + userSuppliedDeserializer.close(); + } catch (Exception e) { + log.warn("User supplied deserializer threw exception while closing", e); + } + } updateState(SubscriptionStateListener.State.TERMINATED); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index e3574686..7e61a719 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -28,6 +28,7 @@ import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor; import com.linecorp.decaton.processor.runtime.internal.Processors; +import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; @@ -40,6 +41,7 @@ public class ProcessorsBuilder { @Getter private final String topic; + @Getter(AccessLevel.PACKAGE) private final org.apache.kafka.common.serialization.Deserializer userSuppliedDeserializer; private final TaskExtractor userSuppliedTaskExtractor; @@ -156,7 +158,7 @@ Processors build(DecatonProcessorSupplier retryProcessorSupplier, Pro retryTaskExtractor = new RetryTaskExtractor<>(legacyFallbackEnabledProperty, userSuppliedTaskExtractor); } - return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor, userSuppliedDeserializer); + 
return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); } private static class RetryTaskExtractor implements TaskExtractor { diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java index 81af148b..14975485 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java @@ -263,6 +263,7 @@ public ProcessorSubscription build() { return new ProcessorSubscription(scope, consumerSupplier.get(), quotaApplier(scope), + processorsBuilder.userSuppliedDeserializer(), processorsBuilder.build(maybeRetryProcessorSupplier(scope), props), props, stateListener); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java index fe8fd543..982d2e8f 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.Deserializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,25 +29,22 @@ import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier; import com.linecorp.decaton.processor.runtime.TaskExtractor; -public class Processors implements AutoCloseable { +public class Processors { private static final Logger logger = LoggerFactory.getLogger(Processors.class); private final List> suppliers; private final DecatonProcessorSupplier retryProcessorSupplier; private final TaskExtractor taskExtractor; private final TaskExtractor retryTaskExtractor; - private final Deserializer 
userSuppliedDeserializer; public Processors(List> suppliers, DecatonProcessorSupplier retryProcessorSupplier, TaskExtractor taskExtractor, - TaskExtractor retryTaskExtractor, - Deserializer userSuppliedDeserializer) { + TaskExtractor retryTaskExtractor) { this.suppliers = Collections.unmodifiableList(suppliers); this.retryProcessorSupplier = retryProcessorSupplier; this.taskExtractor = taskExtractor; this.retryTaskExtractor = retryTaskExtractor; - this.userSuppliedDeserializer = userSuppliedDeserializer; } private DecatonProcessor retryProcessor(ThreadScope scope) { @@ -129,15 +125,4 @@ public void destroyThreadScope(String subscriptionId, TopicPartition tp, int thr retryProcessorSupplier.leaveThreadScope(subscriptionId, tp, threadId); } } - - @Override - public void close() { - if (userSuppliedDeserializer != null) { - try { - userSuppliedDeserializer.close(); - } catch (Exception e) { - logger.warn("User supplied deserializer threw exception while closing", e); - } - } - } } diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index 04076617..055fd234 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -149,6 +149,7 @@ private static ProcessorSubscription subscription(Consumer consu scope, consumer, NoopQuotaApplier.INSTANCE, + builder.userSuppliedDeserializer(), builder.build(null, scope.props()), scope.props(), listener); @@ -272,15 +273,17 @@ public synchronized ConsumerRecords poll(Duration timeout) { asyncProcessingStarted.countDown(); }; SubscriptionScope scope = scope(tp.topic(), 9000L); + ProcessorsBuilder processorsBuilder = + ProcessorsBuilder.consuming(scope.originTopic(), + (ConsumedRecord record) -> new DecatonTask<>( + 
TaskMetadata.builder().build(), "dummy", record.value())) + .thenProcess(processor); final ProcessorSubscription subscription = new ProcessorSubscription( scope, consumer, NoopQuotaApplier.INSTANCE, - ProcessorsBuilder.consuming(scope.originTopic(), - (ConsumedRecord record) -> new DecatonTask<>( - TaskMetadata.builder().build(), "dummy", record.value())) - .thenProcess(processor) - .build(null, scope.props()), + processorsBuilder.userSuppliedDeserializer(), + processorsBuilder.build(null, scope.props()), scope.props(), newState -> { if (newState == State.RUNNING) { @@ -346,18 +349,20 @@ public synchronized void commitSync(Map offse Property.ofStatic(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS, Long.MAX_VALUE), Property.ofStatic(ProcessorProperties.CONFIG_GROUP_REBALANCE_TIMEOUT_MS, Long.MAX_VALUE) )); - final ProcessorSubscription subscription = new ProcessorSubscription( - scope, - consumer, - NoopQuotaApplier.INSTANCE, + ProcessorsBuilder> processorsBuilder = ProcessorsBuilder.consuming(scope.originTopic(), (byte[] bytes) -> new DecatonTask<>( TaskMetadata.builder().build(), "dummy", bytes)) .thenProcess((ctx, task) -> { ctx.deferCompletion().complete(); taskCompleted.countDown(); - }) - .build(null, scope.props()), + }); + final ProcessorSubscription subscription = new ProcessorSubscription( + scope, + consumer, + NoopQuotaApplier.INSTANCE, + processorsBuilder.userSuppliedDeserializer(), + processorsBuilder.build(null, scope.props()), scope.props(), null); subscription.start(); diff --git a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java index 7cb5249e..8cdba897 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessorsTest.java @@ -72,7 +72,7 @@ public void 
testCleanupPartiallyInitializedProcessors() { suppliers, null, new DefaultTaskExtractor<>((topic, bytes) -> HelloTask.getDefaultInstance(), Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)), - null, null); + null); doThrow(new RuntimeException("exception")).when(suppliers.get(2)).getProcessor(any(), any(), anyInt()); From f54d38ce1f4b21db5ed6153813a34d91b61d1e16 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Mon, 8 Sep 2025 21:58:37 +0900 Subject: [PATCH 3/5] fix --- .../runtime/ProcessorSubscription.java | 15 ++--------- .../processor/runtime/ProcessorsBuilder.java | 14 ++++++++-- .../runtime/SubscriptionBuilder.java | 1 - .../processor/runtime/TaskExtractor.java | 11 +++++++- .../internal/DefaultTaskExtractor.java | 5 ++++ .../runtime/internal/Processors.java | 19 ++++++++++++- .../runtime/ProcessorSubscriptionTest.java | 27 ++++++++----------- 7 files changed, 58 insertions(+), 34 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index a2b7c7f2..dd0be214 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RebalanceInProgressException; import org.apache.kafka.common.errors.TimeoutException; -import org.apache.kafka.common.serialization.Deserializer; import com.linecorp.decaton.processor.metrics.Metrics; import com.linecorp.decaton.processor.metrics.Metrics.SubscriptionMetrics; @@ -68,7 +67,6 @@ public class ProcessorSubscription extends Thread implements AsyncClosable { private final AssignmentManager assignManager; private final ConsumeManager consumeManager; private final QuotaApplier quotaApplier; - private final Deserializer 
userSuppliedDeserializer; private final CompletableFuture loopTerminateFuture; private volatile boolean started; private volatile boolean terminated; @@ -139,7 +137,6 @@ public void receive(ConsumerRecord record) { ProcessorSubscription(SubscriptionScope scope, Consumer consumer, QuotaApplier quotaApplier, - Deserializer userSuppliedDeserializer, Processors processors, ProcessorProperties props, SubscriptionStateListener stateListener, @@ -149,7 +146,6 @@ public void receive(ConsumerRecord record) { this.stateListener = stateListener; this.contexts = contexts; this.quotaApplier = quotaApplier; - this.userSuppliedDeserializer = userSuppliedDeserializer; metrics = Metrics.withTags("subscription", scope.subscriptionId()).new SubscriptionMetrics(); if (props.get(CONFIG_BIND_CLIENT_METRICS).value()) { @@ -170,11 +166,10 @@ public void receive(ConsumerRecord record) { public ProcessorSubscription(SubscriptionScope scope, Consumer consumer, QuotaApplier quotaApplier, - Deserializer userSuppliedDeserializer, Processors processors, ProcessorProperties props, SubscriptionStateListener stateListener) { - this(scope, consumer, quotaApplier, userSuppliedDeserializer, processors, props, stateListener, + this(scope, consumer, quotaApplier, processors, props, stateListener, new PartitionContexts(scope, processors)); } @@ -289,13 +284,7 @@ private void cleanUp() { consumeManager.close(); quotaApplier.close(); metrics.close(); - if (userSuppliedDeserializer != null) { - try { - userSuppliedDeserializer.close(); - } catch (Exception e) { - log.warn("User supplied deserializer threw exception while closing", e); - } - } + processors.close(); updateState(SubscriptionStateListener.State.TERMINATED); } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java index 7e61a719..187a1f22 100644 --- 
a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorsBuilder.java @@ -28,9 +28,9 @@ import com.linecorp.decaton.processor.runtime.internal.DefaultTaskExtractor; import com.linecorp.decaton.processor.runtime.internal.Processors; -import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; /** * A class defines processing pipeline for {@link ProcessorSubscription}. @@ -41,7 +41,6 @@ public class ProcessorsBuilder { @Getter private final String topic; - @Getter(AccessLevel.PACKAGE) private final org.apache.kafka.common.serialization.Deserializer userSuppliedDeserializer; private final TaskExtractor userSuppliedTaskExtractor; @@ -161,6 +160,7 @@ Processors build(DecatonProcessorSupplier retryProcessorSupplier, Pro return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor); } + @Slf4j private static class RetryTaskExtractor implements TaskExtractor { private final DefaultTaskExtractor outerExtractor; private final TaskExtractor innerExtractor; @@ -193,5 +193,15 @@ public DecatonTask extract(ConsumedRecord record) { extracted.taskData(), extracted.taskDataBytes()); } + + @Override + public void close() { + try { + innerExtractor.close(); + } catch (Exception e) { + log.error("Failed to close innerExtractor", e); + } + outerExtractor.close(); + } } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java index 14975485..81af148b 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/SubscriptionBuilder.java @@ -263,7 +263,6 @@ public ProcessorSubscription build() { return new ProcessorSubscription(scope, 
consumerSupplier.get(), quotaApplier(scope), - processorsBuilder.userSuppliedDeserializer(), processorsBuilder.build(maybeRetryProcessorSupplier(scope), props), props, stateListener); diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java index 398acbe8..647c2b40 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/TaskExtractor.java @@ -20,7 +20,7 @@ * An interface for classes extracting {@link DecatonTask} from given record. * @param type of task. */ -public interface TaskExtractor { +public interface TaskExtractor extends AutoCloseable { /** * Extract object of type {@link DecatonTask} from given bytes. * @param record {@link ConsumedRecord} to extract task from. @@ -29,4 +29,13 @@ public interface TaskExtractor { * If the method throws an exception, the task will be discarded and processor continues to process subsequent tasks. */ DecatonTask extract(ConsumedRecord record); + + /** + * Clean up any resources associated with it. + * This method is invoked when {@link ProcessorSubscription} is being closed. 
+ */ + @Override + default void close() { + // no-op + } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java index 8f580b57..87617f47 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/DefaultTaskExtractor.java @@ -76,4 +76,9 @@ public DecatonTask extract(ConsumedRecord record) { } } } + + @Override + public void close() { + taskDeserializer.close(); + } } diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java index 982d2e8f..ff63fe71 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java @@ -29,7 +29,10 @@ import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier; import com.linecorp.decaton.processor.runtime.TaskExtractor; -public class Processors { +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class Processors implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(Processors.class); private final List> suppliers; @@ -125,4 +128,18 @@ public void destroyThreadScope(String subscriptionId, TopicPartition tp, int thr retryProcessorSupplier.leaveThreadScope(subscriptionId, tp, threadId); } } + + @Override + public void close() { + closeExtractor(taskExtractor); + closeExtractor(retryTaskExtractor); + } + + private void closeExtractor(TaskExtractor extractor) { + try { + extractor.close(); + } catch (Exception e) { + log.error("Exception thrown while closing extractor", e); + } + } } diff --git 
a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java index 055fd234..04076617 100644 --- a/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java +++ b/processor/src/test/java/com/linecorp/decaton/processor/runtime/ProcessorSubscriptionTest.java @@ -149,7 +149,6 @@ private static ProcessorSubscription subscription(Consumer consu scope, consumer, NoopQuotaApplier.INSTANCE, - builder.userSuppliedDeserializer(), builder.build(null, scope.props()), scope.props(), listener); @@ -273,17 +272,15 @@ public synchronized ConsumerRecords poll(Duration timeout) { asyncProcessingStarted.countDown(); }; SubscriptionScope scope = scope(tp.topic(), 9000L); - ProcessorsBuilder processorsBuilder = - ProcessorsBuilder.consuming(scope.originTopic(), - (ConsumedRecord record) -> new DecatonTask<>( - TaskMetadata.builder().build(), "dummy", record.value())) - .thenProcess(processor); final ProcessorSubscription subscription = new ProcessorSubscription( scope, consumer, NoopQuotaApplier.INSTANCE, - processorsBuilder.userSuppliedDeserializer(), - processorsBuilder.build(null, scope.props()), + ProcessorsBuilder.consuming(scope.originTopic(), + (ConsumedRecord record) -> new DecatonTask<>( + TaskMetadata.builder().build(), "dummy", record.value())) + .thenProcess(processor) + .build(null, scope.props()), scope.props(), newState -> { if (newState == State.RUNNING) { @@ -349,20 +346,18 @@ public synchronized void commitSync(Map offse Property.ofStatic(ProcessorProperties.CONFIG_COMMIT_INTERVAL_MS, Long.MAX_VALUE), Property.ofStatic(ProcessorProperties.CONFIG_GROUP_REBALANCE_TIMEOUT_MS, Long.MAX_VALUE) )); - ProcessorsBuilder> processorsBuilder = + final ProcessorSubscription subscription = new ProcessorSubscription( + scope, + consumer, + NoopQuotaApplier.INSTANCE, ProcessorsBuilder.consuming(scope.originTopic(), 
(byte[] bytes) -> new DecatonTask<>( TaskMetadata.builder().build(), "dummy", bytes)) .thenProcess((ctx, task) -> { ctx.deferCompletion().complete(); taskCompleted.countDown(); - }); - final ProcessorSubscription subscription = new ProcessorSubscription( - scope, - consumer, - NoopQuotaApplier.INSTANCE, - processorsBuilder.userSuppliedDeserializer(), - processorsBuilder.build(null, scope.props()), + }) + .build(null, scope.props()), scope.props(), null); subscription.start(); From 1ea3042651abdb5b9d20e4d42c6362b67db60207 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Mon, 8 Sep 2025 22:35:14 +0900 Subject: [PATCH 4/5] fix --- .../decaton/processor/runtime/internal/Processors.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java index ff63fe71..0637ed50 100644 --- a/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/Processors.java @@ -137,7 +137,9 @@ public void close() { private void closeExtractor(TaskExtractor extractor) { try { - extractor.close(); + if (extractor != null) { + extractor.close(); + } } catch (Exception e) { log.error("Exception thrown while closing extractor", e); } From a77863cfad80de23d05db55f7733a67b48021238 Mon Sep 17 00:00:00 2001 From: Haruki Okada Date: Wed, 17 Sep 2025 16:28:53 +0900 Subject: [PATCH 5/5] address feedback --- .../runtime/ProcessorSubscription.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java index dd0be214..5d92bf08 100644 --- 
a/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java +++ b/processor/src/main/java/com/linecorp/decaton/processor/runtime/ProcessorSubscription.java @@ -280,11 +280,11 @@ private void consumeLoop() { } private void cleanUp() { - contexts.close(); - consumeManager.close(); - quotaApplier.close(); - metrics.close(); - processors.close(); + safeClose(contexts); + safeClose(consumeManager); + safeClose(quotaApplier); + safeClose(metrics); + safeClose(processors); updateState(SubscriptionStateListener.State.TERMINATED); } @@ -297,4 +297,12 @@ public CompletableFuture asyncClose() { } return loopTerminateFuture.whenComplete((unused, throwable) -> cleanUp()); } + + private static void safeClose(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + log.error("Exception thrown while closing {}", closeable.getClass().getSimpleName(), e); + } + } }