From 390097fd4e6ef7c4cdd15664ca44a97cae47e480 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Wed, 28 May 2025 13:40:00 +0200 Subject: [PATCH] obsolete expired SSTables before compaction starts --- .../db/compaction/CompactionTask.java | 1 + .../db/lifecycle/LifecycleTransaction.java | 2 +- .../cassandra/io/sstable/SSTableRewriter.java | 19 ++ .../db/compaction/CompactionTaskTest.java | 203 +++++++++++++++++- 4 files changed, 223 insertions(+), 2 deletions(-) diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 15f317b5656f..7336c4543a8a 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -188,6 +188,7 @@ protected final void runMayThrow() throws Exception if (!fullyExpiredSSTables.isEmpty()) { logger.debug("Compaction {} dropping expired sstables: {}", transaction.opIdString(), fullyExpiredSSTables); + fullyExpiredSSTables.forEach(transaction::obsolete); actuallyCompact.removeAll(fullyExpiredSSTables); } diff --git a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java index b6accd98783a..582621c3c672 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LifecycleTransaction.java @@ -375,7 +375,7 @@ public void checkpoint() } private Throwable checkpoint(Throwable accumulate) { - logger.trace("Checkpointing staged {}", staged); + logger.info("Checkpointing staged {}", staged); if (staged.isEmpty()) return accumulate; diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java index 35b23101902d..ae597e4888e9 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java +++ 
b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java @@ -23,6 +23,9 @@ import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DecoratedKey; @@ -133,8 +136,11 @@ public void forEachWriter(Consumer op) op.accept(writer); } + private static final Logger logger = LoggerFactory.getLogger(SSTableRewriter.class); + public AbstractRowIndexEntry append(UnfilteredRowIterator partition) { + logger.info("APPENDING"); // we do this before appending to ensure we can resetAndTruncate() safely if appending fails DecoratedKey key = partition.partitionKey(); maybeReopenEarly(key); @@ -158,26 +164,34 @@ public AbstractRowIndexEntry tryAppend(UnfilteredRowIterator partition) private void maybeReopenEarly(DecoratedKey key) { + logger.info("IN maybeReopenEarly - BEFORE IF"); if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval) { + logger.info("IN IF AFTER writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval"); if (transaction.isOffline()) { + logger.info("IN IF - TRANSACTION OFFLINE"); for (SSTableReader reader : transaction.originals()) { + logger.info("IN IF - TRANSACTION OFFLINE - IN FOR {}", reader.descriptor.baseFile()); reader.trySkipFileCacheBefore(key); } } else { writer.setMaxDataAge(maxAge); + logger.info("BEFORE OPENING EARLY"); writer.openEarly(reader -> { transaction.update(reader, false); currentlyOpenedEarlyAt = writer.getFilePointer(); moveStarts(reader.getLast()); + logger.info("IN IF - TRANSACTION ONLINE - CALLING CHECKPOINT"); transaction.checkpoint(); }); + logger.info("AFTER OPENING EARLY"); } } + logger.info("IN maybeReopenEarly - AFTER IF"); } protected Throwable doAbort(Throwable accumulate) @@ -265,6 +279,7 @@ public void switchWriter(SSTableWriter newWriter) } writer = newWriter; + logger.info("RETURNING FROM 
switchWriter PREMATURELY"); return; } @@ -277,7 +292,9 @@ public void switchWriter(SSTableWriter newWriter) SSTableReader reader = writer.openFinalEarly(); transaction.update(reader, false); moveStarts(reader.getLast()); + logger.info("BEFORE CHECKPOINTING IN switchWriter"); transaction.checkpoint(); + logger.info("AFTER CHECKPOINTING IN switchWriter"); } currentlyOpenedEarlyAt = 0; @@ -340,7 +357,9 @@ protected void doPrepare() transaction.update(reader, false); preparedForCommit.add(reader); } + logger.info("BEFORE CHECKPOINTING IN doPrepare"); transaction.checkpoint(); + logger.info("AFTER CHECKPOINTING IN doPrepare"); if (throwLate) throw new RuntimeException("exception thrown after all sstables finished, for testing"); diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java index a69bb5ff3455..4830c334b1ed 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java @@ -21,24 +21,37 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; import org.apache.cassandra.db.SystemKeyspace; +import 
org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter; +import org.apache.cassandra.db.lifecycle.ILifecycleTransaction; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.db.lifecycle.View; +import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction; import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.io.sstable.SSTableRewriter; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; @@ -46,6 +59,7 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.TimeUUID; +import org.apache.cassandra.utils.concurrent.Refs; import org.apache.cassandra.utils.concurrent.Transactional; import static java.lang.String.format; @@ -57,13 +71,18 @@ public class CompactionTaskTest private static TableMetadata cfm; private static ColumnFamilyStore cfs; + private static TableMetadata gcGraceCfm; + private static ColumnFamilyStore gcGraceCfs; + @BeforeClass public static void setUpClass() throws Exception { SchemaLoader.prepareServer(); cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "ks").build(); - SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), cfm); + gcGraceCfm = CreateTableStatement.parse("CREATE TABLE tbl2 (k INT PRIMARY KEY, col1 INT, col2 INT, col3 INT, data TEXT) WITH gc_grace_seconds = 0", "ks").build(); + SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), cfm, gcGraceCfm); cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + gcGraceCfs = Schema.instance.getColumnFamilyStoreInstance(gcGraceCfm.id); } @Before @@ -71,6 +90,9 @@ public void setUp() throws Exception { cfs.getCompactionStrategyManager().enable(); 
cfs.truncateBlocking(); + + gcGraceCfs.getCompactionStrategyManager().enable(); + gcGraceCfs.truncateBlocking(); } @Test @@ -142,6 +164,185 @@ public void compactionInterruption() throws Exception Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state()); } + /** + * Test that even when some SSTables are fully expired, we can still select and reference them + * while they are part of compaction. + * + * @see toBeObsolete = new HashSet<>(gcGraceCfs.getLiveSSTables()); Assert.assertEquals(2, toBeObsolete.size()); + + // SSTable 3 (not fully expired) + for (int k = 0; k < numKeys; k++) + { + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 4 AND TTL 1", k, data); + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, data) VALUES (?, 1, ?) USING TIMESTAMP 7 AND TTL 1", k, data); + } + Util.flush(gcGraceCfs); + + // SSTable 4 (not fully expired - col3 has longer TTL) + for (int k = 0; k < numKeys; k++) + { + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, data) VALUES (?, 1, ?) USING TIMESTAMP 6 AND TTL 3", k, data); + QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) USING TIMESTAMP 8 AND TTL 1", k, data); + } + Util.flush(gcGraceCfs); + + Set sstables = gcGraceCfs.getLiveSSTables(); + Assert.assertEquals(4, sstables.size()); + + // Enable preemptive opening so that SSTableRewriter.switchWriter() calls checkpoint(). 
+ int originalPreemptiveOpenInterval = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB(); + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(2); + + // collector of stacktraces to later check if checkpoint was called in switchWriter + final List stacks = new ArrayList<>(); + + try + { + Logger logger = LoggerFactory.getLogger(CompactionTaskTest.class); + AtomicInteger checkpointCount = new AtomicInteger(0); + + // Hook into transaction's checkpoint and commit methods to verify that after + // checkpointing, all SSTables (including fully expired ones) remain referenceable. + ILifecycleTransaction txn = new WrappedLifecycleTransaction(gcGraceCfs.getTracker().tryModify(sstables, OperationType.COMPACTION)) + { + @Override + public void obsoleteOriginals() + { + logger.info("OBSOLETE ORIGINALS"); + super.obsoleteOriginals(); + } + + @Override + public void checkpoint() + { + stacks.add(Thread.currentThread().getStackTrace()); + + logger.info("IN CHECKPOINT #{}", checkpointCount.get() + 1); + + for (SSTableReader r : toBeObsolete) + Assert.assertTrue(this.isObsolete(r)); + + assertAllSSTablesAreReferenceable(); + + super.checkpoint(); + + // This is the critical assertion: after checkpoint(), all SSTables in the + // CANONICAL view must still be referenceable. Before the fix, fully expired + // SSTables would lose their references here, causing selectAndReference() to + // spin loop (CASSANDRA-19776). + assertAllSSTablesAreReferenceable(); + + checkpointCount.incrementAndGet(); + } + + @Override + public Throwable commit(Throwable accumulate) + { + logger.info("IN COMMIT"); + + assertAllSSTablesAreReferenceable(); + Throwable commit = super.commit(accumulate); + + return commit; + } + + private void assertAllSSTablesAreReferenceable() + { + // This simulates what EstimatedPartitionCount metric and similar code paths do. 
+ // It is crucial that tryRef does not return null; a null result means some SSTables + // are not referenceable, which would cause selectAndReference() to spin loop. + ColumnFamilyStore.ViewFragment view = gcGraceCfs.select(View.selectFunction(SSTableSet.CANONICAL)); + Refs refs = Refs.tryRef(view.sstables); + Assert.assertNotNull("Some SSTables in CANONICAL view are not referenceable (CASSANDRA-19776)", refs); + refs.close(); + } + }; + + // Use MaxSSTableSizeWriter with a small max size (2 MiB) to force output sstable switches during compaction. + long maxSSTableSize = 2L * 1024 * 1024; // 2 MiB + CompactionTask task = new CompactionTask(gcGraceCfs, txn, FBUtilities.nowInSeconds() + 2) + { + @Override + public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, + Directories directories, + ILifecycleTransaction transaction, + Set nonExpiredSSTables) + { + return new MaxSSTableSizeWriter(cfs, directories, transaction, nonExpiredSSTables, + maxSSTableSize, 0); + } + }; + + try (CompactionController compactionController = task.getCompactionController(task.inputSSTables())) + { + Set fullyExpiredSSTables = compactionController.getFullyExpiredSSTables(); + Assert.assertEquals(2, fullyExpiredSSTables.size()); + task.execute(null); + } + + // Verify that checkpoint was called more than once, proving that output sstable switching + // happened during compaction. Without MaxSSTableSizeWriter and large enough SSTables, + // checkpoint would only be called at the end, which would not exercise the CASSANDRA-19776 fix. + Assert.assertTrue("Expected checkpoint() to be called more than once during compaction, but was called " + + checkpointCount.get() + " time(s). 
Output sstable switching did not occur.", + checkpointCount.get() > 1); + } + finally + { + DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(originalPreemptiveOpenInterval); + } + + Assert.assertFalse(stacks.isEmpty()); + + boolean checkpointCalledInSwitchWriter = false; + + for (int i = 0; i < stacks.size(); i++) + { + for (StackTraceElement element : stacks.get(i)) + { + if (element.getClassName().equals(SSTableRewriter.class.getName()) + && element.getMethodName().equals("switchWriter")) + { + checkpointCalledInSwitchWriter = true; + break; + } + } + } + + Assert.assertTrue(checkpointCalledInSwitchWriter); + } + private static void mutateRepaired(SSTableReader sstable, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException { sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);