Skip to content

Commit 148c18e

Browse files
authored
[connector][fix] Properly handle projection clause for partition deletes (#189)
* [connector][fix] Properly handle projection clause for partition deletes * Use a new table name for testClusteringKey() to avoid conflict with other tests * Increase CI test job timeout to 120m
1 parent cd886ce commit 148c18e

File tree

4 files changed

+133
-6
lines changed

4 files changed

+133
-6
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
needs: build
3434
name: Test
3535
runs-on: ubuntu-latest
36-
timeout-minutes: 90
36+
timeout-minutes: 120
3737
strategy:
3838
fail-fast: false
3939
matrix:

connector/src/main/java/com/datastax/oss/pulsar/source/ConverterAndQuery.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,14 @@ public class ConverterAndQuery {
6464
/**
6565
* When requesting a partition, the projection clause contains only static columns.
6666
* When requesting a wide row, the projection clause contains regular and static columns
67+
* When deleting a single row or a partition, the projection contains regular and static columns
6768
* @param whereClauseLength number of columns in the CQL where clause.
6869
* @return the projection clause
6970
*/
7071
public CqlIdentifier[] getProjectionClause(int whereClauseLength) {
71-
return primaryKeyClause.length == whereClauseLength
72+
// when primary key columns are different from where clause columns and static columns are absent, we still
73+
// need to include regular columns in the projection clause (e.g. for DELETE by partition key use cases)
74+
return primaryKeyClause.length == whereClauseLength || staticProjectionClause.length == 0
7275
? projectionClause
7376
: staticProjectionClause;
7477
}

connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import java.nio.ByteBuffer;
6969
import java.nio.charset.StandardCharsets;
7070
import java.time.Duration;
71+
import java.time.LocalDate;
7172
import java.util.Collection;
7273
import java.util.HashMap;
7374
import java.util.Iterator;
@@ -154,16 +155,27 @@ public static void initBeforeClass() throws Exception {
154155

155156
@AfterAll
156157
public static void closeAfterAll() {
157-
cassandraContainer1.close();
158-
cassandraContainer2.close();
159-
pulsarContainer.close();
158+
if (cassandraContainer1 != null) {
159+
cassandraContainer1.close();
160+
}
161+
if (cassandraContainer2 != null) {
162+
cassandraContainer2.close();
163+
}
164+
if (pulsarContainer != null) {
165+
pulsarContainer.close();
166+
}
160167
}
161168

162169
@Test
163170
public void testSinglePk() throws InterruptedException, IOException {
164171
testSinglePk("ks1");
165172
}
166173

174+
@Test
175+
public void testClusteringKey() throws InterruptedException, IOException {
176+
testClusteringKey("ks1");
177+
}
178+
167179
@Test
168180
public void testCompoundPk() throws InterruptedException, IOException {
169181
testCompoundPk("ks1");
@@ -322,6 +334,118 @@ public void testSinglePk(String ksName) throws InterruptedException, IOException
322334
}
323335
}
324336

337+
// docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table5/cassandra-source-ks1-table5-0.log
338+
public void testClusteringKey(String ksName) throws InterruptedException, IOException {
339+
try {
340+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
341+
cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
342+
" WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
343+
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table5 (pk text, c1 date, c2 uuid, val int, PRIMARY KEY (pk, c1, c2)) WITH cdc=true");
344+
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('1','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)");
345+
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('2','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)");
346+
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk, c1, c2, val) VALUES('3','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af, 1)");
347+
}
348+
deployConnector(ksName, "table5");
349+
350+
try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
351+
Map<String, Integer> mutationTable5 = new HashMap<>();
352+
try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
353+
.topic(String.format(Locale.ROOT, "data-%s.table5", ksName))
354+
.subscriptionName("sub1")
355+
.subscriptionType(SubscriptionType.Key_Shared)
356+
.subscriptionMode(SubscriptionMode.Durable)
357+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
358+
.subscribe()) {
359+
Message<GenericRecord> msg;
360+
while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null &&
361+
mutationTable5.values().stream().mapToInt(i -> i).sum() < 4) {
362+
GenericRecord record = msg.getValue();
363+
assertEquals(this.schemaType, record.getSchemaType());
364+
Object key = getKey(msg);
365+
GenericRecord value = getValue(record);
366+
assertEquals((Integer) 0, mutationTable5.computeIfAbsent(getAndAssertKeyFieldAsString(key, "pk"), k -> 0));
367+
assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1"));
368+
assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2"));
369+
assertEquals(1, value.getField("val"));
370+
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
371+
consumer.acknowledge(msg);
372+
}
373+
assertEquals((Integer) 1, mutationTable5.get("1"));
374+
assertEquals((Integer) 1, mutationTable5.get("2"));
375+
assertEquals((Integer) 1, mutationTable5.get("3"));
376+
377+
// trigger a schema update
378+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
379+
cqlSession.execute("ALTER TABLE " + ksName + ".table5 ADD d double");
380+
cqlSession.execute("INSERT INTO " + ksName + ".table5 (pk,c1,c2,val,d) VALUES('1','2021-01-10', 016b123d-f732-4173-9225-c6717066c7af,1,1.0)");
381+
}
382+
while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null &&
383+
mutationTable5.values().stream().mapToInt(i -> i).sum() < 5) {
384+
GenericRecord record = msg.getValue();
385+
assertEquals(this.schemaType, record.getSchemaType());
386+
Object key = getKey(msg);
387+
GenericRecord value = getValue(record);
388+
assertEquals("1", getAndAssertKeyFieldAsString(key, "pk"));
389+
assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1"));
390+
assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2"));
391+
assertEquals(1, value.getField("val"));
392+
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
393+
consumer.acknowledge(msg);
394+
}
395+
assertEquals((Integer) 2, mutationTable5.get("1")); // 2 inserts for pk=1
396+
assertEquals((Integer) 1, mutationTable5.get("2"));
397+
assertEquals((Integer) 1, mutationTable5.get("3"));
398+
399+
// delete a row by partition key only
400+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
401+
cqlSession.execute("DELETE FROM " + ksName + ".table5 WHERE pk = '1'");
402+
}
403+
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
404+
mutationTable5.values().stream().mapToInt(i -> i).sum() < 6) {
405+
GenericRecord record = msg.getValue();
406+
assertEquals(this.schemaType, record.getSchemaType());
407+
Object key = getKey(msg);
408+
GenericRecord value = getValue(record);
409+
assertEquals("1", getAndAssertKeyFieldAsString(key, "pk"));
410+
// clustering keys are null because we deleted by partition key only
411+
assertKeyFieldIsNull(key, "c1");
412+
assertKeyFieldIsNull(key, "c2");
413+
assertNullValue(value);
414+
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
415+
consumer.acknowledge(msg);
416+
}
417+
assertEquals((Integer) 3, mutationTable5.get("1")); // 2 inserts and 1 delete for pk=1
418+
assertEquals((Integer) 1, mutationTable5.get("2"));
419+
assertEquals((Integer) 1, mutationTable5.get("3"));
420+
421+
// delete a row by partition and clustering keys
422+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
423+
cqlSession.execute("DELETE FROM " + ksName + ".table5 WHERE pk = '2' and c1 = '2021-01-10' and c2 = 016b123d-f732-4173-9225-c6717066c7af");
424+
}
425+
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
426+
mutationTable5.values().stream().mapToInt(i -> i).sum() < 6) {
427+
GenericRecord record = msg.getValue();
428+
assertEquals(this.schemaType, record.getSchemaType());
429+
Object key = getKey(msg);
430+
GenericRecord value = getValue(record);
431+
assertEquals("2", getAndAssertKeyFieldAsString(key, "pk"));
432+
assertEquals((int) LocalDate.parse("2021-01-10").toEpochDay(), getAndAssertKeyFieldAsInt(key, "c1"));
433+
assertEquals("016b123d-f732-4173-9225-c6717066c7af", getAndAssertKeyFieldAsString(key, "c2"));
434+
assertNullValue(value);
435+
mutationTable5.compute(getAndAssertKeyFieldAsString(key, "pk"), (k, v) -> v + 1);
436+
consumer.acknowledge(msg);
437+
}
438+
assertEquals((Integer) 3, mutationTable5.get("1")); // 2 inserts and 1 delete for pk=1
439+
assertEquals((Integer) 2, mutationTable5.get("2")); // 1 insert and 1 delete for pk=2
440+
assertEquals((Integer) 1, mutationTable5.get("3"));
441+
}
442+
}
443+
} finally {
444+
dumpFunctionLogs("cassandra-source-" + ksName + "-table5");
445+
undeployConnector(ksName, "table5");
446+
}
447+
}
448+
325449
// docker exec -it pulsar cat /pulsar/logs/functions/public/default/cassandra-source-ks1-table2/cassandra-source-ks1-table2-0.log
326450
public void testCompoundPk(String ksName) throws InterruptedException, IOException {
327451
try {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ testPulsarImageTag=2.10_3.4
2222

2323
kafkaVersion=3.4.0
2424
vavrVersion=0.10.3
25-
testContainersVersion=1.16.2
25+
testContainersVersion=1.19.1
2626
caffeineVersion=2.8.8
2727
guavaVersion=30.1-jre
2828
messagingConnectorsCommonsVersion=1.0.14

0 commit comments

Comments
 (0)