Conversation

@nitsanw nitsanw commented Sep 29, 2025

patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD

  • EPPv1 - all new code
  • Cursor compaction integration
  • JMH benchmarks for compaction and cursor impls
  • EPPv1 - New tests
  • Existing tests tweaks for new code
  • [revert?] change the default partitioner to expand testing of new code
  • [revert?] test data used by some benchmarks
  • [revert?] jmh tweak GC settings for stability
  • [revert?] javadoc typos, marking unused params
  • [revert?] clarifying comment
  • [revert?] toString improvement
  • [revert?] remove spurious keywords
  • [revert?] marking metadata collection
  • [revert?] cursor verifier
  • Exclude SAI and counter column
  • Exclude BTI and legacy versions
  • Temporarily skip very long running test

Thanks for sending a pull request! Here are some tips if you're new here:

  • Ensure you have added or run the appropriate tests for your PR.
  • Be sure to keep the PR description updated to reflect all changes.
  • Write your PR title to summarize what this PR proposes.
  • If possible, provide a concise example to reproduce the issue for a faster review.
  • Read our contributor guidelines
  • If you're making a documentation change, see our guide to documentation contribution

Commit messages should use the following format:

<One sentence description, usually Jira title or CHANGES.txt summary>

<Optional lengthier description (context on patch)>

patch by <Authors>; reviewed by <Reviewers> for CASSANDRA-#####

Co-authored-by: Name1 <email1>
Co-authored-by: Name2 <email2>

The Cassandra Jira

Nitsan Wakart and others added 2 commits August 28, 2025 14:26
@nitsanw nitsanw changed the title from "Add cursor based optimized compaction path (WIP)" to "CASSANDRA-20918 Add cursor-based low allocation optimized compaction implementation" on Sep 29, 2025
@blambov blambov self-requested a review September 30, 2025 07:40
{
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
public static final int MEGABYTE = 1024 * 1024 * 1024;
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true);
Contributor

Could you move this to Config.java? One advantage of having it there is that the AST tests (like SingleNodeTableWalkTest) that generate random config would be able to exercise it. That's our best path to coverage with lots of different schemas and configurations.

If you can locally do a longer run of that test with cursor-compaction enabled, that would be useful too. That would be done via overriding StatefulASTBase#clusterConfig with the new config set.
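
For illustration, a minimal sketch of what moving the flag into Config.java could look like; the field name, default, and the DatabaseDescriptor accessor below are assumptions, not code from this patch:

    // in Config.java: a plain field, so randomized-config tests can flip it
    public volatile boolean cursor_compaction_enabled = true;

    // in DatabaseDescriptor.java: the usual accessor pattern over the loaded config
    public static boolean getCursorCompactionEnabled()
    {
        return conf.cursor_compaction_enabled;
    }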

Contributor

Also related to testing, we need to be running all tests both with this feature enabled and with it disabled.

Let's make sure that among test, test-oa and test-latest we have at least one that is running with cursor compaction and one without.

collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result));
return StatsAccumulation.unpackCellCount(result);
collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result)); // matched
return StatsAccumulation.unpackCellCount(result); // matched
Contributor

Intend to keep?

return (int)skipBytes((long)n);
}

public long skipBytes(long n) throws IOException
Contributor

Long overflow on current + n below? Would have existed before as well, I suppose. And probably exceeds max buffer size anyway.
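
For what it's worth, a method-level sketch of an overflow-safe variant; the current and length fields are assumptions standing in for the reader's position and buffer end:

    public long skipBytes(long n) throws IOException
    {
        if (n <= 0)
            return 0;
        long remaining = length - current;      // bytes left before the end of the buffer
        long skipped = Math.min(n, remaining);  // clamping first means current + skipped cannot overflow
        current += skipped;
        return skipped;
    }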

@blambov blambov left a comment

It must be stated that this approach, which bundles all the steps of the processing in one single file, will be quite difficult to maintain and keep in sync with the combination of iterators and transformations that we use in other parts of the code such as the query path. However, once we have reached a point of stability for a piece of functionality where we do not expect it to change significantly for a long time, it does make sense to unpack the code and present it in a way that makes its execution as direct as possible, and this patch is a good such representation of the compaction process.

Personally, I am very unhappy about switching to mutable, pooled and reused objects, which are significantly more unwieldy and error prone, especially in contexts where concurrent access can occur. It seems this is becoming a necessity if we need to achieve acceptable performance with the current state of our heap usage, but we still need to very carefully separate the mutable versions of concepts from the immutable ones used throughout the code base. Suddenly making a DeletionTime mutable is not an acceptable change.

First batch of targeted comments below, mainly going over CompactionCursor.java.

{
private static final int MEGABYTE = 1024 * 1024;
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true);
Contributor

This property should be in CassandraRelevantProperties.
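
A rough sketch of what that move might look like; the entry name is an assumption and the exact enum constructor in CassandraRelevantProperties may differ:

    // in CassandraRelevantProperties: one entry per system property, with its default value
    CURSOR_COMPACTION_ENABLED("cassandra.enable_cursor_compaction", "true"),

    // at the use site, replacing the SystemProperties lookup
    public static final boolean CURSOR_COMPACTION_ENABLED =
        CassandraRelevantProperties.CURSOR_COMPACTION_ENABLED.getBoolean();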

else if (e instanceof CompactionInterruptedException)
throw (CompactionInterruptedException) e;
else
throw new IllegalStateException(e);
Contributor

What is this conversion addressing?

Contributor Author

Defensively checking for incorrect exception types

* data that is not read often, so compaction "pro-actively" fix such index entries. This is mainly
* an optimization).</li>
* </ul>
*/
Contributor

Please update the JavaDoc and explain the approach.

this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions

TableMetadata metadata = metadata();
if (!(metadata.partitioner instanceof Murmur3Partitioner))
Contributor

Let's separate these checks, and call them from AbstractCompactionPipeline.create instead of aborting here.

throw new IllegalArgumentException("SSTableReader is not a murmur3 partitioner:" + metadata.partitioner.getClass().getCanonicalName() +" cursor compactions are only supported for Murmur3Partitioner.");

if (metadata.indexes.size() != 0)
throw new IllegalArgumentException("SAI is not supported for cursor compactions: " + metadata.indexes +".");
Contributor

This applies to any secondary index, not just SAI.

}

// Merge any common normal rows
int elementMergeLimit = partitionMergeLimit;
Contributor

Replace "element" with "unfiltered" throughout to conform with the existing nomenclature.

}

// merge deletion/liveness
// TODO: ignoring shadowable deletions
Contributor

Do ignore them. They haven't been in use for a long time.

* likely at this stage (probably).
* {@link Row.Merger#merge(DeletionTime)}
*/
private boolean mergeRows(int rowMergeLimit, DeletionTime partitionDeletion, boolean isStatic, boolean isFirstElement) throws IOException
Contributor

partitionDeletion here is misleading, rename to active.

elementCount++;
lastClustering = sstableCursors[0].rHeader;
}
if (activeOpenRangeDeletion == DeletionTime.LIVE) {
Contributor

It would feel safer if we used null for no deletion. Since we already explicitly check for it every time, it would not be more complex than this.

Otherwise we run a (however small and carefully managed) chance of inadvertently modifying LIVE, which will have devastating consequences.
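
As an illustration of that suggestion, a sketch using null for "no open range deletion"; only the activeOpenRangeDeletion name comes from the snippet above, the helpers are hypothetical:

    private DeletionTime activeOpenRangeDeletion = null; // null means no open range deletion

    private boolean hasOpenRangeDeletion()
    {
        return activeOpenRangeDeletion != null;
    }

    private void closeOpenRangeDeletion()
    {
        activeOpenRangeDeletion = null; // never hands out or mutates the shared DeletionTime.LIVE instance
    }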

continueReadingAfterMerge(elementMergeLimit, ELEMENT_END);
}

boolean partitionWritten = isPartitionStarted();
Contributor

It seems to me that it would be simpler to move the decision of when to actually write the partition header to the writer. Are there any reasons to prefer to do it here?

@blambov blambov left a comment

Next batch of comments.

{
out.writeByte(clustering.kind().ordinal());
Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types);
Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types);
Contributor

It's not quite clear if this really is unused. Do you rely on them being unused? If so, shouldn't we remove the versioning support for clusterings altogether (preferably in a separate commit)?


AbstractType<?> type = types[clusteringIndex];

boolean v1Present = (clusteringBlock1 & 0x11) == 0;
Contributor

Shouldn't this be 0b11?
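
For context on why the literals differ: 0x11 is hexadecimal, i.e. decimal 17 / binary 1_0001, so it tests bit 0 and bit 4 rather than the two low bits of the presence field (00 = valued, 01 = empty, 10 = null). A corrected form of the line above would be:

    // 0x11 == 17 == 0b1_0001  -> tests bit 0 and bit 4
    // 0b11 ==  3 == 0b0_0011  -> tests the two-bit presence field
    boolean v1Present = (clusteringBlock1 & 0b11) == 0;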

{
if (clusteringIndex % 32 == 0)
{
clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1);
Contributor

Is the manual offset tracking more efficient than advancing c1 and c2 with VIntCoding.readUnsignedVInt?

ofst2 += vlen2;
}
// present > not present
else if (v1Present && !v2Present)
Contributor

Nit: The else case can be done as

                    // null (0b10) is smaller than empty (0b01) which is smaller than valued (0b00);
                    // compare swapped arguments to reverse the order
                    int cmp = Long.compare(clusteringBlock2 & 0b11, clusteringBlock1 & 0b11);
                    if (cmp != 0)
                        return cmp;

}
if (!UnfilteredSerializer.hasAllColumns(flags))
{
// TODO: re-implement GC free
Contributor

DataStax's branch has an implementation of it.

{
long finishResult = partitionWriter.finish();
// not inclusive of last byte
long partitionLength = partitionWriter.finish();
Contributor

This is not at all guaranteed to be the partition length; changing the name here is very misleading.


@Override
protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key)
protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused)
Contributor

Do we need to rename the parameter?

}

public void updateClusteringValues(ClusteringDescriptor newClustering) {
if (newClustering == null || newClustering.clusteringKind().isBoundary())
Contributor

I think you need to copy the comment from updateClusteringValuesByBoundOrBoundary to explain skipping boundaries.

Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance));
Slice coveredClustering;
if (minClusteringDescriptor.clusteringKind() != ClusteringPrefix.Kind.EXCL_START_BOUND) // min is end only if the descriptors are unused
Contributor

The minimum can certainly be EXCL_START_BOUND when it is used, if a partition starts with a range tombstone. The maximum, on the other hand, can't.

If you want to do this by a single operation (and also remove the minClusteringDescriptor.clusteringColumnsBound() == 0 check in updateClusteringValues), you can change the uninitialized min kind to SSTABLE_UPPER_BOUND, because that won't ever be given to updateClusteringValues.

}
else
{
boolean v1Null = (clusteringBlock1 & 0x10) == 0;
Contributor

Likewise, this should be 0b10.

@blambov blambov left a comment

Next batch of comments.

return retval;
}

public static long getUnsignedVInt(byte[] input, int offset, int length)
Contributor

These don't appear to be used.

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.jctools.util.UnsafeAccess;

public class ReusableLongToken extends Murmur3Partitioner.LongToken
Contributor

Nit: This shouldn't need to be public.

import org.apache.cassandra.utils.ByteArrayUtil;
import org.apache.cassandra.utils.ByteBufferUtil;

public class ReusableDecoratedKey extends DecoratedKey
Contributor

This class is pretty hacky. It shouldn't be hard to move the support for reusable tokens to the partitioner (throwing exceptions for all except Murmur and local).

private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY);

protected final long timestamp;
protected long timestamp;
Contributor

This class can easily be converted to an interface so that we don't have to make the base mutable.
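
A rough sketch of the idea, with hypothetical names; the point is that the immutable base keeps its final field while only the cursor-side implementation is mutable:

    public interface LivenessView
    {
        long timestamp();
    }

    final class ImmutableLiveness implements LivenessView
    {
        private final long timestamp;       // base type stays immutable
        ImmutableLiveness(long timestamp) { this.timestamp = timestamp; }
        public long timestamp() { return timestamp; }
    }

    final class ReusableLiveness implements LivenessView
    {
        private long timestamp;             // mutability confined to the reusable variant
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
        public long timestamp() { return timestamp; }
    }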

* Base class for the sstable writers used by CQLSSTableWriter.
*/
abstract class AbstractSSTableSimpleWriter implements Closeable
public abstract class AbstractSSTableSimpleWriter implements Closeable
Contributor

This doesn't need to be made public.

/** common to rows/tombstones. Call continue(); for next element, or maybe partition end */
int ELEMENT_END = 1 << 8;
/** at {@link UnfilteredSerializer#isEndOfPartition(int)} */
int PARTITION_END = 1 << 9;
Contributor

I'm curious what the benefit of these END states is. Why can't we advance to the next entry directly?

public interface State
{
/** start of file, after partition end but before EOF */
int PARTITION_START = 1;
Contributor

The fact that we don't know the key at partition/unfiltered/cell start leads to some odd-looking code in CompactionCursor. Is there a benefit to delaying the reading of the header until it is explicitly requested?

return state;
}

private int checkNextFlags() throws IOException
Contributor

The caller of this method appears to be pretty well aware what kind of flags/state it expects this to be called in. Would it make sense to split it into checkNext(Partition|Unfiltered|Cell)Flags?

appendBIGIndex(partitionKey, partitionKeyLength, partitionStart, headerLength, partitionDeletionTime, partitionEnd);
}

private void appendBIGIndex(byte[] key, int keyLength, long partitionStart, int headerLength, DeletionTime partitionDeletionTime, long partitionEnd) throws IOException
Contributor

Is it not easy to modify and reuse the index building code from BigFormatPartitionWriter? The duplication here seems quite unnecessary.

private final int indexBlockThreshold;


private SSTableCursorWriter(
Contributor

This class should be split into a common SortedTableCursorWriter, with format-specific subclasses that instantiate the index builders it uses, and placed into the correct per-format packages.
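
A sketch of the proposed shape; the class and method names below follow the comment, but their exact signatures are assumptions:

    import java.io.IOException;

    public abstract class SortedTableCursorWriter
    {
        // format-agnostic cursor writing lives here

        // each format plugs in its own index building
        protected abstract void appendIndexEntry(byte[] key, int keyLength,
                                                 long partitionStart, long partitionEnd) throws IOException;
    }

    // format-specific subclass, e.g. for the BIG format, reusing its existing index building code
    class BigTableCursorWriter extends SortedTableCursorWriter
    {
        @Override
        protected void appendIndexEntry(byte[] key, int keyLength,
                                        long partitionStart, long partitionEnd) throws IOException
        {
            // delegate to the BIG format index writer
        }
    }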
