Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView}
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.merge.FileMerger
import io.aiven.inkless.metrics.InklessLogMetrics
import io.aiven.inkless.produce.AppendHandler
import kafka.cluster.Partition
import kafka.log.LogManager
Expand Down Expand Up @@ -274,6 +275,7 @@ class ReplicaManager(val config: KafkaConfig,
private val inklessFileCleaner: Option[FileCleaner] = inklessSharedState.map(new FileCleaner(_))
// FIXME: FileMerger is having issues with hanging queries. Disabling until fixed.
private val inklessFileMerger: Option[FileMerger] = None // inklessSharedState.map(new FileMerger(_))
private val inklessLogMetrics: Option[InklessLogMetrics] = inklessSharedState.map(new InklessLogMetrics(_))

/* epoch of the controller that last changed the leader */
protected val localBrokerId = config.brokerId
Expand Down Expand Up @@ -366,6 +368,8 @@ class ReplicaManager(val config: KafkaConfig,

// There are internal delays in case of errors or absence of work items, no need for extra delays here.
scheduler.schedule("inkless-file-merger", () => inklessFileMerger.foreach(_.run()), sharedState.config().fileMergerInterval().toMillis, sharedState.config().fileMergerInterval().toMillis)

scheduler.schedule("inkless-log-metrics", () => inklessLogMetrics.foreach(_.run()), config.logInitialTaskDelayMs, 30000L)
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InklessLogMetrics is refreshed via a hard-coded 30000L interval. Since other Inkless background tasks take their intervals from config (fileCleanerInterval, fileMergerInterval), consider making this refresh interval configurable (or at least define it as a named constant/Duration) to avoid magic numbers and to allow tuning in production if the Control Plane load or desired freshness changes.

Copilot uses AI. Check for mistakes.
}
}

Expand Down Expand Up @@ -2484,6 +2488,7 @@ class ReplicaManager(val config: KafkaConfig,
inklessFetchOffsetHandler.foreach(_.close())
inklessRetentionEnforcer.foreach(_.close())
inklessFileCleaner.foreach(_.close())
inklessLogMetrics.foreach(_.close())
inklessSharedState.foreach(_.close())
info("Shut down completely")
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for the details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.metrics;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.log.LogMetricNames;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.aiven.inkless.common.SharedState;
import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.GetLogInfoRequest;
import io.aiven.inkless.control_plane.GetLogInfoResponse;
import io.aiven.inkless.control_plane.MetadataView;

/**
* Exposes JMX metrics for diskless (Inkless) partitions that mirror the classic
* {@code kafka.log:type=Log} metrics (LogStartOffset, LogEndOffset, Size). Values are
* sourced from the Control Plane and tagged with topic and partition so existing
* tooling and dashboards work unchanged.
*/
public final class InklessLogMetrics implements Runnable, Closeable {

private static final String METRICS_GROUP_PKG = "kafka.log";
private static final String METRICS_GROUP_TYPE = "Log";

private final ControlPlane controlPlane;
private final MetadataView metadataView;
private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(METRICS_GROUP_PKG, METRICS_GROUP_TYPE);

/** Cache of log info per partition, updated by {@link #run()}. */
private final ConcurrentHashMap<TopicIdPartition, LogInfoSnapshot> cache = new ConcurrentHashMap<>();

/** Partitions that currently have gauges registered, for diffing on refresh. */
private final Set<TopicIdPartition> registeredPartitions = ConcurrentHashMap.newKeySet();

public InklessLogMetrics(final SharedState sharedState) {
this(sharedState.controlPlane(), sharedState.metadata());
}

/**
* Constructor for tests; avoids building a full SharedState.
*/
InklessLogMetrics(final ControlPlane controlPlane, final MetadataView metadataView) {
this.controlPlane = controlPlane;
this.metadataView = metadataView;
run();
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test-only constructor calls run() immediately. In production this means new InklessLogMetrics(sharedState) will synchronously hit the Control Plane during ReplicaManager initialization, potentially increasing broker startup latency or failing startup if the Control Plane is temporarily unavailable. Consider making construction side-effect free (register gauges + schedule the first refresh) and let the scheduler trigger the initial fetch, or provide an explicit start()/init() step called from the scheduled task.

Suggested change
run();

Copilot uses AI. Check for mistakes.
}

@Override
public void run() {
final Set<TopicIdPartition> currentSet = metadataView.getDisklessTopicPartitions();
if (currentSet.isEmpty()) {
removeGaugesForPartitions(registeredPartitions);
registeredPartitions.clear();
cache.clear();
return;
}

final List<TopicIdPartition> currentOrdered = new ArrayList<>(currentSet);
final List<GetLogInfoRequest> requests = new ArrayList<>();
for (final TopicIdPartition tp : currentOrdered) {
requests.add(new GetLogInfoRequest(tp.topicId(), tp.partition()));
}
final List<GetLogInfoResponse> responses = controlPlane.getLogInfo(requests);

final Map<TopicIdPartition, LogInfoSnapshot> newCache = new ConcurrentHashMap<>();
for (int i = 0; i < currentOrdered.size(); i++) {
final TopicIdPartition tp = currentOrdered.get(i);
final GetLogInfoResponse r = responses.get(i);
final long logStartOffset = r.errors() != Errors.NONE
? GetLogInfoResponse.INVALID_OFFSET
: r.logStartOffset();
final long highWatermark = r.errors() != Errors.NONE
? GetLogInfoResponse.INVALID_OFFSET
: r.highWatermark();
final long byteSize = r.errors() != Errors.NONE
? GetLogInfoResponse.INVALID_BYTE_SIZE
: r.byteSize();
newCache.put(tp, new LogInfoSnapshot(logStartOffset, highWatermark, byteSize));
}
Comment on lines +84 to +105
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getLogInfo responses are consumed positionally (responses.get(i)), but ControlPlane#getLogInfo does not document (or enforce) an in-order response contract the way findBatches does. The Postgres implementation builds a VALUES table without an explicit ordering, which can legally return rows in a different order, leading to metrics being attributed to the wrong partition. Consider changing the API to return keyed results (include topicId/partition in the response or return a Map<TopicIdPartition, ...>), or add an explicit request index and guarantee ordering end-to-end; at minimum, validate responses.size() before indexing.

Copilot uses AI. Check for mistakes.
cache.clear();
cache.putAll(newCache);

for (final TopicIdPartition tp : currentSet) {
if (!registeredPartitions.contains(tp)) {
registerGaugesForPartition(tp);
registeredPartitions.add(tp);
}
}
final Set<TopicIdPartition> toRemove = new java.util.HashSet<>(registeredPartitions);
toRemove.removeAll(currentSet);
removeGaugesForPartitions(toRemove);
registeredPartitions.retainAll(currentSet);
}

private static Map<String, String> tagsFor(final TopicIdPartition tp) {
final Map<String, String> tags = new LinkedHashMap<>();
final String topic = tp.topic();
tags.put("topic", topic != null ? topic : "unknown");
tags.put("partition", String.valueOf(tp.partition()));
return tags;
}

private void registerGaugesForPartition(final TopicIdPartition tp) {
final Map<String, String> tags = tagsFor(tp);
metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, () -> {
final LogInfoSnapshot s = cache.get(tp);
return s != null ? s.logStartOffset : GetLogInfoResponse.INVALID_OFFSET;
}, tags);
metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () -> {
final LogInfoSnapshot s = cache.get(tp);
return s != null ? s.highWatermark : GetLogInfoResponse.INVALID_OFFSET;
}, tags);
Comment on lines +135 to +138
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogMetricNames.LOG_END_OFFSET in Kafka’s UnifiedLog is logEndOffset (LEO), but this gauge is currently wired to highWatermark. If Inkless only has a high watermark available, exposing it under the LogEndOffset name will make existing dashboards/alerts misleading. Either source/report the actual log end offset from the Control Plane, or expose high watermark under a distinct metric name/tag so semantics stay consistent with classic kafka.log:type=Log metrics.

Copilot uses AI. Check for mistakes.
metricsGroup.newGauge(LogMetricNames.SIZE, () -> {
final LogInfoSnapshot s = cache.get(tp);
return s != null ? s.byteSize : GetLogInfoResponse.INVALID_BYTE_SIZE;
}, tags);
}

private void removeGaugesForPartitions(final Set<TopicIdPartition> partitions) {
for (final TopicIdPartition tp : partitions) {
final Map<String, String> tags = tagsFor(tp);
metricsGroup.removeMetric(LogMetricNames.LOG_START_OFFSET, tags);
metricsGroup.removeMetric(LogMetricNames.LOG_END_OFFSET, tags);
metricsGroup.removeMetric(LogMetricNames.SIZE, tags);
}
}

@Override
public void close() throws IOException {
removeGaugesForPartitions(Set.copyOf(registeredPartitions));
registeredPartitions.clear();
cache.clear();
}

private record LogInfoSnapshot(long logStartOffset, long highWatermark, long byteSize) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Inkless
* Copyright (C) 2024 - 2025 Aiven OY
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for the details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package io.aiven.inkless.metrics;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.storage.internals.log.LogMetricNames;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import io.aiven.inkless.control_plane.ControlPlane;
import io.aiven.inkless.control_plane.GetLogInfoResponse;
import io.aiven.inkless.control_plane.MetadataView;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
class InklessLogMetricsTest {

    private static final String TOPIC = "diskless-metrics-test-topic";
    private static final Uuid TOPIC_ID = new Uuid(100L, 200L);
    private static final TopicIdPartition T0P0 = new TopicIdPartition(TOPIC_ID, 0, TOPIC);
    private static final TopicIdPartition T0P1 = new TopicIdPartition(TOPIC_ID, 1, TOPIC);

    /** Sentinel returned by {@link #gaugeValue} when no matching gauge is registered. */
    private static final long NO_GAUGE = -999L;

    @Mock
    private ControlPlane controlPlane;
    @Mock
    private MetadataView metadataView;

    private InklessLogMetrics metrics;

    // No @BeforeEach stubbing: under STRICT_STUBS a setUp stub that every test overrides
    // before any invocation is reported as an unnecessary stubbing and fails the test run.
    // Each test stubs exactly what it needs instead.

    @AfterEach
    void tearDown() {
        if (metrics != null) {
            try {
                metrics.close();
            } catch (Exception ignored) {
                // cleanup
            }
        }
        removeDisklessLogMetricsFromRegistry();
    }

    /** Removes leftover kafka.log:type=Log gauges for this test's topic from the shared registry. */
    private static void removeDisklessLogMetricsFromRegistry() {
        List<MetricName> toRemove = KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
            .filter(n -> "kafka.log".equals(n.getGroup()) && "Log".equals(n.getType())
                && (LogMetricNames.LOG_START_OFFSET.equals(n.getName())
                    || LogMetricNames.LOG_END_OFFSET.equals(n.getName())
                    || LogMetricNames.SIZE.equals(n.getName())))
            .filter(n -> n.getMBeanName().contains("topic=" + TOPIC))
            .collect(Collectors.toList());
        toRemove.forEach(KafkaYammerMetrics.defaultRegistry()::removeMetric);
    }

    /** Reads a registered gauge's long value, or {@link #NO_GAUGE} if the gauge is absent. */
    private static long gaugeValue(String metricName, String topic, int partition) {
        String suffix = ",name=" + metricName + ",topic=" + topic + ",partition=" + partition;
        return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
            .filter(e -> e.getKey().getMBeanName().endsWith(suffix))
            .findFirst()
            .map(e -> ((Number) ((Gauge<?>) e.getValue()).value()).longValue())
            .orElse(NO_GAUGE);
    }

    @Test
    void registersGaugesAndReturnsValuesFromGetLogInfo() {
        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0, T0P1));
        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(
            GetLogInfoResponse.success(0L, 10L, 100L),
            GetLogInfoResponse.success(1L, 11L, 200L)
        ));

        metrics = new InklessLogMetrics(controlPlane, metadataView);

        assertThat(gaugeValue(LogMetricNames.LOG_START_OFFSET, TOPIC, 0)).isEqualTo(0L);
        assertThat(gaugeValue(LogMetricNames.LOG_END_OFFSET, TOPIC, 0)).isEqualTo(10L);
        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(100L);
        assertThat(gaugeValue(LogMetricNames.LOG_START_OFFSET, TOPIC, 1)).isEqualTo(1L);
        assertThat(gaugeValue(LogMetricNames.LOG_END_OFFSET, TOPIC, 1)).isEqualTo(11L);
        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(200L);
    }

    @Test
    void removesGaugesWhenPartitionDisappears() {
        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0, T0P1));
        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(
            GetLogInfoResponse.success(0L, 10L, 100L),
            GetLogInfoResponse.success(1L, 11L, 200L)
        ));

        metrics = new InklessLogMetrics(controlPlane, metadataView);
        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(200L);

        // Shrink the partition set and refresh; the gauge for partition 1 must disappear.
        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0));
        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(GetLogInfoResponse.success(0L, 10L, 100L)));
        metrics.run();

        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(100L);
        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(NO_GAUGE);
    }

    @Test
    void closeRemovesAllGauges() throws Exception {
        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0));
        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(GetLogInfoResponse.success(0L, 5L, 50L)));

        metrics = new InklessLogMetrics(controlPlane, metadataView);
        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(50L);

        metrics.close();
        metrics = null;

        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(NO_GAUGE);
    }
}