Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ protected final void runMayThrow() throws Exception
if (!fullyExpiredSSTables.isEmpty())
{
logger.debug("Compaction {} dropping expired sstables: {}", transaction.opIdString(), fullyExpiredSSTables);
fullyExpiredSSTables.forEach(transaction::obsolete);
actuallyCompact.removeAll(fullyExpiredSSTables);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ public void checkpoint()
}
private Throwable checkpoint(Throwable accumulate)
{
logger.trace("Checkpointing staged {}", staged);
logger.info("Checkpointing staged {}", staged);

if (staged.isEmpty())
return accumulate;
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
Expand Down Expand Up @@ -133,8 +136,11 @@ public void forEachWriter(Consumer<SSTableWriter> op)
op.accept(writer);
}

private static final Logger logger = LoggerFactory.getLogger(SSTableRewriter.class);

public AbstractRowIndexEntry append(UnfilteredRowIterator partition)
{
logger.info("APPENDING");
// we do this before appending to ensure we can resetAndTruncate() safely if appending fails
DecoratedKey key = partition.partitionKey();
maybeReopenEarly(key);
Expand All @@ -158,26 +164,34 @@ public AbstractRowIndexEntry tryAppend(UnfilteredRowIterator partition)

/**
 * If enough new data has been written since we last opened the output early
 * (more than {@code preemptiveOpenInterval} bytes past {@code currentlyOpenedEarlyAt}),
 * make the partially-written output visible / release no-longer-needed pages.
 *
 * Offline transactions have no live readers to hand the early-open reader to, so we only
 * drop the page cache for the originals up to {@code key}. Online, we open the writer
 * early and, in its callback, publish the reader to the transaction, record the new
 * open point, advance the originals' start bounds, and checkpoint the transaction.
 *
 * @param key the partition key about to be appended; used as the upper bound for
 *            page-cache eviction and implicitly as the early-open boundary
 */
private void maybeReopenEarly(DecoratedKey key)
{
    if (writer.getFilePointer() - currentlyOpenedEarlyAt > preemptiveOpenInterval)
    {
        if (transaction.isOffline())
        {
            // No readers to swap in; just free cached pages we will not read again
            for (SSTableReader reader : transaction.originals())
                reader.trySkipFileCacheBefore(key);
        }
        else
        {
            writer.setMaxDataAge(maxAge);
            writer.openEarly(reader -> {
                transaction.update(reader, false);
                currentlyOpenedEarlyAt = writer.getFilePointer();
                moveStarts(reader.getLast());
                transaction.checkpoint();
            });
        }
    }
}

protected Throwable doAbort(Throwable accumulate)
Expand Down Expand Up @@ -265,6 +279,7 @@ public void switchWriter(SSTableWriter newWriter)
}
writer = newWriter;

logger.info("RETURNING FROM switchWriter PREMATURELY");
return;
}

Expand All @@ -277,7 +292,9 @@ public void switchWriter(SSTableWriter newWriter)
SSTableReader reader = writer.openFinalEarly();
transaction.update(reader, false);
moveStarts(reader.getLast());
logger.info("BEFORE CHECKPOINTING IN switchWriter");
transaction.checkpoint();
logger.info("AFTER CHECKPOINTING IN switchWriter");
}

currentlyOpenedEarlyAt = 0;
Expand Down Expand Up @@ -340,7 +357,9 @@ protected void doPrepare()
transaction.update(reader, false);
preparedForCommit.add(reader);
}
logger.info("BEFORE CHECKPOINTING IN doPrepare");
transaction.checkpoint();
logger.info("AFTER CHECKPOINTING IN doPrepare");

if (throwLate)
throw new RuntimeException("exception thrown after all sstables finished, for testing");
Expand Down
203 changes: 202 additions & 1 deletion test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,45 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.db.lifecycle.WrappedLifecycleTransaction;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.io.sstable.SSTableRewriter;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.utils.concurrent.Transactional;

import static java.lang.String.format;
Expand All @@ -57,20 +71,28 @@ public class CompactionTaskTest
private static TableMetadata cfm;
private static ColumnFamilyStore cfs;

private static TableMetadata gcGraceCfm;
private static ColumnFamilyStore gcGraceCfs;

@BeforeClass
public static void setUpClass() throws Exception
{
    SchemaLoader.prepareServer();
    cfm = CreateTableStatement.parse("CREATE TABLE tbl (k INT PRIMARY KEY, v INT)", "ks").build();
    gcGraceCfm = CreateTableStatement.parse("CREATE TABLE tbl2 (k INT PRIMARY KEY, col1 INT, col2 INT, col3 INT, data TEXT) WITH gc_grace_seconds = 0", "ks").build();
    // Create the keyspace exactly once, with both tables; the previous extra
    // createKeyspace("ks", ..., cfm) call attempted to create the same keyspace twice.
    SchemaLoader.createKeyspace("ks", KeyspaceParams.simple(1), cfm, gcGraceCfm);
    cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id);
    gcGraceCfs = Schema.instance.getColumnFamilyStoreInstance(gcGraceCfm.id);
}

@Before
public void setUp() throws Exception
{
    // Reset both tables to a clean state before each test: re-enable compaction
    // (a test may have disabled it) and drop any leftover data/sstables.
    for (ColumnFamilyStore store : new ColumnFamilyStore[]{ cfs, gcGraceCfs })
    {
        store.getCompactionStrategyManager().enable();
        store.truncateBlocking();
    }
}

@Test
Expand Down Expand Up @@ -142,6 +164,185 @@ public void compactionInterruption() throws Exception
Assert.assertEquals(Transactional.AbstractTransactional.State.ABORTED, txn.state());
}

/**
 * Tests that even when some SSTables are fully expired, we can still select and reference them
 * while they are part of a compaction.
 *
 * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19776">CASSANDRA-19776</a>
 */
@Test
public void testFullyExpiredSSTablesAreNotReleasedPrematurely()
{
    Assert.assertEquals(0, gcGraceCfs.getLiveSSTables().size());
    gcGraceCfs.getCompactionStrategyManager().disable();

    // Use large SSTables (10+ MiB) so that switching to new output SSTables happens during
    // compaction. Without large enough SSTables, the output fits in one SSTable and no switch happens.
    int numKeys = 5000;
    String data = "x".repeat(2048); // ~2KB padding per row

    // Similar technique to get fully expired SSTables as in TTLExpiryTest#testAggressiveFullyExpired
    // SSTable 1 (will be fully expired - superseded by SSTable 2)
    for (int k = 0; k < numKeys; k++)
    {
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 1 AND TTL 1", k, data);
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) USING TIMESTAMP 3 AND TTL 1", k, data);
    }
    Util.flush(gcGraceCfs);

    // SSTable 2 (will be fully expired - superseded by SSTables 3 and 4)
    for (int k = 0; k < numKeys; k++)
    {
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 2 AND TTL 1", k, data);
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) USING TIMESTAMP 5 AND TTL 1", k, data);
    }
    Util.flush(gcGraceCfs);

    Set<SSTableReader> toBeObsolete = new HashSet<>(gcGraceCfs.getLiveSSTables());
    Assert.assertEquals(2, toBeObsolete.size());

    // SSTable 3 (not fully expired)
    for (int k = 0; k < numKeys; k++)
    {
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col1, data) VALUES (?, 1, ?) USING TIMESTAMP 4 AND TTL 1", k, data);
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, data) VALUES (?, 1, ?) USING TIMESTAMP 7 AND TTL 1", k, data);
    }
    Util.flush(gcGraceCfs);

    // SSTable 4 (not fully expired - col3 has longer TTL)
    for (int k = 0; k < numKeys; k++)
    {
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col3, data) VALUES (?, 1, ?) USING TIMESTAMP 6 AND TTL 3", k, data);
        QueryProcessor.executeInternal("INSERT INTO ks.tbl2 (k, col2, data) VALUES (?, 1, ?) USING TIMESTAMP 8 AND TTL 1", k, data);
    }
    Util.flush(gcGraceCfs);

    Set<SSTableReader> sstables = gcGraceCfs.getLiveSSTables();
    Assert.assertEquals(4, sstables.size());

    // Enable preemptive opening so that SSTableRewriter.switchWriter() calls checkpoint().
    int originalPreemptiveOpenInterval = DatabaseDescriptor.getSSTablePreemptiveOpenIntervalInMiB();
    DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(2);

    // collector of stacktraces to later check if checkpoint was called in switchWriter
    final List<StackTraceElement[]> stacks = new ArrayList<>();

    try
    {
        AtomicInteger checkpointCount = new AtomicInteger(0);

        // Hook into transaction's checkpoint and commit methods to verify that after
        // checkpointing, all SSTables (including fully expired ones) remain referenceable.
        ILifecycleTransaction txn = new WrappedLifecycleTransaction(gcGraceCfs.getTracker().tryModify(sstables, OperationType.COMPACTION))
        {
            @Override
            public void checkpoint()
            {
                stacks.add(Thread.currentThread().getStackTrace());

                // The fully expired originals must already be marked obsolete by the compaction
                for (SSTableReader r : toBeObsolete)
                    Assert.assertTrue(this.isObsolete(r));

                assertAllSSTablesAreReferenceable();

                super.checkpoint();

                // This is the critical assertion: after checkpoint(), all SSTables in the
                // CANONICAL view must still be referenceable. Before the fix, fully expired
                // SSTables would lose their references here, causing selectAndReference() to
                // spin loop (CASSANDRA-19776).
                assertAllSSTablesAreReferenceable();

                checkpointCount.incrementAndGet();
            }

            @Override
            public Throwable commit(Throwable accumulate)
            {
                assertAllSSTablesAreReferenceable();
                return super.commit(accumulate);
            }

            private void assertAllSSTablesAreReferenceable()
            {
                // This simulates what EstimatedPartitionCount metric and similar code paths do.
                // It is crucial that tryRef does not return null; a null result means some SSTables
                // are not referenceable, which would cause selectAndReference() to spin loop.
                ColumnFamilyStore.ViewFragment view = gcGraceCfs.select(View.selectFunction(SSTableSet.CANONICAL));
                Refs<SSTableReader> refs = Refs.tryRef(view.sstables);
                Assert.assertNotNull("Some SSTables in CANONICAL view are not referenceable (CASSANDRA-19776)", refs);
                refs.close();
            }
        };

        // Use MaxSSTableSizeWriter with a small max size (2 MiB) to force output sstable switches during compaction.
        long maxSSTableSize = 2L * 1024 * 1024; // 2 MiB
        CompactionTask task = new CompactionTask(gcGraceCfs, txn, FBUtilities.nowInSeconds() + 2)
        {
            @Override
            public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
                                                                  Directories directories,
                                                                  ILifecycleTransaction transaction,
                                                                  Set<SSTableReader> nonExpiredSSTables)
            {
                return new MaxSSTableSizeWriter(cfs, directories, transaction, nonExpiredSSTables,
                                                maxSSTableSize, 0);
            }
        };

        try (CompactionController compactionController = task.getCompactionController(task.inputSSTables()))
        {
            Set<SSTableReader> fullyExpiredSSTables = compactionController.getFullyExpiredSSTables();
            Assert.assertEquals(2, fullyExpiredSSTables.size());
            task.execute(null);
        }

        // Verify that checkpoint was called more than once, proving that output sstable switching
        // happened during compaction. Without MaxSSTableSizeWriter and large enough SSTables,
        // checkpoint would only be called at the end, which would not exercise the CASSANDRA-19776 fix.
        Assert.assertTrue("Expected checkpoint() to be called more than once during compaction, but was called "
                          + checkpointCount.get() + " time(s). Output sstable switching did not occur.",
                          checkpointCount.get() > 1);
    }
    finally
    {
        DatabaseDescriptor.setSSTablePreemptiveOpenIntervalInMiB(originalPreemptiveOpenInterval);
    }

    Assert.assertFalse(stacks.isEmpty());

    // At least one recorded checkpoint() call must have originated from SSTableRewriter.switchWriter.
    boolean checkpointCalledInSwitchWriter = false;
    outer:
    for (StackTraceElement[] stack : stacks)
    {
        for (StackTraceElement element : stack)
        {
            if (element.getClassName().equals(SSTableRewriter.class.getName())
                && element.getMethodName().equals("switchWriter"))
            {
                checkpointCalledInSwitchWriter = true;
                break outer;
            }
        }
    }

    Assert.assertTrue(checkpointCalledInSwitchWriter);
}

private static void mutateRepaired(SSTableReader sstable, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException
{
sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
Expand Down