-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-20918 Add cursor-based low allocation optimized compaction implementation #4402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD - EPPv1 - all new code - Cursor compaction integration - JMH benchmarks for compaction and cursor impls - EPPv1 - New tests - Existing tests tweaks for new code - [revert?] change the default partitioner to expand testing of new code - [revert?] test data used some benchmarks - [revert?] jmh tweak GC settings for stability - [revert?] javadoc typos, marking unused params - [revert?] clarifying comment - [revert?] toString improvement - [revert?] remove spurious keywords - [revert?] marking metadata collection - [revert?] cursor verifier - Exclude SAI and counter column - Exclude BTI and legacy versions - Temporarily skip very long running test
{ | ||
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); | ||
public static final int MEGABYTE = 1024 * 1024 * 1024; | ||
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you move this to Config.java? One advantage of having it there is that the AST tests (like SingleNodeTableWalkTest) that generate random config would be able to exercise it. That's our best path to coverage with lots of different schemas and configurations.
If you can locally do a longer run of that test with cursor-compaction enabled, that would be useful too. That would be done via overriding StatefulASTBase#clusterConfig
with the new config set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also related to testing, we need to be running all tests both with this feature enabled as well as disabled.
Let's make sure that among test
, test-oa
and test-latest
we have at least one that is running with cursor compaction and one without.
collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result)); | ||
return StatsAccumulation.unpackCellCount(result); | ||
collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result)); // matched | ||
return StatsAccumulation.unpackCellCount(result); // matched |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intend to keep?
return (int)skipBytes((long)n); | ||
} | ||
|
||
public long skipBytes(long n) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Long overflow on current + n below? Would have existed before as well, I suppose. And probably exceeds max buffer size anyway.
{ | ||
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); | ||
public static final int MEGABYTE = 1024 * 1024 * 1024; | ||
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also related to testing, we need to be running all tests both with this feature enabled as well as disabled.
Let's make sure that among test
, test-oa
and test-latest
we have at least one that is running with cursor compaction and one without.
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It must be stated that this approach that bundles all the steps of the processing in one single file will be quite difficult to maintain and keep in sync with the combination of iterators and transformations that we use in other parts of the code such as the query path. However, once we have reached a point of stability for a piece of functionality where we do not expect it to change significantly for a long time, it does makes sense to unpack the code and present it in a way that makes its execution as direct as possible, and this patch is a good such representation of the compaction process.
Personally, I am very unhappy about switching to mutable, pooled and reused objects, which are significantly more unwieldy and error prone, especially in contexts where concurrent access can occur. It seems this is becoming a necessity if we need to achieve acceptable performance with the current state of our heap usage, but we still need to very carefully separate the mutable versions of concepts from the immutable ones used throughout the code base. Suddenly making a DeletionTime
mutable is not an acceptable change.
First batch of targeted comments below, mainly going over CompactionCursor.java
.
{ | ||
private static final int MEGABYTE = 1024 * 1024; | ||
protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class); | ||
public static final boolean CURSOR_COMPACTION_ENABLED = SystemProperties.getBoolean("cassandra.enable_cursor_compaction", () -> true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This property should be in CassandraRelevantProperties
.
else if (e instanceof CompactionInterruptedException) | ||
throw (CompactionInterruptedException) e; | ||
else | ||
throw new IllegalStateException(e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this conversion addressing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defensively checking for incorrect exception types
* data that is not read often, so compaction "pro-actively" fix such index entries. This is mainly | ||
* an optimization).</li> | ||
* </ul> | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the JavaDoc and explain the approach.
this.activeCompactions.beginCompaction(this); // note that CompactionTask also calls this, but CT only creates CompactionIterator with a NOOP ActiveCompactions | ||
|
||
TableMetadata metadata = metadata(); | ||
if (!(metadata.partitioner instanceof Murmur3Partitioner)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's separate these checks, and call them from AbstractCompactionPipeline.create
instead of aborting here.
throw new IllegalArgumentException("SSTableReader is not a murmur3 partitioner:" + metadata.partitioner.getClass().getCanonicalName() +" cursor compactions are only supported for Murmur3Partitioner."); | ||
|
||
if (metadata.indexes.size() != 0) | ||
throw new IllegalArgumentException("SAI is not supported for cursor compactions: " + metadata.indexes +"."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This applies to any secondary index, not just SAI.
} | ||
|
||
// Merge any common normal rows | ||
int elementMergeLimit = partitionMergeLimit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace "element" with "unfiltered" throughout to conform with the existing nomenclature.
} | ||
|
||
// merge deletion/liveness | ||
// TODO: ignoring shadowable deletions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do ignore them. They haven't been in use for a long time.
* likely at this stage (probably). | ||
* {@link Row.Merger#merge(DeletionTime)} | ||
*/ | ||
private boolean mergeRows(int rowMergeLimit, DeletionTime partitionDeletion, boolean isStatic, boolean isFirstElement) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionDeletion
here is misleading, rename to active
.
elementCount++; | ||
lastClustering = sstableCursors[0].rHeader; | ||
} | ||
if (activeOpenRangeDeletion == DeletionTime.LIVE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would feel safer if we used null
for no deletion. Since we already explicitly check for it every time, it would not be more complex than this.
Otherwise we run a (however small and carefully managed) chance of inadvertently modifying LIVE
, which will have devastating consequences.
continueReadingAfterMerge(elementMergeLimit, ELEMENT_END); | ||
} | ||
|
||
boolean partitionWritten = isPartitionStarted(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that it would be simpler to move the decision of when to actually write the partition header to the writer. Are there any reasons to prefer to do it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next batch of comments.
{ | ||
out.writeByte(clustering.kind().ordinal()); | ||
Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types); | ||
Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not quite clear if this really is unused. Do you rely on them being unused? If so, shouldn't we remove the versioning support for clusterings altogether (preferably in a separate commit)?
|
||
AbstractType<?> type = types[clusteringIndex]; | ||
|
||
boolean v1Present = (clusteringBlock1 & 0x11) == 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this be 0b11
?
{ | ||
if (clusteringIndex % 32 == 0) | ||
{ | ||
clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the manual offset tracking more efficient than advancing c1
and c2
with VIntCoding.readUnsignedVInt
?
ofst2 += vlen2; | ||
} | ||
// present > not present | ||
else if (v1Present && !v2Present) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: The else
case can be done as
// null (0b10) is smaller than empty (0b01) which is smaller than valued (0b00);
// compare swapped arguments to reverse the order
int cmp = Long.compare(clusteringBlock2 & 0b11, clusteringBlock1 & 0b11);
if (cmp != 0)
return cmp;
} | ||
if (!UnfilteredSerializer.hasAllColumns(flags)) | ||
{ | ||
// TODO: re-implement GC free |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DataStax's branch has an implementation of it.
{ | ||
long finishResult = partitionWriter.finish(); | ||
// not inclusive of last byte | ||
long partitionLength = partitionWriter.finish(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not at all guaranteed to be the partition length, changing the name here is very misleading.
|
||
@Override | ||
protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey key) | ||
protected boolean shouldSwitchWriterInCurrentLocation(DecoratedKey unused) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to rename the parameter?
} | ||
|
||
public void updateClusteringValues(ClusteringDescriptor newClustering) { | ||
if (newClustering == null || newClustering.clusteringKind().isBoundary()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you need to copy the comment from updateClusteringValuesByBoundOrBoundary
to explain skipping boundaries.
Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class); | ||
components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); | ||
Slice coveredClustering; | ||
if (minClusteringDescriptor.clusteringKind() != ClusteringPrefix.Kind.EXCL_START_BOUND) // min is end only if the descriptors are unused |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The minimum can certainly be EXCL_START_BOUND
when it is used, if a partition starts with a range tombstone. The maximum, on the other hand, can't.
If you want to do this by a single operation (and also remove the minClusteringDescriptor.clusteringColumnsBound() == 0
check in updateClusteringValues
), you can change the uninitialized min kind to SSTABLE_UPPER_BOUND
, because that won't ever be given to updateClusteringValues
.
} | ||
else | ||
{ | ||
boolean v1Null = (clusteringBlock1 & 0x10) == 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likewise, this should be 0b10
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Next batch of comments.
return retval; | ||
} | ||
|
||
public static long getUnsignedVInt(byte[] input, int offset, int length) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These don't appear to be used.
import org.apache.cassandra.dht.Murmur3Partitioner; | ||
import org.jctools.util.UnsafeAccess; | ||
|
||
public class ReusableLongToken extends Murmur3Partitioner.LongToken |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: This shouldn't need to be public.
import org.apache.cassandra.utils.ByteArrayUtil; | ||
import org.apache.cassandra.utils.ByteBufferUtil; | ||
|
||
public class ReusableDecoratedKey extends DecoratedKey |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class is pretty hacky. It shouldn't be hard to move the support for reusable tokens to the partitioner (throwing exceptions for all except Murmur and local).
private static final long UNSHARED_HEAP_SIZE = ObjectSizes.measure(EMPTY); | ||
|
||
protected final long timestamp; | ||
protected long timestamp; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class can be easily converted to interface so that we don't have to make the base mutable.
* Base class for the sstable writers used by CQLSSTableWriter. | ||
*/ | ||
abstract class AbstractSSTableSimpleWriter implements Closeable | ||
public abstract class AbstractSSTableSimpleWriter implements Closeable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't need to be made public.
/** common to rows/tombstones. Call continue(); for next element, or maybe partition end */ | ||
int ELEMENT_END = 1 << 8; | ||
/** at {@link UnfilteredSerializer#isEndOfPartition(int)} */ | ||
int PARTITION_END = 1 << 9; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious what the benefit of these END states is. Why can't we advance to the next entry directly?
public interface State | ||
{ | ||
/** start of file, after partition end but before EOF */ | ||
int PARTITION_START = 1; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that we don't know the key at partition/unfiltered/cell start leads to some odd-looking code in CompactionCursor
. Is there a benefit to delay the reading of the header until it is explicitly requested?
return state; | ||
} | ||
|
||
private int checkNextFlags() throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The caller of this method appears to be pretty well aware what kind of flags/state it expects this to be called in. Would it make sense to split it into checkNext(Partition|Unfiltered|Cell)Flags
?
appendBIGIndex(partitionKey, partitionKeyLength, partitionStart, headerLength, partitionDeletionTime, partitionEnd); | ||
} | ||
|
||
private void appendBIGIndex(byte[] key, int keyLength, long partitionStart, int headerLength, DeletionTime partitionDeletionTime, long partitionEnd) throws IOException |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it not easy to modify and reuse the index building code from BigFormatPartitionWriter
? The duplication here seems quite unnecessary.
private final int indexBlockThreshold; | ||
|
||
|
||
private SSTableCursorWriter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class should be split into a common SortedTableCursorWriter
, with format-specific subclasses that instantiate the index builders it uses, and placed into the correct per-format packages.
patch by Nitsan Wakart; reviewed by TBD for CASSANDRA-TBD
Thanks for sending a pull request! Here are some tips if you're new here:
Commit messages should follow the following format:
The Cassandra Jira