Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/java/org/apache/cassandra/db/Clustering.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,16 @@ public String toString(TableMetadata metadata)
/**
* Serializer for Clustering object.
* <p>
* Because every clustering in a given table must have the same size (ant that size cannot actually change once the table
* Because every clustering in a given table must have the same size (and that size cannot actually change once the table
* has been defined), we don't record that size.
*/
public static class Serializer
{
public void serialize(Clustering<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
public void serialize(Clustering<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
assert clustering != STATIC_CLUSTERING : "We should never serialize a static clustering";
assert clustering.size() == types.size() : "Invalid clustering for the table: " + clustering;
ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, version, types);
ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, out, unused, types);
}

public ByteBuffer serialize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
Expand All @@ -158,9 +158,9 @@ public ByteBuffer serialize(Clustering<?> clustering, int version, List<Abstract
}
}

public long serializedSize(Clustering<?> clustering, int version, List<AbstractType<?>> types)
public long serializedSize(Clustering<?> clustering, int unused, List<AbstractType<?>> types)
{
return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, version, types);
return ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, unused, types);
}

public void skip(DataInputPlus in, int version, List<AbstractType<?>> types) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ default String toString(ClusteringComparator comparator)

public static class Serializer
{
public <T> void serialize(ClusteringBoundOrBoundary<T> bound, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
public <T> void serialize(ClusteringBoundOrBoundary<T> bound, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
out.writeByte(bound.kind().ordinal());
out.writeShort(bound.size());
ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, version, types);
ClusteringPrefix.serializer.serializeValuesWithoutSize(bound, out, unused, types);
}

public <T> long serializedSize(ClusteringBoundOrBoundary<T> bound, int version, List<AbstractType<?>> types)
Expand Down
129 changes: 126 additions & 3 deletions src/java/org/apache/cassandra/db/ClusteringComparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,17 @@
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;

import org.apache.cassandra.io.sstable.ClusteringDescriptor;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.serializers.MarshalException;

import org.apache.cassandra.io.sstable.IndexInfo;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
import org.apache.cassandra.utils.vint.VIntCoding;

import static org.apache.cassandra.utils.bytecomparable.ByteSource.EXCLUDED;
import static org.apache.cassandra.utils.bytecomparable.ByteSource.NEXT_COMPONENT;
Expand Down Expand Up @@ -156,6 +159,126 @@ public <V1, V2> int compare(ClusteringPrefix<V1> c1, ClusteringPrefix<V2> c2)
return s1 < s2 ? c1.kind().comparedToClustering : -c2.kind().comparedToClustering;
}

/**
 * Compares two {@link ClusteringDescriptor}s: first by the serialized values of the
 * clustering components both share, then (on a tie) by clustering size and kind.
 */
public static int compare(ClusteringDescriptor c1, ClusteringDescriptor c2)
{
    int size1 = c1.clusteringColumnsBound();
    int size2 = c2.clusteringColumnsBound();

    // Only the components present in both prefixes can be compared by value.
    int common = Math.min(size1, size2);
    int valueCmp = compare(c1.clusteringTypes(), c1.clusteringBuffer(), c2.clusteringBuffer(), common);
    if (valueCmp != 0)
        return valueCmp;

    ClusteringPrefix.Kind kind1 = c1.clusteringKind();
    ClusteringPrefix.Kind kind2 = c2.clusteringKind();

    // Same number of components: the kinds alone decide the ordering.
    if (size1 == size2)
        return ClusteringPrefix.Kind.compare(kind1, kind2);

    // Otherwise the shorter one is a strict prefix of the longer; its kind
    // determines whether it sorts before or after the longer clustering.
    return size1 < size2 ? kind1.comparedToClustering : -kind2.comparedToClustering;
}

/**
 * Compares two serialized clusterings that carry a value slot for every
 * clustering column described by {@code types}.
 */
public static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2)
{
    // Delegate to the bounded variant over all clustering columns.
    int columns = types.length;
    return compare(types, c1, c2, columns);
}

private static int compare(AbstractType<?>[] types, ByteBuffer c1, ByteBuffer c2, int size)
{
long clusteringBlock1 = 0;
long clusteringBlock2 = 0;
final int position1 = c1.position();
final int position2 = c2.position();
final int limit1 = c1.limit();
final int limit2 = c2.limit();
try
{
int ofst1 = position1;
int ofst2 = position2;
for (int clusteringIndex = 0; clusteringIndex < size; clusteringIndex++)
{
if (clusteringIndex % 32 == 0)
{
clusteringBlock1 = VIntCoding.getUnsignedVInt(c1, ofst1, limit1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the manual offset tracking more efficient than advancing c1 and c2 with VIntCoding.readUnsignedVInt?

ofst1 += VIntCoding.computeUnsignedVIntSize(clusteringBlock1);
clusteringBlock2 = VIntCoding.getUnsignedVInt(c2, ofst2, limit2);
ofst2 += VIntCoding.computeUnsignedVIntSize(clusteringBlock2);
}

AbstractType<?> type = types[clusteringIndex];

boolean v1Present = (clusteringBlock1 & 0x11) == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be 0b11?

boolean v2Present = (clusteringBlock2 & 0x11) == 0;

if (v1Present && v2Present)
{
boolean isByteOrderComparable = type.isByteOrderComparable;
int vlen1,vlen2;
if (type.isValueLengthFixed())
{
vlen1 = vlen2 = type.valueLengthIfFixed();
}
else
{
vlen1 = VIntCoding.getUnsignedVInt32(c1, ofst1, limit1);
ofst1 += VIntCoding.computeUnsignedVIntSize(vlen1);
vlen2 = VIntCoding.getUnsignedVInt32(c2, ofst2, limit2);
ofst2 += VIntCoding.computeUnsignedVIntSize(vlen2);
}
int v1Limit = ofst1 + vlen1;
if (v1Limit > limit1)
throw new IllegalArgumentException("Value limit exceeds buffer limit.");
c1.position(ofst1).limit(v1Limit);
int v2Limit = ofst2 + vlen2;
if (v2Limit > limit2)
throw new IllegalArgumentException("Value limit exceeds buffer limit.");
c2.position(ofst2).limit(v2Limit);
int cmp = isByteOrderComparable ?
ByteBufferUtil.compareUnsigned(c1, c2) :
type.compareCustom(c1, ByteBufferAccessor.instance, c2, ByteBufferAccessor.instance);
if (cmp != 0)
return cmp;
c1.limit(limit1);
c2.limit(limit2);
ofst1 += vlen1;
ofst2 += vlen2;
}
// present > not present
else if (v1Present && !v2Present)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: The else case can be done as

                    // null (0b10) is smaller than empty (0b01) which is smaller than valued (0b00);
                    // compare swapped arguments to reverse the order
                    int cmp = Long.compare(clusteringBlock2 & 0b11, clusteringBlock1 & 0b11);
                    if (cmp != 0)
                        return cmp;

{
return 1;
}
else if (!v1Present && v2Present)
{
return -1;
}
else
{
boolean v1Null = (clusteringBlock1 & 0x10) == 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, this should be 0b10.

boolean v2Null = (clusteringBlock2 & 0x10) == 0;
// empty > null
if (!v1Null && v2Null)
{
return 1;
}
else if (v1Null && !v2Null)
{
return -1;
}
// empty == empty, continue...
}
clusteringBlock1 = clusteringBlock1 >>> 2;
clusteringBlock2 = clusteringBlock2 >>> 2;
}
}
finally
{
c1.position(position1).limit(limit1);
c2.position(position2).limit(limit2);
}
return 0;
}

public <V1, V2> int compare(Clustering<V1> c1, Clustering<V2> c2)
{
return compare(c1, c2, size());
Expand Down
18 changes: 9 additions & 9 deletions src/java/org/apache/cassandra/db/ClusteringPrefix.java
Original file line number Diff line number Diff line change
Expand Up @@ -425,18 +425,18 @@ public default String clusteringString(List<AbstractType<?>> types)

public static class Serializer
{
public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
public void serialize(ClusteringPrefix<?> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
// We shouldn't serialize static clusterings
assert clustering.kind() != Kind.STATIC_CLUSTERING;
if (clustering.kind() == Kind.CLUSTERING)
{
out.writeByte(clustering.kind().ordinal());
Clustering.serializer.serialize((Clustering<?>)clustering, out, version, types);
Clustering.serializer.serialize((Clustering<?>)clustering, out, unused, types);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not quite clear if this really is unused. Do you rely on them being unused? If so, shouldn't we remove the versioning support for clusterings altogether (preferably in a separate commit)?

}
else
{
ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, version, types);
ClusteringBoundOrBoundary.serializer.serialize((ClusteringBoundOrBoundary<?>)clustering, out, unused, types);
}
}

Expand All @@ -462,17 +462,17 @@ public ClusteringPrefix<byte[]> deserialize(DataInputPlus in, int version, List<
return ClusteringBoundOrBoundary.serializer.deserializeValues(in, kind, version, types);
}

public long serializedSize(ClusteringPrefix<?> clustering, int version, List<AbstractType<?>> types)
public long serializedSize(ClusteringPrefix<?> clustering, int unused, List<AbstractType<?>> types)
{
// We shouldn't serialize static clusterings
assert clustering.kind() != Kind.STATIC_CLUSTERING;
if (clustering.kind() == Kind.CLUSTERING)
return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, version, types);
return 1 + Clustering.serializer.serializedSize((Clustering<?>)clustering, unused, types);
else
return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, version, types);
return ClusteringBoundOrBoundary.serializer.serializedSize((ClusteringBoundOrBoundary<?>)clustering, unused, types);
}

<V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPlus out, int version, List<AbstractType<?>> types) throws IOException
<V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPlus out, int unused, List<AbstractType<?>> types) throws IOException
{
int offset = 0;
int clusteringSize = clustering.size();
Expand All @@ -496,7 +496,7 @@ <V> void serializeValuesWithoutSize(ClusteringPrefix<V> clustering, DataOutputPl
}
}

<V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int version, List<AbstractType<?>> types)
<V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int unused, List<AbstractType<?>> types)
{
long result = 0;
int offset = 0;
Expand All @@ -519,7 +519,7 @@ <V> long valuesWithoutSizeSerializedSize(ClusteringPrefix<V> clustering, int ver
return result;
}

byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
public byte[][] deserializeValuesWithoutSize(DataInputPlus in, int size, int version, List<AbstractType<?>> types) throws IOException
{
// Callers of this method should handle the case where size = 0 (in all case we want to return a special value anyway).
assert size > 0;
Expand Down
5 changes: 5 additions & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2323,6 +2323,11 @@ public boolean shouldIgnoreGcGraceForKey(DecoratedKey dk)
return partitionKeySetIgnoreGcGrace.contains(dk);
}

/**
 * Returns whether this table has any partition keys configured to ignore gc grace,
 * i.e. whether the key set consulted by {@code shouldIgnoreGcGraceForKey} is non-empty.
 */
public boolean shouldIgnoreGcGraceForAnyKey()
{
    return !partitionKeySetIgnoreGcGrace.isEmpty();
}

public static Iterable<ColumnFamilyStore> all()
{
List<Iterable<ColumnFamilyStore>> stores = new ArrayList<>(Schema.instance.getKeyspaces().size());
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.ColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.UnfilteredSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
Expand Down Expand Up @@ -529,6 +531,7 @@ public void serializeSubset(Collection<ColumnMetadata> columns, Columns superset
int supersetCount = superset.size();
if (columnCount == supersetCount)
{
/** This is prevented by caller for row serialization: {@link UnfilteredSerializer#serializeRowBody(Row, int, SerializationHelper, DataOutputPlus)}*/
out.writeUnsignedVInt32(0);
}
else if (supersetCount < 64)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/DecoratedKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public ByteSource asComparableBytes(Version version)
// The OSS50 version avoids this by adding a terminator.
return ByteSource.withTerminatorMaybeLegacy(version,
ByteSource.END_OF_STREAM,
token.asComparableBytes(version),
getToken().asComparableBytes(version),
keyComparableBytes(version));
}

Expand All @@ -127,7 +127,7 @@ public ByteComparable asComparableBound(boolean before)

return ByteSource.withTerminator(
before ? ByteSource.LT_NEXT_COMPONENT : ByteSource.GT_NEXT_COMPONENT,
token.asComparableBytes(version),
getToken().asComparableBytes(version),
keyComparableBytes(version));
};
}
Expand Down
8 changes: 4 additions & 4 deletions src/java/org/apache/cassandra/db/DeletionPurger.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@

public interface DeletionPurger
{
public static final DeletionPurger PURGE_ALL = (ts, ldt) -> true;
DeletionPurger PURGE_ALL = (ts, ldt) -> true;

public boolean shouldPurge(long timestamp, long localDeletionTime);
boolean shouldPurge(long timestamp, long localDeletionTime);

public default boolean shouldPurge(DeletionTime dt)
default boolean shouldPurge(DeletionTime dt)
{
return !dt.isLive() && shouldPurge(dt.markedForDeleteAt(), dt.localDeletionTime());
}

public default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
default boolean shouldPurge(LivenessInfo liveness, long nowInSec)
{
return !liveness.isLive(nowInSec) && shouldPurge(liveness.timestamp(), liveness.localExpirationTime());
}
Expand Down
Loading