@@ -17,6 +17,7 @@
package com.linecorp.decaton.processor;

import java.time.Duration;
import java.util.Base64;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +27,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -297,4 +301,47 @@ public void testRetryQueueingMigrateToHeader() throws Exception {
.build()
.run();
}

// This test verifies that Decaton works properly, even with retry enabled, when
// a transformation is applied to value bytes at the kafka-clients serializer/deserializer layer.
@Test
@Timeout(30)
public void testConsumingWithKafkaDeserializer() throws Exception {
ProcessorTestSuite
.builder(rule)
.numTasks(1000)
.producerSupplier(bootstrapServers -> TestUtils.producer(
bootstrapServers,
new ByteArraySerializer(),
(topic, bytes) -> Base64.getEncoder().encode(bytes)))
.customDeserializer(new Deserializer<TestTask>() {
@Override
public TestTask deserialize(String topic, byte[] data) {
throw new UnsupportedOperationException();
}

@Override
public TestTask deserialize(String topic, Headers headers, byte[] data) {
return new TestTask.TestTaskDeserializer()
.deserialize(Base64.getDecoder().decode(data));
}
})
.configureProcessorsBuilder(builder -> builder.thenProcess((ctx, task) -> {
if (ctx.metadata().retryCount() == 0) {
ctx.retry();
}
}))
.retryConfig(RetryConfig.builder()
.retryTopic(retryTopic)
.backoff(Duration.ofMillis(10))
.build())
// If we retry tasks, there's no guarantee of ordering or serial processing
.excludeSemantics(
GuaranteeType.PROCESS_ORDERING,
GuaranteeType.SERIAL_PROCESSING)
.customSemantics(new ProcessRetriedTask())
.build()
.run();
}
}
@@ -29,6 +29,11 @@
@Builder
@Accessors(fluent = true)
public class ConsumedRecord {
/**
* The topic the record is consumed from
*/
String topic;

/**
* The timestamp of the record
*/
@@ -280,10 +280,11 @@ private void consumeLoop() {
}

private void cleanUp() {
contexts.close();
consumeManager.close();
quotaApplier.close();
metrics.close();
safeClose(contexts);
safeClose(consumeManager);
safeClose(quotaApplier);
safeClose(metrics);
safeClose(processors);
updateState(SubscriptionStateListener.State.TERMINATED);
}

@@ -296,4 +297,12 @@ public CompletableFuture<Void> asyncClose() {
}
return loopTerminateFuture.whenComplete((unused, throwable) -> cleanUp());
}

private static void safeClose(AutoCloseable closeable) {
try {
closeable.close();
} catch (Exception e) {
log.error("Exception thrown while closing {}", closeable.getClass().getSimpleName(), e);
}
}
}
@@ -20,6 +20,7 @@
import java.util.List;
import java.util.function.Supplier;

import com.linecorp.decaton.client.DecatonClient;
import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.processor.DecatonProcessor;
import com.linecorp.decaton.processor.TaskMetadata;
@@ -29,6 +30,7 @@

import lombok.Getter;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;

/**
* A class that defines the processing pipeline for {@link ProcessorSubscription}.
@@ -39,12 +41,12 @@
public class ProcessorsBuilder<T> {
@Getter
private final String topic;
private final Deserializer<T> userSuppliedDeserializer;
private final org.apache.kafka.common.serialization.Deserializer<T> userSuppliedDeserializer;
private final TaskExtractor<T> userSuppliedTaskExtractor;

private final List<DecatonProcessorSupplier<T>> suppliers;

ProcessorsBuilder(String topic, Deserializer<T> userSuppliedDeserializer, TaskExtractor<T> userSuppliedTaskExtractor) {
ProcessorsBuilder(String topic, org.apache.kafka.common.serialization.Deserializer<T> userSuppliedDeserializer, TaskExtractor<T> userSuppliedTaskExtractor) {
this.topic = topic;
this.userSuppliedDeserializer = userSuppliedDeserializer;
this.userSuppliedTaskExtractor = userSuppliedTaskExtractor;
@@ -66,12 +68,29 @@ public class ProcessorsBuilder<T> {
* @return an instance of {@link ProcessorsBuilder}.
*/
public static <T> ProcessorsBuilder<T> consuming(String topic, Deserializer<T> deserializer) {
return new ProcessorsBuilder<>(topic, (t, bytes) -> deserializer.deserialize(bytes), null);
}

/**
* Create a new {@link ProcessorsBuilder} that consumes messages from a topic, expecting tasks of a type
* that can be parsed by the given deserializer.
* <p>
* If you want to extract custom {@link TaskMetadata} (e.g. for delayed processing), you can use
* {@link #consuming(String, TaskExtractor)} instead.
* @param topic the name of topic to consume.
* @param deserializer the deserializer to instantiate task of type {@link T} from serialized bytes.
* @param <T> the type of instantiated tasks.
* @return an instance of {@link ProcessorsBuilder}.
*/
public static <T> ProcessorsBuilder<T> consuming(String topic, org.apache.kafka.common.serialization.Deserializer<T> deserializer) {
return new ProcessorsBuilder<>(topic, deserializer, null);
}

/**
* Create a new {@link ProcessorsBuilder} that consumes messages from a topic, expecting tasks of a type
* that can be parsed by the given taskExtractor.
* When this overload is used, it is the user's responsibility to extract {@link TaskMetadata} from each record,
* even for ones produced by {@link DecatonClient}.
* @param topic the name of topic to consume.
* @param taskExtractor the extractor to extract task of type {@link T} from message bytes.
* @param <T> the type of instantiated tasks.
@@ -141,14 +160,15 @@ Processors<T> build(DecatonProcessorSupplier<byte[]> retryProcessorSupplier, Pro
return new Processors<>(suppliers, retryProcessorSupplier, taskExtractor, retryTaskExtractor);
}

@Slf4j
private static class RetryTaskExtractor<T> implements TaskExtractor<T> {
private final DefaultTaskExtractor<byte[]> outerExtractor;
private final TaskExtractor<T> innerExtractor;

RetryTaskExtractor(Property<Boolean> legacyFallbackEnabledProperty,
TaskExtractor<T> innerExtractor) {
this.innerExtractor = innerExtractor;
this.outerExtractor = new DefaultTaskExtractor<>(bytes -> bytes, legacyFallbackEnabledProperty);
outerExtractor = new DefaultTaskExtractor<>((topic, bytes) -> bytes, legacyFallbackEnabledProperty);
}

@Override
@@ -160,6 +180,7 @@ public DecatonTask<T> extract(ConsumedRecord record) {
DecatonTask<byte[]> outerTask = outerExtractor.extract(record);
ConsumedRecord inner = ConsumedRecord
.builder()
.topic(record.topic())
.recordTimestampMillis(record.recordTimestampMillis())
.headers(record.headers())
.key(record.key())
@@ -172,5 +193,15 @@ public DecatonTask<T> extract(ConsumedRecord record) {
extracted.taskData(),
extracted.taskDataBytes());
}

@Override
public void close() {
try {
innerExtractor.close();
} catch (Exception e) {
log.error("Failed to close innerExtractor", e);
}
outerExtractor.close();
}
}
}
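
For illustration, a minimal sketch of how the new overload is meant to be used: any stock kafka-clients Deserializer can now be passed to ProcessorsBuilder directly, with no adapter to Decaton's own Deserializer needed. The topic name and processor body below are hypothetical.

import org.apache.kafka.common.serialization.StringDeserializer;

import com.linecorp.decaton.processor.runtime.ProcessorsBuilder;

public class KafkaDeserializerUsageSketch {
    public static ProcessorsBuilder<String> processors() {
        // "my-topic" is a hypothetical topic name; StringDeserializer ships with kafka-clients.
        return ProcessorsBuilder
                .consuming("my-topic", new StringDeserializer())
                .thenProcess((context, task) -> System.out.println("processing: " + task));
    }
}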
@@ -20,7 +20,7 @@
* An interface for classes extracting {@link DecatonTask} from a given record.
* @param <T> type of task.
*/
public interface TaskExtractor<T> {
public interface TaskExtractor<T> extends AutoCloseable {
/**
* Extract an object of type {@link DecatonTask} from the given record.
* @param record {@link ConsumedRecord} to extract task from.
@@ -29,4 +29,13 @@ public interface TaskExtractor<T> {
* If the method throws an exception, the task will be discarded and the processor continues to process subsequent tasks.
*/
DecatonTask<T> extract(ConsumedRecord record);

/**
* Clean up any resources associated with this extractor.
* This method is invoked when {@link ProcessorSubscription} is being closed.
*/
@Override
default void close() {
// no-op
}
}
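
As a hypothetical illustration of the new lifecycle hook: an extractor that owns a kafka-clients Deserializer can release it when the subscription terminates, mirroring what DefaultTaskExtractor does in this change. All names below are illustrative.

import org.apache.kafka.common.serialization.Deserializer;

import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.TaskExtractor;

// Hypothetical user-defined extractor owning a closeable deserializer.
public class ClosingTaskExtractor<T> implements TaskExtractor<T> {
    private final Deserializer<T> deserializer;

    public ClosingTaskExtractor(Deserializer<T> deserializer) {
        this.deserializer = deserializer;
    }

    @Override
    public DecatonTask<T> extract(ConsumedRecord record) {
        T task = deserializer.deserialize(record.topic(), record.headers(), record.value());
        return new DecatonTask<>(
                TaskMetadata.builder()
                            .timestampMillis(record.recordTimestampMillis())
                            .build(),
                task,
                record.value());
    }

    @Override
    public void close() {
        // Invoked once when the owning ProcessorSubscription shuts down.
        deserializer.close();
    }
}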
@@ -16,9 +16,10 @@

package com.linecorp.decaton.processor.runtime.internal;

import org.apache.kafka.common.serialization.Deserializer;

import com.google.protobuf.InvalidProtocolBufferException;

import com.linecorp.decaton.common.Deserializer;
import com.linecorp.decaton.client.internal.TaskMetadataUtil;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
@@ -42,7 +43,7 @@ public DecatonTask<T> extract(ConsumedRecord record) {
byte[] taskDataBytes = record.value();
return new DecatonTask<>(
TaskMetadata.fromProto(headerMeta),
taskDeserializer.deserialize(taskDataBytes),
taskDeserializer.deserialize(record.topic(), record.headers(), taskDataBytes),
taskDataBytes);
} else {
// There are two cases where task metadata header is missing:
@@ -59,13 +60,13 @@

return new DecatonTask<>(
metadata,
taskDeserializer.deserialize(taskDataBytes),
taskDeserializer.deserialize(record.topic(), record.headers(), taskDataBytes),
taskDataBytes);
} catch (InvalidProtocolBufferException e) {
throw new IllegalArgumentException(e);
}
} else {
T task = taskDeserializer.deserialize(record.value());
T task = taskDeserializer.deserialize(record.topic(), record.headers(), record.value());
return new DecatonTask<>(
TaskMetadata.builder()
.timestampMillis(record.recordTimestampMillis())
@@ -75,4 +76,9 @@
}
}
}

@Override
public void close() {
taskDeserializer.close();
}
}
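
Since the topic and headers now reach the deserializer, a Deserializer can branch on header content. A minimal sketch under that assumption follows; the header name is hypothetical.

import java.nio.charset.StandardCharsets;
import java.util.Base64;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

// Hypothetical deserializer that decodes the value differently based on a record header.
public class EncodingAwareDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        // "content-encoding" is a hypothetical header set by the producer.
        Header encoding = headers.lastHeader("content-encoding");
        if (encoding != null
                && "base64".equals(new String(encoding.value(), StandardCharsets.UTF_8))) {
            return new String(Base64.getDecoder().decode(data), StandardCharsets.UTF_8);
        }
        return deserialize(topic, data);
    }
}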
@@ -127,6 +127,7 @@ DecatonTask<T> extract(TaskRequest request) {
final DecatonTask<T> extracted;
extracted = taskExtractor.extract(
ConsumedRecord.builder()
.topic(request.topicPartition().topic())
.recordTimestampMillis(request.recordTimestamp())
.headers(request.headers())
.key(request.key())
@@ -29,7 +29,10 @@
import com.linecorp.decaton.processor.runtime.DecatonProcessorSupplier;
import com.linecorp.decaton.processor.runtime.TaskExtractor;

public class Processors<T> {
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Processors<T> implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(Processors.class);

private final List<DecatonProcessorSupplier<T>> suppliers;
@@ -125,4 +128,20 @@ public void destroyThreadScope(String subscriptionId, TopicPartition tp, int thr
retryProcessorSupplier.leaveThreadScope(subscriptionId, tp, threadId);
}
}

@Override
public void close() {
closeExtractor(taskExtractor);
closeExtractor(retryTaskExtractor);
}

private void closeExtractor(TaskExtractor<T> extractor) {
try {
if (extractor != null) {
extractor.close();
}
} catch (Exception e) {
log.error("Exception thrown while closing extractor", e);
}
}
}
@@ -19,15 +19,20 @@
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.UncheckedIOException;

import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.jupiter.api.Test;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Parser;

import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.processor.runtime.ConsumedRecord;
import com.linecorp.decaton.processor.runtime.DecatonTask;
import com.linecorp.decaton.processor.runtime.ProcessorProperties;
import com.linecorp.decaton.processor.runtime.Property;
import com.linecorp.decaton.protobuf.ProtocolBuffersDeserializer;
import com.linecorp.decaton.protocol.internal.DecatonInternal.DecatonTaskRequest;
import com.linecorp.decaton.protocol.Decaton.TaskMetadataProto;
import com.linecorp.decaton.protocol.Sample.HelloTask;
@@ -43,11 +48,12 @@ public class DefaultTaskExtractorTest {
@Test
public void testExtract() {
DefaultTaskExtractor<HelloTask> extractor = new DefaultTaskExtractor<>(
new ProtocolBuffersDeserializer<>(HelloTask.parser()),
deserializer(HelloTask.parser()),
Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, true));

ConsumedRecord record = ConsumedRecord
.builder()
.topic("topic")
.recordTimestampMillis(1561709151628L)
.headers(new RecordHeaders())
.value(LEGACY_REQUEST.toByteArray())
@@ -64,11 +70,12 @@
@Test
public void testExtractBypassLegacyFormatWhenHeaderMissing() {
DefaultTaskExtractor<HelloTask> extractor = new DefaultTaskExtractor<>(
new ProtocolBuffersDeserializer<>(HelloTask.parser()),
deserializer(HelloTask.parser()),
Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED, false));

ConsumedRecord record = ConsumedRecord
.builder()
.topic("topic")
.recordTimestampMillis(1561709151628L)
.headers(new RecordHeaders())
.value(TASK.toByteArray())
@@ -84,4 +91,14 @@ public void testExtractBypassLegacyFormatWhenHeaderMissing() {

assertArrayEquals(TASK.toByteArray(), extracted.taskDataBytes());
}

private static <T> Deserializer<T> deserializer(Parser<T> parser) {
return (topic, data) -> {
try {
return parser.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
};
}
}
@@ -70,7 +70,7 @@ public void testCleanupPartiallyInitializedProcessors() {

Processors<HelloTask> processors = new Processors<>(
suppliers, null,
new DefaultTaskExtractor<>(bytes -> HelloTask.getDefaultInstance(),
new DefaultTaskExtractor<>((topic, bytes) -> HelloTask.getDefaultInstance(),
Property.ofStatic(ProcessorProperties.CONFIG_LEGACY_PARSE_FALLBACK_ENABLED)),
null);
