-
Notifications
You must be signed in to change notification settings - Fork 3.8k
CASSANDRA-20918 Add cursor-based low allocation optimized compaction implementation #4402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,14 +26,17 @@ | |
import com.google.common.base.Joiner; | ||
import com.google.common.collect.ImmutableList; | ||
|
||
import org.apache.cassandra.io.sstable.ClusteringDescriptor; | ||
import org.apache.cassandra.db.marshal.AbstractType; | ||
import org.apache.cassandra.db.marshal.ByteBufferAccessor; | ||
import org.apache.cassandra.db.marshal.ValueAccessor; | ||
import org.apache.cassandra.db.rows.Row; | ||
import org.apache.cassandra.db.marshal.AbstractType; | ||
import org.apache.cassandra.serializers.MarshalException; | ||
|
||
import org.apache.cassandra.io.sstable.IndexInfo; | ||
import org.apache.cassandra.serializers.MarshalException; | ||
import org.apache.cassandra.utils.ByteBufferUtil; | ||
import org.apache.cassandra.utils.bytecomparable.ByteComparable; | ||
import org.apache.cassandra.utils.bytecomparable.ByteSource; | ||
import org.apache.cassandra.utils.vint.VIntCoding; | ||
|
||
import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED; | ||
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT; | ||
|
@@ -156,6 +159,126 @@ public <V1, V2> int compare(ClusteringPrefix<V1> c1, ClusteringPrefix<V2> c2) | |
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering; | ||
} | ||
|
||
public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2) | ||
{ | ||
final int c1Size = c1.clusteringColumnsBound(); | ||
final int c2Size = c2.clusteringColumnsBound(); | ||
final int minColumns = Math.min(c1Size, c2Size); | ||
|
||
final int cmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), minColumns); | ||
if (cmp != 0) | ||
return cmp; | ||
|
||
final ClusteringPrefix.Kind c1Kind = c1.clusteringKind(); | ||
final ClusteringPrefix.Kind c2Kind = c2.clusteringKind(); | ||
if (c1Size == c2Size) | ||
{ | ||
return ClusteringPrefix.Kind.compare(c1Kind, c2Kind); | ||
} | ||
|
||
return c1Size < c2Size ? c1Kind.comparedToClustering : -c2Kind.comparedToClustering; | ||
} | ||
|
||
public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2) { | ||
return compare(types, c1, c2, types.length); | ||
} | ||
|
||
private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size) | ||
{ | ||
long clusteringBlock1 = 0; | ||
long clusteringBlock2 = 0; | ||
final int position1 = c1.position(); | ||
final int position2 = c2.position(); | ||
final int limit1 = c1.limit(); | ||
final int limit2 = c2.limit(); | ||
try | ||
{ | ||
int ofst1 = position1; | ||
int ofst2 = position2; | ||
for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++) | ||
{ | ||
if (clusteringIndex % 32 == 0) | ||
{ | ||
clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1); | ||
ofst1 += VIntCoding.computeUnsignedVIntSize(clusteringBlock1); | ||
clusteringBlock2 = VIntCoding.getUnsignedVInt(c2, ofst2, limit2); | ||
ofst2 += VIntCoding.computeUnsignedVIntSize(clusteringBlock2); | ||
} | ||
|
||
AbstractType<?> type = types[clusteringIndex]; | ||
|
||
boolean v1Present = (clusteringBlock1 & 0x11) == 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be |
||
boolean v2Present = (clusteringBlock2 & 0x11) == 0; | ||
|
||
if (v1Present && v2Present) | ||
{ | ||
boolean isByteOrderComparable = type.isByteOrderComparable; | ||
int vlen1,vlen2; | ||
if (type.isValueLengthFixed()) | ||
{ | ||
vlen1 = vlen2 = type.valueLengthIfFixed(); | ||
} | ||
else | ||
{ | ||
vlen1 = VIntCoding.getUnsignedVInt32(c1, ofst1, limit1); | ||
ofst1 += VIntCoding.computeUnsignedVIntSize(vlen1); | ||
vlen2 = VIntCoding.getUnsignedVInt32(c2, ofst2, limit2); | ||
ofst2 += VIntCoding.computeUnsignedVIntSize(vlen2); | ||
} | ||
int v1Limit = ofst1 + vlen1; | ||
if (v1Limit > limit1) | ||
throw new IllegalArgumentException("Value limit exceeds buffer limit."); | ||
c1.position(ofst1).limit(v1Limit); | ||
int v2Limit = ofst2 + vlen2; | ||
if (v2Limit > limit2) | ||
throw new IllegalArgumentException("Value limit exceeds buffer limit."); | ||
c2.position(ofst2).limit(v2Limit); | ||
int cmp = isByteOrderComparable ? | ||
ByteBufferUtil.compareUnsigned(c1, c2) : | ||
type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance); | ||
if (cmp != 0) | ||
return cmp; | ||
c1.limit(limit1); | ||
c2.limit(limit2); | ||
ofst1 += vlen1; | ||
ofst2 += vlen2; | ||
} | ||
// present > not present | ||
else if (v1Present && !v2Present) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: The
|
||
{ | ||
return 1; | ||
} | ||
else if (!v1Present && v2Present) | ||
{ | ||
return -1; | ||
} | ||
else | ||
{ | ||
boolean v1Null = (clusteringBlock1 & 0x10) == 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise, this should be |
||
boolean v2Null = (clusteringBlock2 & 0x10) == 0; | ||
// empty > null | ||
if (!v1Null && v2Null) | ||
{ | ||
return 1; | ||
} | ||
else if (v1Null && !v2Null) | ||
{ | ||
return -1; | ||
} | ||
// empty == empty, continue... | ||
} | ||
clusteringBlock1 = clusteringBlock1 >>> 2; | ||
clusteringBlock2 = clusteringBlock2 >>> 2; | ||
} | ||
} | ||
finally | ||
{ | ||
c1.position(position1).limit(limit1); | ||
c2.position(position2).limit(limit2); | ||
} | ||
return 0; | ||
} | ||
|
||
public <V1, V2> int compare(Clustering<V1> c1, Clustering<V2> c2) | ||
{ | ||
return compare(c1, c2, size()); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -425,18 +425,18 @@ public default String clusteringString(List<AbstractType<?>> types) | |
|
||
public static class Serializer | ||
{ | ||
public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException | ||
public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException | ||
{ | ||
// We shouldn't serialize static clusterings | ||
assert clustering.kind() != Kind.STATIC_CLUSTERING; | ||
if (clustering.kind() == Kind.CLUSTERING) | ||
{ | ||
out.writeByte(clustering.kind().ordinal()); | ||
Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types); | ||
Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's not quite clear if this really is unused. Do you rely on them being unused? If so, shouldn't we remove the versioning support for clusterings altogether (preferably in a separate commit)? |
||
} | ||
else | ||
{ | ||
ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, version, types); | ||
ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, unused, types); | ||
} | ||
} | ||
|
||
|
@@ -462,17 +462,17 @@ public ClusteringPrefix<byte[]> deserialize(DataInputPlus in, int version, List< | |
return ClusteringBoundOrBoundary.serializer.deserializeValues(in, kind, version, types); | ||
} | ||
|
||
public long serializedSize(ClusteringPrefix<?> clustering, int version, List<AbstractType<?>> types) | ||
public long serializedSize(ClusteringPrefix<?> clustering, int unused, List<AbstractType<?>> types) | ||
{ | ||
// We shouldn't serialize static clusterings | ||
assert clustering.kind() != Kind.STATIC_CLUSTERING; | ||
if (clustering.kind() == Kind.CLUSTERING) | ||
return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, version, types); | ||
return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, unused, types); | ||
else | ||
return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, version, types); | ||
return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, unused, types); | ||
} | ||
|
||
<V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException | ||
<V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException | ||
{ | ||
int offset = 0; | ||
int clusteringSize = clustering.size(); | ||
|
@@ -496,7 +496,7 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl | |
} | ||
} | ||
|
||
<V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int version, List<AbstractType<?>> types) | ||
<V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int unused, List<AbstractType<?>> types) | ||
{ | ||
long result = 0; | ||
int offset = 0; | ||
|
@@ -519,7 +519,7 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver | |
return result; | ||
} | ||
|
||
byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException | ||
public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException | ||
{ | ||
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway). | ||
assert size > 0; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the manual offset tracking more efficient than advancing `c1` and `c2` with `VIntCoding.readUnsignedVInt`?