-
Notifications
You must be signed in to change notification settings - Fork 10
Open
Description
There are three different ways a pulsar sink can receive a logical type depending on how the message was generated. Below, I'll use C* Decimal type (BigDecimal in Java) as an example:
- Using Pulsar API
Schema.AVRO(org.apache.pulsar.client.api.MyPojo.class)
where MyPojo has aBigDecimal
field. The generate output schema is:
{
"name": "bigDecimal",
"type": [
"null",
{
"type": "string",
"java-class": "java.math.BigDecimal"
}
]
}
The pulsar sink will fail with
Caused by: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
- Using Native AVRO APIs (
LogicalTypes.decimal(precision, scale).addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES))))
) to add logical schema to bytes. This is matching the AVRO standard mentioned here. Example schema:
{
"name": "v2",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 17,
"scale": 4
}
- Support Cassandra CDC logical types (new option decodeCDCDataTypes) #42 Using a logical CQL type that comes from a C* CDC source, the schema will be:
{
"name": "fieldName",
"type": [
"null",
{
"type": "record",
"name": "cql_decimal",
"namespace": "",
"fields": [
{
"name": "bigint",
"type": "bytes"
},
{
"name": "scale",
"type": "int"
}
],
"logicalType": "cql_decimal"
}
]
},
and this will cause the sink to fail with:
com.datastax.oss.sink.pulsar.CassandraSinkTask - Error decoding/mapping Pulsar record PulsarSinkRecord{SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persist topicname], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@20958f19], schema=KeyValueSchema(SEP ARATED,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@4d53f4e4,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@a79d566), failFunction=org.apache.pulsar.functions.source.PulsarSource$ $Lambda$586/0x00000008407be840@2d24432d, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$585/0x00000008407bf440@21dc87f6), value=(key = "org.apache.pulsar.client.impl.schema.generic.GenericAvro Record@e180683", value = "org.apache.pulsar.client.impl.schema.generic.Generic
Metadata
Metadata
Assignees
Labels
No labels