diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2a39547fbc..6c42c7991b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,6 +22,7 @@ import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView}
import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
import io.aiven.inkless.merge.FileMerger
+import io.aiven.inkless.metrics.InklessLogMetrics
import io.aiven.inkless.produce.AppendHandler
import kafka.cluster.Partition
import kafka.log.LogManager
@@ -274,6 +275,7 @@ class ReplicaManager(val config: KafkaConfig,
private val inklessFileCleaner: Option[FileCleaner] = inklessSharedState.map(new FileCleaner(_))
// FIXME: FileMerger is having issues with hanging queries. Disabling until fixed.
private val inklessFileMerger: Option[FileMerger] = None // inklessSharedState.map(new FileMerger(_))
+ private val inklessLogMetrics: Option[InklessLogMetrics] = inklessSharedState.map(new InklessLogMetrics(_))
/* epoch of the controller that last changed the leader */
protected val localBrokerId = config.brokerId
@@ -366,6 +368,8 @@ class ReplicaManager(val config: KafkaConfig,
// There are internal delays in case of errors or absence of work items, no need for extra delays here.
scheduler.schedule("inkless-file-merger", () => inklessFileMerger.foreach(_.run()), sharedState.config().fileMergerInterval().toMillis, sharedState.config().fileMergerInterval().toMillis)
+
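+ // Refresh the diskless log metrics every 30 seconds, starting after the standard initial log-task delay.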
+ scheduler.schedule("inkless-log-metrics", () => inklessLogMetrics.foreach(_.run()), config.logInitialTaskDelayMs, 30000L)
}
}
@@ -2484,6 +2488,7 @@ class ReplicaManager(val config: KafkaConfig,
inklessFetchOffsetHandler.foreach(_.close())
inklessRetentionEnforcer.foreach(_.close())
inklessFileCleaner.foreach(_.close())
+ inklessLogMetrics.foreach(_.close())
inklessSharedState.foreach(_.close())
info("Shut down completely")
}
diff --git a/storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java
new file mode 100644
index 0000000000..63fd2c4dfc
--- /dev/null
+++ b/storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java
@@ -0,0 +1,168 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.metrics;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.log.LogMetricNames;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.aiven.inkless.common.SharedState;
+import io.aiven.inkless.control_plane.ControlPlane;
+import io.aiven.inkless.control_plane.GetLogInfoRequest;
+import io.aiven.inkless.control_plane.GetLogInfoResponse;
+import io.aiven.inkless.control_plane.MetadataView;
+
+/**
+ * Exposes JMX metrics for diskless (Inkless) partitions that mirror the classic
+ * {@code kafka.log:type=Log} metrics (LogStartOffset, LogEndOffset, Size). Values are
+ * sourced from the Control Plane and tagged with topic and partition so existing
+ * tooling and dashboards work unchanged.
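+ * <p>The resulting MBeans follow the standard Kafka naming pattern, e.g.
+ * {@code kafka.log:type=Log,name=LogEndOffset,topic=my-topic,partition=0}.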
+ */
+public final class InklessLogMetrics implements Runnable, Closeable {
+
+ private static final String METRICS_GROUP_PKG = "kafka.log";
+ private static final String METRICS_GROUP_TYPE = "Log";
+
+ private final ControlPlane controlPlane;
+ private final MetadataView metadataView;
+ private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(METRICS_GROUP_PKG, METRICS_GROUP_TYPE);
+
+ /** Cache of log info per partition, updated by {@link #run()}. */
+    private final ConcurrentHashMap<TopicIdPartition, LogInfoSnapshot> cache = new ConcurrentHashMap<>();
+
+ /** Partitions that currently have gauges registered, for diffing on refresh. */
+    private final Set<TopicIdPartition> registeredPartitions = ConcurrentHashMap.newKeySet();
+
+ public InklessLogMetrics(final SharedState sharedState) {
+ this(sharedState.controlPlane(), sharedState.metadata());
+ }
+
+ /**
+ * Constructor for tests; avoids building a full SharedState.
+ */
+ InklessLogMetrics(final ControlPlane controlPlane, final MetadataView metadataView) {
+ this.controlPlane = controlPlane;
+ this.metadataView = metadataView;
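+        // Prime the cache and register gauges immediately so values are visible before the first scheduled refresh.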
+ run();
+ }
+
+ @Override
+ public void run() {
+        final Set<TopicIdPartition> currentSet = metadataView.getDisklessTopicPartitions();
+ if (currentSet.isEmpty()) {
+ removeGaugesForPartitions(registeredPartitions);
+ registeredPartitions.clear();
+ cache.clear();
+ return;
+ }
+
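+        // getLogInfo responses are matched to requests by position, so fix an iteration order before building the batch.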
+        final List<TopicIdPartition> currentOrdered = new ArrayList<>(currentSet);
+        final List<GetLogInfoRequest> requests = new ArrayList<>();
+ for (final TopicIdPartition tp : currentOrdered) {
+ requests.add(new GetLogInfoRequest(tp.topicId(), tp.partition()));
+ }
+        final List<GetLogInfoResponse> responses = controlPlane.getLogInfo(requests);
+
+        final Map<TopicIdPartition, LogInfoSnapshot> newCache = new ConcurrentHashMap<>();
+ for (int i = 0; i < currentOrdered.size(); i++) {
+ final TopicIdPartition tp = currentOrdered.get(i);
+ final GetLogInfoResponse r = responses.get(i);
+ final long logStartOffset = r.errors() != Errors.NONE
+ ? GetLogInfoResponse.INVALID_OFFSET
+ : r.logStartOffset();
+ final long highWatermark = r.errors() != Errors.NONE
+ ? GetLogInfoResponse.INVALID_OFFSET
+ : r.highWatermark();
+ final long byteSize = r.errors() != Errors.NONE
+ ? GetLogInfoResponse.INVALID_BYTE_SIZE
+ : r.byteSize();
+ newCache.put(tp, new LogInfoSnapshot(logStartOffset, highWatermark, byteSize));
+ }
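+        // Swap in the fresh snapshot; a gauge read between clear() and putAll() falls back to its INVALID_* sentinel.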
+ cache.clear();
+ cache.putAll(newCache);
+
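+        // Register gauges for newly seen partitions, then drop gauges for partitions that have disappeared.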
+ for (final TopicIdPartition tp : currentSet) {
+ if (!registeredPartitions.contains(tp)) {
+ registerGaugesForPartition(tp);
+ registeredPartitions.add(tp);
+ }
+ }
+        final Set<TopicIdPartition> toRemove = new java.util.HashSet<>(registeredPartitions);
+ toRemove.removeAll(currentSet);
+ removeGaugesForPartitions(toRemove);
+ registeredPartitions.retainAll(currentSet);
+ }
+
+    private static Map<String, String> tagsFor(final TopicIdPartition tp) {
+        final Map<String, String> tags = new LinkedHashMap<>();
+ final String topic = tp.topic();
+ tags.put("topic", topic != null ? topic : "unknown");
+ tags.put("partition", String.valueOf(tp.partition()));
+ return tags;
+ }
+
+ private void registerGaugesForPartition(final TopicIdPartition tp) {
+        final Map<String, String> tags = tagsFor(tp);
+ metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, () -> {
+ final LogInfoSnapshot s = cache.get(tp);
+ return s != null ? s.logStartOffset : GetLogInfoResponse.INVALID_OFFSET;
+ }, tags);
+ metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () -> {
+ final LogInfoSnapshot s = cache.get(tp);
+ return s != null ? s.highWatermark : GetLogInfoResponse.INVALID_OFFSET;
+ }, tags);
+ metricsGroup.newGauge(LogMetricNames.SIZE, () -> {
+ final LogInfoSnapshot s = cache.get(tp);
+ return s != null ? s.byteSize : GetLogInfoResponse.INVALID_BYTE_SIZE;
+ }, tags);
+ }
+
+    private void removeGaugesForPartitions(final Set<TopicIdPartition> partitions) {
+ for (final TopicIdPartition tp : partitions) {
+            final Map<String, String> tags = tagsFor(tp);
+ metricsGroup.removeMetric(LogMetricNames.LOG_START_OFFSET, tags);
+ metricsGroup.removeMetric(LogMetricNames.LOG_END_OFFSET, tags);
+ metricsGroup.removeMetric(LogMetricNames.SIZE, tags);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ removeGaugesForPartitions(Set.copyOf(registeredPartitions));
+ registeredPartitions.clear();
+ cache.clear();
+ }
+
+ private record LogInfoSnapshot(long logStartOffset, long highWatermark, long byteSize) {}
+}
diff --git a/storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java b/storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java
new file mode 100644
index 0000000000..e97786abf1
--- /dev/null
+++ b/storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java
@@ -0,0 +1,155 @@
+/*
+ * Inkless
+ * Copyright (C) 2024 - 2025 Aiven OY
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+package io.aiven.inkless.metrics;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.storage.internals.log.LogMetricNames;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.aiven.inkless.control_plane.ControlPlane;
+import io.aiven.inkless.control_plane.GetLogInfoResponse;
+import io.aiven.inkless.control_plane.MetadataView;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+class InklessLogMetricsTest {
+
+ private static final String TOPIC = "diskless-metrics-test-topic";
+ private static final Uuid TOPIC_ID = new Uuid(100L, 200L);
+ private static final TopicIdPartition T0P0 = new TopicIdPartition(TOPIC_ID, 0, TOPIC);
+ private static final TopicIdPartition T0P1 = new TopicIdPartition(TOPIC_ID, 1, TOPIC);
+
+ @Mock
+ private ControlPlane controlPlane;
+ @Mock
+ private MetadataView metadataView;
+
+ private InklessLogMetrics metrics;
+
+    @BeforeEach
+    void setUp() {
+        // Lenient: every test overrides this stub, and strict stubs would otherwise flag the unused default.
+        lenient().when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of());
+    }
+
+ @AfterEach
+ void tearDown() {
+ if (metrics != null) {
+ try {
+ metrics.close();
+ } catch (Exception ignored) {
+ // cleanup
+ }
+ }
+ removeDisklessLogMetricsFromRegistry();
+ }
+
+ private static void removeDisklessLogMetricsFromRegistry() {
+        List<MetricName> toRemove = KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+ .filter(n -> "kafka.log".equals(n.getGroup()) && "Log".equals(n.getType())
+ && (LogMetricNames.LOG_START_OFFSET.equals(n.getName())
+ || LogMetricNames.LOG_END_OFFSET.equals(n.getName())
+ || LogMetricNames.SIZE.equals(n.getName())))
+ .filter(n -> n.getMBeanName().contains("topic=" + TOPIC))
+ .collect(Collectors.toList());
+ toRemove.forEach(KafkaYammerMetrics.defaultRegistry()::removeMetric);
+ }
+
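+    // Reads a live gauge from the global Yammer registry by MBean-name suffix; -999 means no such gauge is registered.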
+ private static long gaugeValue(String metricName, String topic, int partition) {
+ String suffix = ",name=" + metricName + ",topic=" + topic + ",partition=" + partition;
+ return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+ .filter(e -> e.getKey().getMBeanName().endsWith(suffix))
+ .findFirst()
+                .map(e -> ((Number) ((Gauge<?>) e.getValue()).value()).longValue())
+ .orElse(-999L);
+ }
+
+ @Test
+ void registersGaugesAndReturnsValuesFromGetLogInfo() {
+ when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0, T0P1));
+ when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(
+ GetLogInfoResponse.success(0L, 10L, 100L),
+ GetLogInfoResponse.success(1L, 11L, 200L)
+ ));
+
+ metrics = new InklessLogMetrics(controlPlane, metadataView);
+
+ assertThat(gaugeValue(LogMetricNames.LOG_START_OFFSET, TOPIC, 0)).isEqualTo(0L);
+ assertThat(gaugeValue(LogMetricNames.LOG_END_OFFSET, TOPIC, 0)).isEqualTo(10L);
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(100L);
+ assertThat(gaugeValue(LogMetricNames.LOG_START_OFFSET, TOPIC, 1)).isEqualTo(1L);
+ assertThat(gaugeValue(LogMetricNames.LOG_END_OFFSET, TOPIC, 1)).isEqualTo(11L);
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(200L);
+ }
+
+ @Test
+ void removesGaugesWhenPartitionDisappears() {
+ when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0, T0P1));
+ when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(
+ GetLogInfoResponse.success(0L, 10L, 100L),
+ GetLogInfoResponse.success(1L, 11L, 200L)
+ ));
+
+ metrics = new InklessLogMetrics(controlPlane, metadataView);
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(200L);
+
+ when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0));
+ when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(GetLogInfoResponse.success(0L, 10L, 100L)));
+ metrics.run();
+
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(100L);
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(-999L);
+ }
+
+ @Test
+ void closeRemovesAllGauges() throws Exception {
+ when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0));
+ when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(GetLogInfoResponse.success(0L, 5L, 50L)));
+
+ metrics = new InklessLogMetrics(controlPlane, metadataView);
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(50L);
+
+ metrics.close();
+ metrics = null;
+
+ assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(-999L);
+ }
+}