From 1256a1e4ab079ade1589085eafb30ce5093c25f0 Mon Sep 17 00:00:00 2001
From: Viktor Somogyi-Vass
Date: Fri, 13 Feb 2026 14:32:25 +0100
Subject: [PATCH] feat(metadata:inkless): Log metrics for Inkless

---
 .../scala/kafka/server/ReplicaManager.scala        |   5 +
 .../aiven/inkless/metrics/InklessLogMetrics.java   | 162 ++++++++++++++++++
 .../inkless/metrics/InklessLogMetricsTest.java     | 152 ++++++++++++++++
 3 files changed, 319 insertions(+)
 create mode 100644 storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java
 create mode 100644 storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2a39547fbc..6c42c7991b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -22,6 +22,7 @@ import io.aiven.inkless.consume.{FetchHandler, FetchOffsetHandler}
 import io.aiven.inkless.control_plane.{BatchInfo, FindBatchRequest, FindBatchResponse, MetadataView}
 import io.aiven.inkless.delete.{DeleteRecordsInterceptor, FileCleaner, RetentionEnforcer}
 import io.aiven.inkless.merge.FileMerger
+import io.aiven.inkless.metrics.InklessLogMetrics
 import io.aiven.inkless.produce.AppendHandler
 import kafka.cluster.Partition
 import kafka.log.LogManager
@@ -274,6 +275,7 @@ class ReplicaManager(val config: KafkaConfig,
   private val inklessFileCleaner: Option[FileCleaner] = inklessSharedState.map(new FileCleaner(_))
   // FIXME: FileMerger is having issues with hanging queries. Disabling until fixed.
private val inklessFileMerger: Option[FileMerger] = None // inklessSharedState.map(new FileMerger(_)) + private val inklessLogMetrics: Option[InklessLogMetrics] = inklessSharedState.map(new InklessLogMetrics(_)) /* epoch of the controller that last changed the leader */ protected val localBrokerId = config.brokerId @@ -366,6 +368,8 @@ class ReplicaManager(val config: KafkaConfig, // There are internal delays in case of errors or absence of work items, no need for extra delays here. scheduler.schedule("inkless-file-merger", () => inklessFileMerger.foreach(_.run()), sharedState.config().fileMergerInterval().toMillis, sharedState.config().fileMergerInterval().toMillis) + + scheduler.schedule("inkless-log-metrics", () => inklessLogMetrics.foreach(_.run()), config.logInitialTaskDelayMs, 30000L) } } @@ -2484,6 +2488,7 @@ class ReplicaManager(val config: KafkaConfig, inklessFetchOffsetHandler.foreach(_.close()) inklessRetentionEnforcer.foreach(_.close()) inklessFileCleaner.foreach(_.close()) + inklessLogMetrics.foreach(_.close()) inklessSharedState.foreach(_.close()) info("Shut down completely") } diff --git a/storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java b/storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java new file mode 100644 index 0000000000..63fd2c4dfc --- /dev/null +++ b/storage/inkless/src/main/java/io/aiven/inkless/metrics/InklessLogMetrics.java @@ -0,0 +1,162 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the + * GNU Affero General Public License for the details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package io.aiven.inkless.metrics; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.log.LogMetricNames; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import io.aiven.inkless.common.SharedState; +import io.aiven.inkless.control_plane.ControlPlane; +import io.aiven.inkless.control_plane.GetLogInfoRequest; +import io.aiven.inkless.control_plane.GetLogInfoResponse; +import io.aiven.inkless.control_plane.MetadataView; + +/** + * Exposes JMX metrics for diskless (Inkless) partitions that mirror the classic + * {@code kafka.log:type=Log} metrics (LogStartOffset, LogEndOffset, Size). Values are + * sourced from the Control Plane and tagged with topic and partition so existing + * tooling and dashboards work unchanged. + */ +public final class InklessLogMetrics implements Runnable, Closeable { + + private static final String METRICS_GROUP_PKG = "kafka.log"; + private static final String METRICS_GROUP_TYPE = "Log"; + + private final ControlPlane controlPlane; + private final MetadataView metadataView; + private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(METRICS_GROUP_PKG, METRICS_GROUP_TYPE); + + /** Cache of log info per partition, updated by {@link #run()}. */ + private final ConcurrentHashMap cache = new ConcurrentHashMap<>(); + + /** Partitions that currently have gauges registered, for diffing on refresh. 
*/ + private final Set registeredPartitions = ConcurrentHashMap.newKeySet(); + + public InklessLogMetrics(final SharedState sharedState) { + this(sharedState.controlPlane(), sharedState.metadata()); + } + + /** + * Constructor for tests; avoids building a full SharedState. + */ + InklessLogMetrics(final ControlPlane controlPlane, final MetadataView metadataView) { + this.controlPlane = controlPlane; + this.metadataView = metadataView; + run(); + } + + @Override + public void run() { + final Set currentSet = metadataView.getDisklessTopicPartitions(); + if (currentSet.isEmpty()) { + removeGaugesForPartitions(registeredPartitions); + registeredPartitions.clear(); + cache.clear(); + return; + } + + final List currentOrdered = new ArrayList<>(currentSet); + final List requests = new ArrayList<>(); + for (final TopicIdPartition tp : currentOrdered) { + requests.add(new GetLogInfoRequest(tp.topicId(), tp.partition())); + } + final List responses = controlPlane.getLogInfo(requests); + + final Map newCache = new ConcurrentHashMap<>(); + for (int i = 0; i < currentOrdered.size(); i++) { + final TopicIdPartition tp = currentOrdered.get(i); + final GetLogInfoResponse r = responses.get(i); + final long logStartOffset = r.errors() != Errors.NONE + ? GetLogInfoResponse.INVALID_OFFSET + : r.logStartOffset(); + final long highWatermark = r.errors() != Errors.NONE + ? GetLogInfoResponse.INVALID_OFFSET + : r.highWatermark(); + final long byteSize = r.errors() != Errors.NONE + ? 
GetLogInfoResponse.INVALID_BYTE_SIZE + : r.byteSize(); + newCache.put(tp, new LogInfoSnapshot(logStartOffset, highWatermark, byteSize)); + } + cache.clear(); + cache.putAll(newCache); + + for (final TopicIdPartition tp : currentSet) { + if (!registeredPartitions.contains(tp)) { + registerGaugesForPartition(tp); + registeredPartitions.add(tp); + } + } + final Set toRemove = new java.util.HashSet<>(registeredPartitions); + toRemove.removeAll(currentSet); + removeGaugesForPartitions(toRemove); + registeredPartitions.retainAll(currentSet); + } + + private static Map tagsFor(final TopicIdPartition tp) { + final Map tags = new LinkedHashMap<>(); + final String topic = tp.topic(); + tags.put("topic", topic != null ? topic : "unknown"); + tags.put("partition", String.valueOf(tp.partition())); + return tags; + } + + private void registerGaugesForPartition(final TopicIdPartition tp) { + final Map tags = tagsFor(tp); + metricsGroup.newGauge(LogMetricNames.LOG_START_OFFSET, () -> { + final LogInfoSnapshot s = cache.get(tp); + return s != null ? s.logStartOffset : GetLogInfoResponse.INVALID_OFFSET; + }, tags); + metricsGroup.newGauge(LogMetricNames.LOG_END_OFFSET, () -> { + final LogInfoSnapshot s = cache.get(tp); + return s != null ? s.highWatermark : GetLogInfoResponse.INVALID_OFFSET; + }, tags); + metricsGroup.newGauge(LogMetricNames.SIZE, () -> { + final LogInfoSnapshot s = cache.get(tp); + return s != null ? 
s.byteSize : GetLogInfoResponse.INVALID_BYTE_SIZE; + }, tags); + } + + private void removeGaugesForPartitions(final Set partitions) { + for (final TopicIdPartition tp : partitions) { + final Map tags = tagsFor(tp); + metricsGroup.removeMetric(LogMetricNames.LOG_START_OFFSET, tags); + metricsGroup.removeMetric(LogMetricNames.LOG_END_OFFSET, tags); + metricsGroup.removeMetric(LogMetricNames.SIZE, tags); + } + } + + @Override + public void close() throws IOException { + removeGaugesForPartitions(Set.copyOf(registeredPartitions)); + registeredPartitions.clear(); + cache.clear(); + } + + private record LogInfoSnapshot(long logStartOffset, long highWatermark, long byteSize) {} +} diff --git a/storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java b/storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java new file mode 100644 index 0000000000..e97786abf1 --- /dev/null +++ b/storage/inkless/src/test/java/io/aiven/inkless/metrics/InklessLogMetricsTest.java @@ -0,0 +1,152 @@ +/* + * Inkless + * Copyright (C) 2024 - 2025 Aiven OY + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for the details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */
+package io.aiven.inkless.metrics;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.storage.internals.log.LogMetricNames;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.aiven.inkless.control_plane.ControlPlane;
+import io.aiven.inkless.control_plane.GetLogInfoResponse;
+import io.aiven.inkless.control_plane.MetadataView;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+class InklessLogMetricsTest {
+
+    private static final String TOPIC = "diskless-metrics-test-topic";
+    private static final Uuid TOPIC_ID = new Uuid(100L, 200L);
+    private static final TopicIdPartition T0P0 = new TopicIdPartition(TOPIC_ID, 0, TOPIC);
+    private static final TopicIdPartition T0P1 = new TopicIdPartition(TOPIC_ID, 1, TOPIC);
+
+    @Mock
+    private ControlPlane controlPlane;
+    @Mock
+    private MetadataView metadataView;
+
+    private InklessLogMetrics metrics;
+
+    @BeforeEach
+    void setUp() {
+        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of());
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (metrics != null) {
+            try {
+                metrics.close();
+            } catch (Exception ignored) {
+                // cleanup
+            }
+        }
+        removeDisklessLogMetricsFromRegistry();
+    }
+
+    private static void removeDisklessLogMetricsFromRegistry() {
+        List<MetricName> toRemove = KafkaYammerMetrics.defaultRegistry().allMetrics().keySet().stream()
+            .filter(n -> "kafka.log".equals(n.getGroup()) && "Log".equals(n.getType())
+                && (LogMetricNames.LOG_START_OFFSET.equals(n.getName())
+                    || LogMetricNames.LOG_END_OFFSET.equals(n.getName())
+                    || LogMetricNames.SIZE.equals(n.getName())))
+            .filter(n -> n.getMBeanName().contains("topic=" + TOPIC))
+            .collect(Collectors.toList());
+        toRemove.forEach(KafkaYammerMetrics.defaultRegistry()::removeMetric);
+    }
+
+    private static long gaugeValue(String metricName, String topic, int partition) {
+        String suffix = ",name=" + metricName + ",topic=" + topic + ",partition=" + partition;
+        return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+            .filter(e -> e.getKey().getMBeanName().endsWith(suffix))
+            .findFirst()
+            .map(e -> ((Number) ((Gauge<?>) e.getValue()).value()).longValue())
+            .orElse(-999L);
+    }
+
+    @Test
+    void registersGaugesAndReturnsValuesFromGetLogInfo() {
+        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0, T0P1));
+        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(
+            GetLogInfoResponse.success(0L, 10L, 100L),
+            GetLogInfoResponse.success(1L, 11L, 200L)
+        ));
+
+        metrics = new InklessLogMetrics(controlPlane, metadataView);
+
+        assertThat(gaugeValue(LogMetricNames.LOG_START_OFFSET, TOPIC, 0)).isEqualTo(0L);
+        assertThat(gaugeValue(LogMetricNames.LOG_END_OFFSET, TOPIC, 0)).isEqualTo(10L);
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(100L);
+        assertThat(gaugeValue(LogMetricNames.LOG_START_OFFSET, TOPIC, 1)).isEqualTo(1L);
+        assertThat(gaugeValue(LogMetricNames.LOG_END_OFFSET, TOPIC, 1)).isEqualTo(11L);
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(200L);
+    }
+
+    @Test
+    void removesGaugesWhenPartitionDisappears() {
+        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0, T0P1));
+        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(
+            GetLogInfoResponse.success(0L, 10L, 100L),
+            GetLogInfoResponse.success(1L, 11L, 200L)
+        ));
+
+        metrics = new InklessLogMetrics(controlPlane, metadataView);
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(200L);
+
+        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0));
+        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(GetLogInfoResponse.success(0L, 10L, 100L)));
+        metrics.run();
+
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(100L);
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 1)).isEqualTo(-999L);
+    }
+
+    @Test
+    void closeRemovesAllGauges() throws Exception {
+        when(metadataView.getDisklessTopicPartitions()).thenReturn(Set.of(T0P0));
+        when(controlPlane.getLogInfo(anyList())).thenReturn(List.of(GetLogInfoResponse.success(0L, 5L, 50L)));
+
+        metrics = new InklessLogMetrics(controlPlane, metadataView);
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(50L);
+
+        metrics.close();
+        metrics = null;
+
+        assertThat(gaugeValue(LogMetricNames.SIZE, TOPIC, 0)).isEqualTo(-999L);
+    }
+}