diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index c53ec5e8..be63f02d 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -22,7 +22,7 @@ import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata; import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; -import com.datastax.oss.driver.api.core.type.CqlVectorType; +import com.datastax.oss.driver.api.core.type.VectorType; import com.datastax.oss.driver.api.core.type.DataType; import com.datastax.oss.driver.api.core.type.ListType; import com.datastax.oss.driver.api.core.type.MapType; @@ -76,7 +76,7 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List vectorValue = new ArrayList<>(); - vector.getValues().forEach(vectorValue::add); + vector.stream().forEach(vectorValue::add); genericRecordBuilder.put(fieldName, buildArrayValue(vectorSchema, vectorValue)); } } @@ -275,35 +276,35 @@ GenericRecord buildUDTValue(Schema udtSchema, UdtValue udtValue) { genericRecord.put(field.toString(), udtValue.getBigInteger(field)); break; case ProtocolConstants.DataType.LIST: { - ListType listType = (ListType) udtValue.getType(field); - List listValue = udtValue.getList(field, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()); - String path = typeName + "." + field.toString(); - Schema elementSchema = subSchemas.get(path); - log.debug("path={} elementSchema={} listType={} listValue={}", path, elementSchema, listType, listValue); - genericRecord.put(field.toString(), buildArrayValue(elementSchema, listValue)); - } - break; + ListType listType = (ListType) udtValue.getType(field); + List listValue = udtValue.getList(field, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()); + String path = typeName + "." + field.toString(); + Schema elementSchema = subSchemas.get(path); + log.debug("path={} elementSchema={} listType={} listValue={}", path, elementSchema, listType, listValue); + genericRecord.put(field.toString(), buildArrayValue(elementSchema, listValue)); + } + break; case ProtocolConstants.DataType.SET: { - SetType setType = (SetType) udtValue.getType(field); - Set setValue = udtValue.getSet(field, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()); - String path = typeName + "." + field.toString(); - Schema elementSchema = subSchemas.get(path); - log.debug("path={} elementSchema={} setType={} setValue={}", path, elementSchema, setType, setValue); - genericRecord.put(field.toString(), buildArrayValue(elementSchema, setValue)); - } - break; + SetType setType = (SetType) udtValue.getType(field); + Set setValue = udtValue.getSet(field, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()); + String path = typeName + "." + field.toString(); + Schema elementSchema = subSchemas.get(path); + log.debug("path={} elementSchema={} setType={} setValue={}", path, elementSchema, setType, setValue); + genericRecord.put(field.toString(), buildArrayValue(elementSchema, setValue)); + } + break; case ProtocolConstants.DataType.MAP: { - MapType mapType = (MapType) udtValue.getType(field); - Map mapValue = udtValue.getMap(field, - CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(), - CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()) - .entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), Map.Entry::getValue)); - String path = typeName + "." + field.toString(); - Schema valueSchema = subSchemas.get(path); - log.debug("path={} valueSchema={} mapType={} mapValue={}", path, valueSchema, mapType, mapValue); - genericRecord.put(field.toString(), mapValue); - } - break; + MapType mapType = (MapType) udtValue.getType(field); + Map mapValue = udtValue.getMap(field, + CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(), + CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()) + .entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), Map.Entry::getValue)); + String path = typeName + "." + field.toString(); + Schema valueSchema = subSchemas.get(path); + log.debug("path={} valueSchema={} mapType={} mapValue={}", path, valueSchema, mapType, mapValue); + genericRecord.put(field.toString(), mapValue); + } + break; default: log.debug("Ignoring unsupported type field name={} type={}", field, dataType.asCql(false, true)); } @@ -418,3 +419,4 @@ public IndexedRecord toRecord(CqlDuration value, Schema schema, LogicalType type } } } + diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java index b9fa0803..2b309f0b 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java @@ -63,7 +63,7 @@ public class NativeJsonConverter extends AbstractNativeConverter { private static final ObjectMapper mapper = new ObjectMapper(); - private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); + private static final JsonNodeFactory jsonNodeFactory = new JsonNodeFactory(true); public NativeJsonConverter(KeyspaceMetadata ksm, TableMetadata tm, List columns) { super(ksm, tm, columns); diff --git a/gradle.properties b/gradle.properties index 55098650..33b7d1b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -8,10 +8,10 @@ releasesRepoUrl=https://repo.datastax.com/artifactory/datastax-public-releases-l # deps version avroVersion=1.11.4 -lombokVersion=1.18.20 -ossDriverVersion=4.16.0 -cassandra3Version=3.11.10 -cassandra4Version=4.0.4 +lombokVersion=1.18.38 +ossDriverVersion=4.17.0 +cassandra3Version=3.11.19 +cassandra4Version=4.0.18 dse4Version=6.8.23 pulsarGroup=org.apache.pulsar @@ -20,22 +20,22 @@ pulsarVersion=3.0.0 testPulsarImage=datastax/lunastreaming testPulsarImageTag=2.10_3.4 -kafkaVersion=3.4.0 -vavrVersion=0.10.3 +kafkaVersion=3.4.1 +vavrVersion=0.10.6 testContainersVersion=1.19.1 -caffeineVersion=2.8.8 +caffeineVersion=2.9.3 guavaVersion=30.1-jre -messagingConnectorsCommonsVersion=1.0.14 -slf4jVersion=1.7.30 +messagingConnectorsCommonsVersion=1.0.15 +slf4jVersion=1.7.36 # pulsar connector -logbackVersion=1.2.9 -jacksonBomVersion=2.13.4.20221013 -jnrVersion=3.1.15 -nettyVersion=4.1.72.Final -nettyTcNativeVersion=2.0.46.Final -commonCompressVersion=1.21 +logbackVersion=1.2.13 +jacksonBomVersion=2.13.5 +jnrVersion=3.1.20 +nettyVersion=4.1.122.Final +nettyTcNativeVersion=2.0.72.Final +commonCompressVersion=1.27.1 gsonVersion=2.8.9 -jsonVersion=20230227 +jsonVersion=20250107 # cassandra settings for docker images commitlog_sync_period_in_ms=2000 @@ -45,4 +45,4 @@ cdc_total_space_in_mb=70 dockerRepo=myrepo/ # CDC backfilling Client -dsbulkVersion=1.10.0 +dsbulkVersion=1.11.0 diff --git a/testcontainers/src/main/java/com/datastax/oss/cdc/PulsarDualNodeTests.java b/testcontainers/src/main/java/com/datastax/oss/cdc/PulsarDualNodeTests.java index 72d4ece3..c7730393 100644 --- a/testcontainers/src/main/java/com/datastax/oss/cdc/PulsarDualNodeTests.java +++ b/testcontainers/src/main/java/com/datastax/oss/cdc/PulsarDualNodeTests.java @@ -19,13 +19,7 @@ import com.datastax.testcontainers.PulsarContainer; import com.datastax.testcontainers.cassandra.CassandraContainer; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.api.schema.RecordSchemaBuilder; import org.apache.pulsar.client.api.schema.SchemaBuilder; @@ -39,14 +33,11 @@ import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; import org.testcontainers.containers.Network; +import org.testcontainers.images.builder.Transferable; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -229,10 +220,12 @@ public void testUnorderedMutations() throws InterruptedException, IOException { String pulsarServiceUrl = "pulsar://pulsar:" + pulsarContainer.BROKER_PORT; Long testId = Math.abs(AgentTestUtil.random.nextLong()); String randomDataDir = System.getProperty("buildDir") + "/data-" + testId + "-"; - try (CassandraContainer cassandraContainer1 = createCassandraContainer(1, pulsarServiceUrl, testNetwork) + try ( + CassandraContainer cassandraContainer1 = createCassandraContainer(1, pulsarServiceUrl, testNetwork) .withFileSystemBind(randomDataDir + "1", "/var/lib/cassandra"); - CassandraContainer cassandraContainer2 = createCassandraContainer(2, pulsarServiceUrl, testNetwork) - .withFileSystemBind(randomDataDir + "2", "/var/lib/cassandra")) { + CassandraContainer cassandraContainer2 = createCassandraContainer(2, pulsarServiceUrl, testNetwork) + .withFileSystemBind(randomDataDir + "2", "/var/lib/cassandra"); + ) { cassandraContainer1.start(); cassandraContainer2.start(); diff --git a/testcontainers/src/main/java/com/datastax/testcontainers/cassandra/CassandraContainer.java b/testcontainers/src/main/java/com/datastax/testcontainers/cassandra/CassandraContainer.java index 8e4c0bdb..40b054bf 100644 --- a/testcontainers/src/main/java/com/datastax/testcontainers/cassandra/CassandraContainer.java +++ b/testcontainers/src/main/java/com/datastax/testcontainers/cassandra/CassandraContainer.java @@ -30,6 +30,7 @@ import org.testcontainers.delegate.DatabaseDelegate; import org.testcontainers.ext.ScriptUtils; import org.testcontainers.ext.ScriptUtils.ScriptLoadException; +import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.DockerImageName; import org.testcontainers.utility.MountableFile;