
Add support for logical AVRO types.  #40

@aymkhalil

A Pulsar sink can receive a logical type in three different ways, depending on how the message was generated. Below, I'll use the C* Decimal type (BigDecimal in Java) as an example:

  1. Using the Pulsar API Schema.AVRO(org.apache.pulsar.client.api.MyPojo.class), where MyPojo has a BigDecimal field. The generated schema is:
{
  "name": "bigDecimal",
  "type": [
    "null",
    {
      "type": "string",
      "java-class": "java.math.BigDecimal"
    }
  ]
}

The Pulsar sink will fail with:

Caused by: java.lang.UnsupportedOperationException: No recommended schema for decimal (scale is required)
  2. Using native AVRO APIs (LogicalTypes.decimal(precision, scale).addToSchema(org.apache.avro.Schema.create(org.apache.avro.Schema.Type.BYTES))) to add a logical type to a bytes schema. This matches the decimal logical type defined in the AVRO specification. Example schema:
{
  "name": "v2",
  "type": {
    "type": "bytes",
    "logicalType": "decimal",
    "precision": 17,
    "scale": 4
  }
}
  3. Using a logical CQL type that comes from a C* CDC source (see "Support Cassandra CDC logical types (new option decodeCDCDataTypes)" #42). The schema will be:
{
  "name": "fieldName",
  "type": [
    "null",
    {
      "type": "record",
      "name": "cql_decimal",
      "namespace": "",
      "fields": [
        {
          "name": "bigint",
          "type": "bytes"
        },
        {
          "name": "scale",
          "type": "int"
        }
      ],
      "logicalType": "cql_decimal"
    }
  ]
}

This will cause the sink to fail with:

com.datastax.oss.sink.pulsar.CassandraSinkTask - Error decoding/mapping Pulsar record PulsarSinkRecord{SinkRecord(sourceRecord=PulsarRecord(topicName=Optional[persist    topicname], partition=0, message=Optional[org.apache.pulsar.client.impl.TopicMessageImpl@20958f19], schema=KeyValueSchema(SEPARATED,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@4d53f4e4,org.apache.pulsar.client.impl.schema.generic.GenericAvroSchema@a79d566), failFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$586/0x00000008407be840@2d24432d, ackFunction=org.apache.pulsar.functions.source.PulsarSource$$Lambda$585/0x00000008407bf440@21dc87f6), value=(key = "org.apache.pulsar.client.impl.schema.generic.GenericAvroRecord@e180683", value = "org.apache.pulsar.client.impl.schema.generic.Generic
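
To make the three representations concrete, here is a rough sketch of how each one could be turned back into a BigDecimal, using only the JDK. It is not the connector's implementation. The class and method names are hypothetical, and a plain Map stands in for the AVRO generic record. The AVRO specification defines decimal bytes as the two's-complement, big-endian unscaled value. For cql_decimal, I am assuming the bigint field uses the same layout (what BigInteger.toByteArray() produces).

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, for illustration only.
final class DecimalDecodingSketch {

    // Case 1: Schema.AVRO on a POJO yields an AVRO string tagged with
    // "java-class": "java.math.BigDecimal"; the value is the decimal's text form.
    static BigDecimal fromJavaClassString(CharSequence value) {
        return new BigDecimal(value.toString());
    }

    // Case 2: AVRO "decimal" logical type on bytes. The bytes hold the unscaled
    // value (two's complement, big-endian); the scale comes from the schema.
    static BigDecimal fromAvroDecimal(ByteBuffer unscaled, int schemaScale) {
        byte[] bytes = new byte[unscaled.remaining()];
        unscaled.duplicate().get(bytes); // duplicate() leaves the caller's position untouched
        return new BigDecimal(new BigInteger(bytes), schemaScale);
    }

    // Case 3: cql_decimal record with fields "bigint" (bytes) and "scale" (int).
    // Assumption: "bigint" uses the same byte layout as case 2.
    static BigDecimal fromCqlDecimal(Map<String, Object> record) {
        ByteBuffer bigint = (ByteBuffer) record.get("bigint");
        int scale = (Integer) record.get("scale");
        return fromAvroDecimal(bigint, scale);
    }

    public static void main(String[] args) {
        BigDecimal original = new BigDecimal("1234.5678");
        byte[] unscaled = original.unscaledValue().toByteArray();

        Map<String, Object> cqlDecimal = new HashMap<>();
        cqlDecimal.put("bigint", ByteBuffer.wrap(unscaled));
        cqlDecimal.put("scale", original.scale());

        System.out.println(fromJavaClassString("1234.5678"));
        System.out.println(fromAvroDecimal(ByteBuffer.wrap(unscaled), 4));
        System.out.println(fromCqlDecimal(cqlDecimal));
    }
}
```

The main difference between cases 2 and 3 is where the scale lives: in the schema for the AVRO decimal type, and in each record for cql_decimal. So a sink that handles all three would have to branch on the field's schema (string with java-class, bytes with logicalType decimal, or record with logicalType cql_decimal) before decoding.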
