Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/DiskBoundaryManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ private static List<PartitionPosition> getDiskBoundaries(RangesAtEndpoint replic
weightedRanges.add(new Splitter.WeightedRange(1.0, r));

for (Range<Token> r : Range.sort(replicas.onlyTransient().ranges()))
weightedRanges.add(new Splitter.WeightedRange(0.1, r));
weightedRanges.add(new Splitter.WeightedRange(0.00001, r));

weightedRanges.sort(Comparator.comparing(Splitter.WeightedRange::left));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,8 +1006,7 @@ private static void mutateFullyContainedSSTables(ColumnFamilyStore cfs,
Iterator<SSTableReader> sstableIterator,
Collection<Range<Token>> ranges,
LifecycleTransaction txn,
TimeUUID sessionID,
boolean isTransient) throws IOException
TimeUUID sessionID) throws IOException
{
if (ranges.isEmpty())
return;
Expand All @@ -1017,7 +1016,7 @@ private static void mutateFullyContainedSSTables(ColumnFamilyStore cfs,
Set<SSTableReader> fullyContainedSSTables = findSSTablesToAnticompact(sstableIterator, normalizedRanges, sessionID);

cfs.metric.bytesMutatedAnticompaction.mark(SSTableReader.getTotalBytes(fullyContainedSSTables));
cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, UNREPAIRED_SSTABLE, sessionID, isTransient);
cfs.getCompactionStrategyManager().mutateRepaired(fullyContainedSSTables, UNREPAIRED_SSTABLE, sessionID);
// since we're just re-writing the sstable metdata for the fully contained sstables, we don't want
// them obsoleted when the anti-compaction is complete. So they're removed from the transaction here
txn.cancel(fullyContainedSSTables);
Expand Down Expand Up @@ -1065,8 +1064,7 @@ public void performAnticompaction(ColumnFamilyStore cfs,

Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
validateSSTableBoundsForAnticompaction(sessionID, sstables, replicas);
mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), replicas.onlyFull().ranges(), txn, sessionID, false);
mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), replicas.onlyTransient().ranges(), txn, sessionID, true);
mutateFullyContainedSSTables(cfs, validatedForRepair, sstables.iterator(), replicas.ranges(), txn, sessionID);

assert txn.originals().equals(sstables);
if (!sstables.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,7 +1326,7 @@ List<PendingRepairManager> getPendingRepairManagers()
* Mutates sstable repairedAt times and notifies listeners of the change with the writeLock held. Prevents races
* with other processes between when the metadata is changed and when sstables are moved between strategies.
*/
public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, TimeUUID pendingRepair, boolean isTransient) throws IOException
public void mutateRepaired(Collection<SSTableReader> sstables, long repairedAt, TimeUUID pendingRepair) throws IOException
{
if (sstables.isEmpty())
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ protected void runMayThrow() throws Exception
try
{
logger.info("Moving {} from pending to repaired with repaired at = {} and session id = {}", transaction.originals(), repairedAt, sessionID);
cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR);
completed = true;
}
finally
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/schema/KeyspaceParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ public static KeyspaceParams nts(ReplicationType replicationType, Object... args
return new KeyspaceParams(true, ReplicationParams.nts(args), FastPathStrategy.simple(), replicationType);
}

public static KeyspaceParams ntsTracked(Object... args)
{
return nts(ReplicationType.tracked, args);
}

public static KeyspaceParams nts(Object... args)
{
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -5716,7 +5716,7 @@ public List<String> mutateSSTableRepairedState(boolean repaired, boolean preview
Set<SSTableReader> result = table.runWithCompactionsDisabled(() -> {
Set<SSTableReader> sstables = table.getLiveSSTables().stream().filter(predicate).collect(Collectors.toSet());
if (!preview)
table.getCompactionStrategyManager().mutateRepaired(sstables, repairedAt, null, false);
table.getCompactionStrategyManager().mutateRepaired(sstables, repairedAt, null);
return sstables;
}, predicate, OperationType.ANTICOMPACTION, true, false, true);
sstablesTouched.addAll(result.stream().map(sst -> sst.descriptor.baseFile().name()).collect(Collectors.toList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@
import java.util.function.Function;

import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.db.*;
import org.apache.cassandra.service.reads.DataResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
Expand Down Expand Up @@ -229,7 +226,7 @@ private SingleRangeResponse executeNormal(ReplicaPlan.ForRangeRead replicaPlan,
for (Replica replica : replicaPlan.contacts())
{
Tracing.trace("Enqueuing request to {}", replica);
Message<ReadCommand> message = command.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
Message<ReadCommand> message = rangeCommand.createMessage(trackRepairedStatus && replica.isFull(), requestTime);
readCoordinator.sendReadCommand(message, replica.endpoint(), handler);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ private void unmarkRepaired(IInvokableInstance instance, String table)
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
try
{
cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null);
}
catch (IOException e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void testRepairStatusChanges() throws Exception
populateSSTables(store);
assertTrue(repaired.getSSTables().isEmpty());
assertFalse(unrepaired.getSSTables().isEmpty());
mgr.mutateRepaired(store.getLiveSSTables(), FBUtilities.nowInSeconds(), null, false);
mgr.mutateRepaired(store.getLiveSSTables(), FBUtilities.nowInSeconds(), null);
assertFalse(repaired.getSSTables().isEmpty());
assertTrue(unrepaired.getSSTables().isEmpty());

Expand All @@ -249,8 +249,7 @@ public void testRepairStatusChanges() throws Exception
// mark unrepair
mgr.mutateRepaired(store.getLiveSSTables().stream().filter(s -> s.isRepaired()).collect(Collectors.toList()),
ActiveRepairService.UNREPAIRED_SSTABLE,
null,
false);
null);
assertTrue(repaired.getSSTables().isEmpty());
assertFalse(unrepaired.getSSTables().isEmpty());
}
Expand Down
13 changes: 8 additions & 5 deletions test/unit/org/apache/cassandra/db/CleanupTransientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
Expand Down Expand Up @@ -73,9 +74,10 @@ public class CleanupTransientTest extends CassandraTestBase
@BeforeClass
public static void setup() throws Exception
{
DatabaseDescriptor.setMutationTrackingEnabled(true);
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
SchemaLoader.createKeyspace(KEYSPACE1,
KeyspaceParams.simple("2/1"),
KeyspaceParams.simpleWitness("2/1"),
SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD1),
SchemaLoader.compositeIndexCFMD(KEYSPACE1, CF_INDEXED1, true));

Expand Down Expand Up @@ -153,10 +155,11 @@ protected void fillCF(ColumnFamilyStore cfs, String colName, int rowsPerSSTable)
{
String key = String.valueOf(i);
// create a row and update the birthdate value, test that the index query fetches the new version
new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), ByteBufferUtil.bytes(key))
.clustering(COLUMN)
.add(colName, VALUE)
.build()
Mutation mutation = new RowUpdateBuilder(cfs.metadata(), System.currentTimeMillis(), ByteBufferUtil.bytes(key))
.clustering(COLUMN)
.add(colName, VALUE)
.build();
mutation.withMutationId(MutationTrackingService.instance.nextMutationId(cfs.metadata().keyspace, mutation.key().getToken()))
.applyUnsafe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,30 +208,6 @@ public void antiCompactOneFull() throws Exception
assertOnDiskState(store, 2);
}

@Test
public void antiCompactOneMixed() throws Exception
{
ColumnFamilyStore store = prepareColumnFamilyStore();
SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
assertEquals(3, stats.numLiveSSTables);
assertEquals(stats.pendingKeys, 4);
assertEquals(stats.transKeys, 4);
assertEquals(stats.unrepairedKeys, 2);
assertOnDiskState(store, 3);
}

@Test
public void antiCompactOneTransOnly() throws Exception
{
ColumnFamilyStore store = prepareColumnFamilyStore();
SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
assertEquals(2, stats.numLiveSSTables);
assertEquals(stats.pendingKeys, 0);
assertEquals(stats.transKeys, 4);
assertEquals(stats.unrepairedKeys, 6);
assertOnDiskState(store, 2);
}

@Test
public void antiCompactionSizeTest() throws InterruptedException, IOException, NoSuchRepairSessionException
{
Expand Down Expand Up @@ -322,48 +298,6 @@ public void antiCompactTenFull() throws IOException, NoSuchRepairSessionExceptio
assertOnDiskState(store, 10);
}

@Test
public void antiCompactTenTrans() throws IOException, NoSuchRepairSessionException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
store.disableAutoCompaction();

for (int table = 0; table < 10; table++)
{
generateSStable(store,Integer.toString(table));
}
SSTableStats stats = antiCompactRanges(store, atEndpoint(NO_RANGES, range(0, 4)));
/*
Anticompaction will be anti-compacting 10 SSTables but will be doing this two at a time
so there will be no net change in the number of sstables
*/
assertEquals(10, stats.numLiveSSTables);
assertEquals(stats.pendingKeys, 0);
assertEquals(stats.transKeys, 40);
assertEquals(stats.unrepairedKeys, 60);
assertOnDiskState(store, 10);
}

@Test
public void antiCompactTenMixed() throws IOException, NoSuchRepairSessionException
{
Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
store.disableAutoCompaction();

for (int table = 0; table < 10; table++)
{
generateSStable(store,Integer.toString(table));
}
SSTableStats stats = antiCompactRanges(store, atEndpoint(range(0, 4), range(4, 8)));
assertEquals(15, stats.numLiveSSTables);
assertEquals(stats.pendingKeys, 40);
assertEquals(stats.transKeys, 40);
assertEquals(stats.unrepairedKeys, 20);
assertOnDiskState(store, 15);
}

@Test
public void shouldMutatePendingRepair() throws InterruptedException, IOException, NoSuchRepairSessionException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ public void testSSTablesAssignedToCorrectCompactionStrategy() throws IOException
if (i % 3 == 0)
{
//make 1 third of sstables repaired
cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null, false);
cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, System.currentTimeMillis(), null);
}
else if (i % 3 == 1)
{
//make 1 third of sstables pending repair
cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, nextTimeUUID(), false);
cfs.getCompactionStrategyManager().mutateRepaired(newSSTables, 0, nextTimeUUID());
}
previousSSTables = currentSSTables;
}
Expand Down Expand Up @@ -294,7 +294,7 @@ private static void assertHolderExclusivity(boolean isRepaired, boolean isPendin
assertEquals(1, matches);
}

private static void assertInvalieHolderConfig(boolean isRepaired, boolean isPendingRepair)
private static void assertInvalidHolderConfig(boolean isRepaired, boolean isPendingRepair)
{
ColumnFamilyStore cfs = Keyspace.open(KS_PREFIX).getColumnFamilyStore(TABLE_PREFIX);
CompactionStrategyManager csm = cfs.getCompactionStrategyManager();
Expand All @@ -320,10 +320,7 @@ public void testMutualExclusiveHolderClassification() throws Exception
assertHolderExclusivity(true, false, CompactionStrategyHolder.class);
assertHolderExclusivity(false, true, PendingRepairHolder.class);
assertHolderExclusivity(false, true, PendingRepairHolder.class);
assertInvalieHolderConfig(true, true);
assertInvalieHolderConfig(true, true);
assertInvalieHolderConfig(false, false);
assertInvalieHolderConfig(true, false);
assertInvalidHolderConfig(true, true);
}

PartitionPosition forKey(int key)
Expand Down Expand Up @@ -354,8 +351,8 @@ public void groupSSTables() throws Exception
repaired.add(createSSTableWithKey(cfs.getKeyspaceName(), cfs.name, key++));
}

cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, nextTimeUUID(), false);
cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null, false);
cfs.getCompactionStrategyManager().mutateRepaired(pendingRepair, 0, nextTimeUUID());
cfs.getCompactionStrategyManager().mutateRepaired(repaired, 1000, null);

DiskBoundaries boundaries = new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(),
Lists.newArrayList(forKey(100), forKey(200), forKey(300)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public void testStopCompactionRepaired(Consumer<ColumnFamilyStore> compactionRun
}
Util.flush(cfs);
}
cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), System.currentTimeMillis(), null, false);
cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), System.currentTimeMillis(), null);
for (int i = 0; i < 5; i++)
{
for (int j = 0; j < 10; j++)
Expand Down
17 changes: 1 addition & 16 deletions test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void testMutateMetadataCSM() throws Exception
TimeUUID random = nextTimeUUID();
try
{
cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, random, false);
cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, random);
if (!sstable.descriptor.version.hasPendingRepair())
fail("We should fail setting pending repair on unsupported sstables "+sstable);
}
Expand All @@ -259,21 +259,6 @@ public void testMutateMetadataCSM() throws Exception
fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable);
}
}
// set transient
for (SSTableReader sstable : cfs.getLiveSSTables())
{
try
{
cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, nextTimeUUID(), true);
if (!sstable.descriptor.version.hasIsTransient())
fail("We should fail setting pending repair on unsupported sstables "+sstable);
}
catch (IllegalStateException e)
{
if (sstable.descriptor.version.hasIsTransient())
fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable);
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ public class SimpleStrategyTest extends CassandraTestBase
@Before
public void defineSchema()
{
DatabaseDescriptor.setMutationTrackingEnabled(true);
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
recreateCMS();
SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1));
SchemaLoader.createKeyspace(MULTIDC, KeyspaceParams.simple(3));
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true);
}

@Test
Expand Down Expand Up @@ -283,10 +284,7 @@ public void transientReplica() throws Exception
tokens.put(endpoints.get(3), tk(400));
tokens.forEach(ClusterMetadataTestHelper::addEndpoint);

Map<String, String> configOptions = new HashMap<String, String>();
configOptions.put(ReplicationParams.CLASS, SimpleStrategy.class.getName());
configOptions.put("replication_factor", "3/1");
SchemaLoader.createKeyspace("ks", KeyspaceParams.create(false, configOptions));
SchemaLoader.createKeyspace("ks", KeyspaceParams.simpleWitness("3/1"));
Range<Token> range1 = range(Murmur3Partitioner.MINIMUM.token, 100);

Util.assertRCEquals(EndpointsForToken.of(range1.right,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testMutateSSTableRepairedStateTableUnrepaired() throws Exception
table1.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
SchemaLoader.insertData(TEST_KEYSPACE, table1.name, 0, 1);
table1.forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
table1.getCompactionStrategyManager().mutateRepaired(table1.getLiveSSTables(), 1, null, false);
table1.getCompactionStrategyManager().mutateRepaired(table1.getLiveSSTables(), 1, null);
assertEquals(2, table1.getLiveSSTables().stream().filter(SSTableReader::isRepaired).count());

List<String> result = StorageService.instance.mutateSSTableRepairedState(false, false, TEST_KEYSPACE, Arrays.asList(table1.name));
Expand Down
Loading