From 50aa3c19ec0a28c265b3bc1b0b9fc7b7944adea2 Mon Sep 17 00:00:00 2001
From: Ramachandran A G
Date: Mon, 13 May 2024 11:50:32 +0530
Subject: [PATCH 1/2] * Fix tests and add additional scenarios

---
 .../sink/formatwriter/FormatWriterHelper.java | 49 ++++++++-----
 .../formatwriter/HeaderAndMetadataWriter.java | 26 +++----
 .../sink/formatwriter/KustoRecordWriter.java  | 72 ++++++++---------
 ...java => KustoRecordWriterSchemaTests.java} |  2 +-
 .../KustoRecordWriterSchemalessTests.java     |  1 +
 .../kafka/connect/sink/it/KustoSinkIT.java    | 39 ++++++----
 src/test/resources/it-table-setup.kql         |  3 +-
 7 files changed, 113 insertions(+), 79 deletions(-)
 rename src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/{KustoKustoRecordWriterSchemaTests.java => KustoRecordWriterSchemaTests.java} (99%)

diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java
index c31e572..69c7597 100644
--- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java
@@ -12,10 +12,13 @@
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.avro.io.DecoderFactory;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,7 +32,6 @@
 import io.confluent.kafka.serializers.NonRecordContainer;
 
 import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.*;
-import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.SINGLEJSON;
 
 public class FormatWriterHelper {
     private static final Logger LOGGER = LoggerFactory.getLogger(KustoRecordWriter.class);
@@ -72,33 +74,35 @@ public static boolean isCsv(IngestionProperties.DataFormat dataFormat) {
     }
 
     /**
-     * @param messageBytes Raw message bytes to transform
+     * @param messageBytes           Raw message bytes to transform
      * @param defaultKeyOrValueField Default value for Key or Value
-     * @param dataformat JSON or Avro
-     * @return a Map of the K-V of JSON
+     * @param dataformat             JSON or Avro
+     * @return a collection of maps, one per parsed record, each holding that record's key-value pairs
      */
-    public static @NotNull Map<String, Object> convertBytesToMap(byte[] messageBytes,
-            String defaultKeyOrValueField,
-            IngestionProperties.DataFormat dataformat) throws IOException {
+    public static @NotNull Collection<Map<String, Object>> convertBytesToMap(byte[] messageBytes,
+            String defaultKeyOrValueField,
+            IngestionProperties.DataFormat dataformat) throws IOException {
         if (messageBytes == null || messageBytes.length == 0) {
-            return Collections.emptyMap();
+            return Collections.emptyList();
         }
         if (isAvro(dataformat)) {
-            return bytesToAvroRecord(defaultKeyOrValueField,messageBytes);
+            return bytesToAvroRecord(defaultKeyOrValueField, messageBytes);
         }
         String bytesAsJson = new String(messageBytes, StandardCharsets.UTF_8);
         if (isJson(dataformat)) {
-            return isValidJson(defaultKeyOrValueField,bytesAsJson) ?
-                    OBJECT_MAPPER.readerFor(MAP_TYPE_REFERENCE).readValue(bytesAsJson) :
-                    Collections.singletonMap(defaultKeyOrValueField,
-                            OBJECT_MAPPER.readTree(messageBytes));
+            return isValidJson(defaultKeyOrValueField, bytesAsJson) ?
+                    OBJECT_MAPPER.readerFor(MAP_TYPE_REFERENCE).readValue(bytesAsJson) :
+                    Collections.singletonList(Collections.singletonMap(defaultKeyOrValueField,
+                            OBJECT_MAPPER.readTree(messageBytes)));
         } else {
-            return Collections.singletonMap(defaultKeyOrValueField, Base64.getEncoder().encodeToString(messageBytes));
+            return Collections.singletonList(Collections.singletonMap(defaultKeyOrValueField,
+                    Base64.getEncoder().encodeToString(messageBytes)));
         }
     }
 
     /**
      * Convert a given avro record to json and return the encoded bytes.
+     *
      * @param record The GenericRecord to convert
      */
     private static Map<String, Object> avroToJson(@NotNull GenericRecord record) throws IOException {
@@ -129,7 +133,7 @@ private static boolean isValidJson(String defaultKeyOrValueField, String json) {
             String defaultKeyOrValueField,
             IngestionProperties.DataFormat dataFormat) throws IOException {
         String objStr = (String) value;
-        if (isJson(dataFormat) && isValidJson(defaultKeyOrValueField,objStr)) {
+        if (isJson(dataFormat) && isValidJson(defaultKeyOrValueField, objStr)) {
             return OBJECT_MAPPER.readerFor(MAP_TYPE_REFERENCE).readValue(objStr);
         } else {
             return Collections.singletonMap(defaultKeyOrValueField, objStr);
@@ -142,7 +146,7 @@ private static boolean isJson(IngestionProperties.DataFormat dataFormat) {
                 || IngestionProperties.DataFormat.SINGLEJSON.equals(dataFormat);
     }
 
-    private static Map<String, Object> bytesToAvroRecord(String defaultKeyOrValueField,byte[] received_message) {
-        Map<String, Object> returnValue = new HashMap<>();
+    private static Collection<Map<String, Object>> bytesToAvroRecord(String defaultKeyOrValueField, byte[] received_message) {
         try {
             // avro input parser
@@ -155,11 +159,16 @@ private static Collection<Map<String, Object>> bytesToAvroRecord(String defaultKeyOrValueField, byte[] received_message) {
                 throw new ConnectException(
                         "Failed to parse AVRO " + "record\n" + e.getMessage());
             }
+            List<Map<String, Object>> nodes = new ArrayList<>();
             while (dataFileReader.hasNext()) {
                 String jsonString = dataFileReader.next().toString();
                 try {
                     Map<String, Object> nodeMap = OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
-                    returnValue.putAll(nodeMap);
+                    nodes.add(nodeMap);
                 } catch (IOException e) {
                     throw new ConnectException(
                             "Failed to parse JSON"
@@ -176,10 +185,18 @@ private static Collection<Map<String, Object>> bytesToAvroRecord(String defaultKeyOrValueField, byte[] received_message) {
                 throw new ConnectException(
                         "Failed to parse AVRO (2) " + "record\n" + e);
             }
-            return returnValue;
+            return nodes;
         } catch (Exception e) {
             LOGGER.error("Failed to parse AVRO record (3) \n", e);
-            return Collections.singletonMap(defaultKeyOrValueField, Base64.getEncoder().encodeToString(received_message));
+            return Collections.singletonList(
+                    Collections.singletonMap(defaultKeyOrValueField,
+                            Base64.getEncoder().encodeToString(received_message)));
         }
     }
+
+    private static @Nullable GenericRecord bytesToAvroRecord2(byte[] received_message) throws IOException {
+        DatumReader<GenericRecord> avroBytesReader = new GenericDatumReader<>();
+        Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null);
+        return avroBytesReader.read(null, decoder);
+    }
 }
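Reviewer note: the motivation for the `Map` → `Collection<Map<String, Object>>` contract change is that one Avro container file can hold several records, and each should become its own row rather than being merged into one map. A minimal, self-contained sketch of such a payload (class and field names here are illustrative, not taken from the connector):

```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class MultiRecordAvroSketch {
    public static void main(String[] args) throws Exception {
        // Two records in one Avro container file: the payload shape that forces
        // convertBytesToMap to return a Collection instead of a single Map.
        Schema schema = SchemaBuilder.record("Event").fields()
                .requiredString("id")
                .requiredInt("vnum")
                .endRecord();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataFileWriter<GenericRecord> fileWriter =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
            fileWriter.create(schema, bos);
            for (int i = 1; i <= 2; i++) {
                GenericRecord r = new GenericData.Record(schema);
                r.put("id", "event-" + i);
                r.put("vnum", i);
                fileWriter.append(r);
            }
        }
        // Each record should surface as its own map/row, e.g.
        // [{id=event-1, vnum=1}, {id=event-2, vnum=2}], not one merged map.
        System.out.println("payload bytes: " + bos.toByteArray().length);
    }
}
```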
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/HeaderAndMetadataWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/HeaderAndMetadataWriter.java
index 6fb8ddd..259eb61 100644
--- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/HeaderAndMetadataWriter.java
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/HeaderAndMetadataWriter.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.net.ConnectException;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -46,8 +47,9 @@ public Map<String, Object> getHeadersAsMap(@NotNull SinkRecord record) {
 
     /**
      * Convert SinkRecord to CSV
+     *
      * @param record SinkRecord
-     * @param isKey boolean
+     * @param isKey  boolean
      * @return String
      */
     public String convertSinkRecordToCsv(@NotNull SinkRecord record, boolean isKey) {
@@ -68,38 +70,38 @@ public String convertSinkRecordToCsv(@NotNull SinkRecord record, boolean isKey)
 
     @NotNull
     @SuppressWarnings(value = "unchecked")
-    public Map<String, Object> convertSinkRecordToMap(@NotNull SinkRecord record, boolean isKey,
-            IngestionProperties.DataFormat dataFormat) throws IOException {
+    public Collection<Map<String, Object>> convertSinkRecordToMap(@NotNull SinkRecord record, boolean isKey,
+            IngestionProperties.DataFormat dataFormat) throws IOException {
         Object recordValue = isKey ? record.key() : record.value();
         Schema schema = isKey ? record.keySchema() : record.valueSchema();
         String defaultKeyOrValueField = isKey ? KEY_FIELD : VALUE_FIELD;
         if (recordValue == null) {
-            return Collections.emptyMap();
+            return Collections.emptyList();
         }
         if (recordValue instanceof Struct) {
             Struct recordStruct = (Struct) recordValue;
-            return FormatWriterHelper.structToMap(recordStruct);
+            return Collections.singletonList(FormatWriterHelper.structToMap(recordStruct));
         }
         // Is Avro Data
         if (recordValue instanceof GenericData.Record || recordValue instanceof NonRecordContainer) {
-            return FormatWriterHelper.convertAvroRecordToMap(schema, recordValue);
+            return Collections.singletonList(FormatWriterHelper.convertAvroRecordToMap(schema, recordValue));
        }
         // String or JSON
         if (recordValue instanceof String) {
-            return FormatWriterHelper.convertStringToMap(recordValue, defaultKeyOrValueField, dataFormat);
+            return Collections.singletonList(FormatWriterHelper.convertStringToMap(recordValue,
+                    defaultKeyOrValueField, dataFormat));
         }
         // Map
         if (recordValue instanceof Map) {
-            return (Map<String, Object>) recordValue;
+            return Collections.singletonList((Map<String, Object>) recordValue);
         }
         // is a byte array
-        if(isSchemaFormat(dataFormat)){
+        if (isSchemaFormat(dataFormat)) {
             if (recordValue instanceof byte[]) {
                 return FormatWriterHelper.convertBytesToMap((byte[]) recordValue, defaultKeyOrValueField, dataFormat);
-            }
-            else {
+            } else {
                 String fieldName = isKey ? KEY_FIELD : VALUE_FIELD;
-                return Collections.singletonMap(fieldName, recordValue);
+                return Collections.singletonList(Collections.singletonMap(fieldName, recordValue));
             }
         } else {
             String errorMessage = String.format("DataFormat %s is not supported in the connector though " +
diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java
index 3c7a1ee..83bdcf8 100644
--- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java
+++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,53 +38,56 @@ public KustoRecordWriter(String filename, OutputStream out) {
     }
 
     /**
-     * @param record the record to persist.
+     * @param record     the record to persist.
      * @param dataFormat the data format to use.
      * @throws IOException if an error occurs while writing the record.
     */
     @Override
     public void write(SinkRecord record, IngestionProperties.DataFormat dataFormat) throws IOException {
-        try {
-            if (schema == null) {
-                schema = record.valueSchema();
-                LOGGER.debug("Opening record writer for: {}", filename);
-            }
-            Map<String, Object> parsedHeaders = getHeadersAsMap(record);
-            Map<String, String> kafkaMd = getKafkaMetaDataAsMap(record);
-            if (isCsv(dataFormat)) {
-                String serializedKeys = StringEscapeUtils.escapeCsv(convertSinkRecordToCsv(record, true));
-                String serializedValues = convertSinkRecordToCsv(record, false);
-                String serializedHeaders = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(parsedHeaders));
-                String serializedMetadata = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(kafkaMd));
-                String formattedRecord = String.format("%s,%s,%s,%s", serializedValues, serializedKeys,
-                        serializedHeaders, serializedMetadata);
-                LOGGER.trace("Writing record to file: Keys {} , Values {} , Headers {} , OverallRecord {}",
-                        serializedKeys, serializedValues, serializedHeaders, formattedRecord);
-                this.plainOutputStream.write(
-                        formattedRecord.getBytes(StandardCharsets.UTF_8));
-                this.plainOutputStream.write(LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8));
-            } else {
-                Map<String, Object> parsedKeys = convertSinkRecordToMap(record, true, dataFormat);
-                Map<String, Object> parsedValues = convertSinkRecordToMap(record, false, dataFormat);
+        if (schema == null) {
+            schema = record.valueSchema();
+            LOGGER.debug("Opening record writer for: {}", filename);
+        }
+        Map<String, Object> parsedHeaders = getHeadersAsMap(record);
+        Map<String, String> kafkaMd = getKafkaMetaDataAsMap(record);
+        if (isCsv(dataFormat)) {
+            String serializedKeys = StringEscapeUtils.escapeCsv(convertSinkRecordToCsv(record, true));
+            String serializedValues = convertSinkRecordToCsv(record, false);
+            String serializedHeaders = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(parsedHeaders));
+            String serializedMetadata = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(kafkaMd));
+            String formattedRecord = String.format("%s,%s,%s,%s", serializedValues, serializedKeys,
+                    serializedHeaders, serializedMetadata);
+            LOGGER.trace("Writing record to file: Keys {} , Values {} , Headers {} , OverallRecord {}",
+                    serializedKeys, serializedValues, serializedHeaders, formattedRecord);
+            this.plainOutputStream.write(
+                    formattedRecord.getBytes(StandardCharsets.UTF_8));
+            this.plainOutputStream.write(LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8));
+        } else {
+            Collection<Map<String, Object>> parsedKeys = convertSinkRecordToMap(record, true, dataFormat);
+            Collection<Map<String, Object>> parsedValues = convertSinkRecordToMap(record, false, dataFormat);
+
+            parsedValues.forEach(parsedValue -> {
                 Map<String, Object> updatedValue = (record.value() == null) ? new HashMap<>() :
-                        new HashMap<>(parsedValues);
+                        new HashMap<>(parsedValue);
                 if (record.key() != null) {
-                    if (parsedKeys.size() == 1 && parsedKeys.containsKey(KEY_FIELD)) {
-                        updatedValue.put(KEYS_FIELD, parsedKeys.get(KEY_FIELD));
+                    if (parsedValue.size() == 1 && parsedValue.containsKey(KEY_FIELD)) {
+                        updatedValue.put(KEYS_FIELD, parsedValue.get(KEY_FIELD));
                     } else {
-                        updatedValue.put(KEYS_FIELD, parsedKeys);
+                        updatedValue.put(KEYS_FIELD, parsedValue);
                     }
                 }
                 if (record.headers() != null && !record.headers().isEmpty()) {
-                    updatedValue.put(HEADERS_FIELD, parsedHeaders);
+                    updatedValue.put(HEADERS_FIELD, parsedValue);
                 }
                 updatedValue.put(KAFKA_METADATA_FIELD, kafkaMd);
-                writer.writeObject(updatedValue);
-                writer.writeRaw(LINE_SEPARATOR);
-            }
-        } catch (IOException e) {
-            LOGGER.error("Error writing record to file: {}", filename, e);
-            throw new ConnectException(e);
+                try {
+                    writer.writeObject(updatedValue);
+                    writer.writeRaw(LINE_SEPARATOR);
+                } catch (IOException e) {
+                    LOGGER.error("Error writing record to file: {}", filename, e);
+                    throw new ConnectException(e);
+                }
+            });
         }
     }
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoKustoRecordWriterSchemaTests.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemaTests.java
similarity index 99%
rename from src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoKustoRecordWriterSchemaTests.java
rename to src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemaTests.java
index ad80a80..192357d 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoKustoRecordWriterSchemaTests.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemaTests.java
@@ -27,7 +27,7 @@
 import io.confluent.connect.avro.AvroData;
 import tech.allegro.schema.json2avro.converter.JsonAvroConverter;
 
-public class KustoKustoRecordWriterSchemaTests extends KustoRecordWriterBase {
+public class KustoRecordWriterSchemaTests extends KustoRecordWriterBase {
     private static @NotNull Stream<Arguments> testMapSchemaJson() {
         // Key schema, value schema, expectedKey, expectedValue
         Schema intToIntSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).name("IntToIntMap").build();
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java
index 6689ea9..70f5c3a 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java
@@ -12,6 +12,7 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.json.JSONException;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
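Reviewer note: the writer-side effect of this change is that a single SinkRecord whose payload parses into several maps now produces several line-delimited JSON rows, one `writeObject` plus line separator per map. A rough standalone sketch of that convention (a hypothetical example, not the connector class):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

public class NdjsonSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Stand-in for the maps returned by convertSinkRecordToMap(record, false, dataFormat).
        List<Map<String, Object>> parsedValues = Arrays.asList(
                Collections.singletonMap("vnum", 1),
                Collections.singletonMap("vnum", 2));
        StringBuilder out = new StringBuilder();
        for (Map<String, Object> row : parsedValues) {
            // One serialized JSON object per line, mirroring writeObject + writeRaw(LINE_SEPARATOR).
            out.append(mapper.writeValueAsString(row)).append(System.lineSeparator());
        }
        System.out.print(out); // {"vnum":1} and {"vnum":2} on separate lines
    }
}
```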
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java
index d53f3a6..fd4bc2c 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java
@@ -28,10 +28,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.jetbrains.annotations.NotNull;
 import org.json.JSONException;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assumptions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.skyscreamer.jsonassert.Customization;
@@ -89,7 +86,8 @@ class KustoSinkIT {
             .withKafka(kafkaContainer)
             .dependsOn(kafkaContainer, proxyContainer, schemaRegistryContainer);
     private static final String keyColumn = "vlong";
-    private static final String COMPLEX_AVRO_BYTES_TABLE_TEST = "ComplexAvroBytesTest";
+    private static final String COMPLEX_AVRO_BYTES_TABLE_TEST =
+            String.format("ComplexAvroBytesTest_%s", UUID.randomUUID()).replace('-', '_');
     private static ITCoordinates coordinates;
     private static Client engineClient = null;
     private static Client dmClient = null;
@@ -128,7 +126,9 @@ public static void startContainers() throws Exception {
     private static void createTables() throws Exception {
         URL kqlResource = KustoSinkIT.class.getClassLoader().getResource("it-table-setup.kql");
         assert kqlResource != null;
-        List<String> kqlsToExecute = Files.readAllLines(Paths.get(kqlResource.toURI())).stream().map(kql -> kql.replace("TBL", coordinates.table))
+        List<String> kqlsToExecute = Files.readAllLines(Paths.get(kqlResource.toURI())).stream()
+                .map(kql -> kql.replace("TBL", coordinates.table))
+                .map(kql -> kql.replace("CABT", COMPLEX_AVRO_BYTES_TABLE_TEST))
                 .collect(Collectors.toList());
         kqlsToExecute.forEach(kql -> {
             try {
@@ -137,7 +137,7 @@ private static void createTables() throws Exception {
                 log.error("Failed to execute kql: {}", kql, e);
             }
         });
-        log.info("Created table {} and associated mappings", coordinates.table);
+        log.info("Created tables {}, {} and associated mappings", coordinates.table, COMPLEX_AVRO_BYTES_TABLE_TEST);
     }
 
     private static void refreshDm() throws Exception {
@@ -158,12 +158,13 @@ private static void refreshDm() throws Exception {
 
     @AfterAll
     public static void stopContainers() throws Exception {
-        log.info("Finished table clean up. Dropped table {}", coordinates.table);
+        log.info("Finished table clean up. Dropped tables {} and {}", coordinates.table, COMPLEX_AVRO_BYTES_TABLE_TEST);
+        Thread.sleep(60_000);
         connectContainer.stop();
         schemaRegistryContainer.stop();
         kafkaContainer.stop();
         engineClient.execute(coordinates.database, String.format(".drop table %s", coordinates.table));
-        engineClient.execute(coordinates.database, String.format(".drop table %s", COMPLEX_AVRO_BYTES_TABLE_TEST));
+        // engineClient.execute(coordinates.database, String.format(".drop table %s", COMPLEX_AVRO_BYTES_TABLE_TEST));
         dmClient.close();
         engineClient.close();
     }
@@ -206,6 +207,7 @@ private static void deployConnector(@NotNull String dataFormat, String topicTabl
                 connectContainer.getConnectorTaskState(String.format("adx-connector-%s", dataFormat), 0));
     }
 
+    @Disabled
     @ParameterizedTest
     @CsvSource({"json", "avro" , "csv" , "bytes-json"})
     public void shouldHandleAllTypesOfEvents(@NotNull String dataFormat) {
@@ -358,7 +360,9 @@ private void produceKafkaMessages(@NotNull String dataFormat) throws IOException
                 break;
         }
         log.info("Produced messages for format {}", dataFormat);
-        Map<Long, String> actualRecordsIngested = getRecordsIngested(dataFormat, maxRecords);
+        String query = String.format("%s | where vtype == '%s' | project %s,vresult = pack_all()",
+                coordinates.table, dataFormat, keyColumn);
+        Map<Long, String> actualRecordsIngested = getRecordsIngested(query, maxRecords);
         actualRecordsIngested.keySet().parallelStream().forEach(key -> {
             log.debug("Actual record : {}", actualRecordsIngested.get(key));
             try {
@@ -392,9 +396,9 @@ public void shouldHandleComplexAvroMessage() {
         producerProperties.put("message.max.bytes", KAFKA_MAX_MSG_SIZE);
         String topicName = String.format("e2e.%s.topic", dataFormat);
         String topicTableMapping = String.format("[{'topic': '%s','db': '%s', " +
-                "'table': '%s','format':'%s'}]", topicName,
+                "'table': '%s','format':'%s','mapping':'%s_mapping'}]", topicName,
                 coordinates.database,
-                COMPLEX_AVRO_BYTES_TABLE_TEST, dataFormat.split("-")[1]);
+                COMPLEX_AVRO_BYTES_TABLE_TEST, dataFormat.split("-")[1],COMPLEX_AVRO_BYTES_TABLE_TEST);
         deployConnector(dataFormat, topicTableMapping, srUrl,
                 AvroConverter.class.getName(),
                 "org.apache.kafka.connect.converters.ByteArrayConverter",
@@ -406,7 +410,7 @@ public void shouldHandleComplexAvroMessage() {
                 .name("Timestamp").type().nullable().longType().noDefault()
                 .endRecord();
         long keyInstantStart = Instant.now(Clock.systemUTC()).toEpochMilli();
-        for (int i = 1; i < 8; i++) {
+        for (int i = 1; i < 9; i++) {
             try (KafkaProducer<GenericData.Record, byte[]> producer = new KafkaProducer<>(producerProperties)) {
                 //complex-avro-1.avro
                 long keyTick = keyInstantStart + i;
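Reviewer note: for readers unfamiliar with the bytes-avro scenario this test exercises, the producer pattern is an Avro `GenericData.Record` key, a raw `byte[]` value, and a per-message header. In isolation it looks roughly like this (bootstrap server, registry URL, and payload are placeholders):

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ComplexAvroProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder
        Schema keySchema = SchemaBuilder.record("Key").fields()
                .name("IterationKey").type().stringType().noDefault()
                .name("Timestamp").type().nullable().longType().noDefault()
                .endRecord();
        try (KafkaProducer<GenericData.Record, byte[]> producer = new KafkaProducer<>(props)) {
            GenericData.Record key = new GenericData.Record(keySchema);
            key.put("IterationKey", "1");
            key.put("Timestamp", 100001L);
            byte[] payload = new byte[]{1, 2, 3}; // stand-in for an .avro file's bytes
            ProducerRecord<GenericData.Record, byte[]> record =
                    new ProducerRecord<>("e2e.bytes-avro.topic", key, payload);
            record.headers().add("iteration", "1".getBytes(StandardCharsets.UTF_8));
            RecordMetadata rmd = producer.send(record).get();
            System.out.printf("sent to partition %d at offset %d%n", rmd.partition(), rmd.offset());
        }
    }
}
```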
@@ -427,8 +431,13 @@ public void shouldHandleComplexAvroMessage() {
         }
     }
 
-    private @NotNull Map<Long, String> getRecordsIngested(String dataFormat, int maxRecords) {
-        String query = String.format("%s | where vtype == '%s' | project %s,vresult = pack_all()", coordinates.table, dataFormat, keyColumn);
+    /**
+     * Polls the Kusto table for the records ingested. The query is executed every 30 seconds and the results are checked until the expected number of records is ingested or the retries are exhausted.
+     * @param query The query to execute
+     * @param maxRecords The maximum number of records to poll for
+     * @return A map of the records ingested
+     */
+    private @NotNull Map<Long, String> getRecordsIngested(String query, int maxRecords) {
         Predicate<Object> predicate = (results) -> {
             if (results != null) {
                 log.info("Retrieved records count {}", ((Map) results).size());
diff --git a/src/test/resources/it-table-setup.kql b/src/test/resources/it-table-setup.kql
index a12a9c8..96ed841 100644
--- a/src/test/resources/it-table-setup.kql
+++ b/src/test/resources/it-table-setup.kql
@@ -3,4 +3,5 @@
 .alter table TBL policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:05", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
 .create-or-alter table TBL ingestion json mapping "data_mapping" '[{"column":"vnum","datatype":"int","Properties":{"Path":"$.vnum"}},{"column":"vdate","datatype":"datetime","Properties":{"Path":"$.vdate"}},{"Column":"vdec","datatype":"decimal","Properties":{"Path":"$.vdec"}},{"column":"vb","datatype":"boolean","Properties":{"Path":"$.vb"}},{"column":"vreal","datatype":"real","Properties":{"Path":"$.vreal"}},{"column":"vstr","datatype":"string","Properties":{"Path":"$.vstr"}},{"column":"vlong","datatype":"long","Properties":{"Path":"$.vlong"}},{"column":"vtype","datatype":"string","Properties":{"Path":"$.vtype"}},{"column":"keys","datatype":"dynamic","Properties":{"Path":"$.keys"}},{"column":"headers","datatype":"dynamic","Properties":{"Path":"$.headers"}},{"column":"kafkamd","datatype":"dynamic","Properties":{"Path":"$.kafkamd"}}]'
 .create-or-alter table TBL ingestion csv mapping "csv_mapping" '[{"column": "vb","datatype": "boolean","Properties": {"Ordinal": "0"}},{"column": "vdate","datatype": "datetime","Properties": {"Ordinal": "1"}},{"Column": "vdec","datatype": "decimal","Properties": {"Ordinal": "2"}},{"column": "vlong","datatype": "long","Properties": {"Ordinal": "3"}},{"column": "vnum","datatype": "int","Properties": {"Ordinal": "4"}},{"column": "vreal","datatype": "real","Properties": {"Ordinal": "5"}},{"column": "vstr","datatype": "string","Properties": {"Ordinal": "6"}},{"column": "vtype","datatype": "string","Properties": {"Ordinal": "7"}},{"column": "keys","datatype": "dynamic","Properties": {"Ordinal": "8"}},{"column": "headers","datatype": "dynamic","Properties": {"Ordinal": "9"}},{"column": "kafkamd","datatype": "dynamic","Properties": {"Ordinal": "10"}}]'
-.create-merge table ComplexAvroBytesTest(event_id: string, event_type: string, business_event_type: string, business_object: dynamic, metadata: dynamic, partition: int, offset: long, timestamp: long, id: dynamic,keys:dynamic,headers:dynamic,[kafkamd]:dynamic )
\ No newline at end of file
+.create-merge table ['CABT'] (['event_id']:string, ['event_type']:string, ['business_event_type']:string, ['business_object']:dynamic, ['metadata']:dynamic, ['partition']:int, ['offset']:long, ['timestamp']:long, ['id']:dynamic, ['keys']:dynamic, ['headers']:dynamic, ['kafkamd']:dynamic, ['date']:string, ['event_id_1']:guid, ['is_priority']:bool, ['process']:string, ['offset_1']:int)
+.create table ['CABT'] ingestion json mapping 'CABT_mapping' '[{"column":"event_type", "Properties":{"Path":"$[\'event_type\']"}},{"column":"business_event_type", "Properties":{"Path":"$[\'business_event_type\']"}},{"column":"business_object", "Properties":{"Path":"$[\'business_object\']"}},{"column":"metadata", "Properties":{"Path":"$[\'metadata\']"}},{"column":"partition", "Properties":{"Path":"$[\'partition\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}},{"column":"date", "Properties":{"Path":"$[\'date\']"}},{"column":"event_id_1", "Properties":{"Path":"$[\'event_id\']"}},{"column":"is_priority", "Properties":{"Path":"$[\'is_priority\']"}},{"column":"process", "Properties":{"Path":"$[\'process\']"}},{"column":"offset_1", "Properties":{"Path":"$[\'offset\']"}}]'
\ No newline at end of file
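Reviewer note between the two patches: `getRecordsIngested` relies on resilience4j, retrying the query supplier while a result predicate reports that too few rows have arrived. A self-contained sketch of that pattern (the attempt count and wait duration below are assumptions, not the test's actual values):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.Supplier;

import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import io.github.resilience4j.retry.RetryRegistry;

public class PollUntilIngestedSketch {
    public static void main(String[] args) {
        int maxRecords = 8;
        // Retry as long as fewer than maxRecords rows have come back.
        Predicate<Object> tooFewRows = results ->
                !(results instanceof Map) || ((Map<?, ?>) results).size() < maxRecords;
        RetryConfig config = RetryConfig.custom()
                .maxAttempts(10)                      // assumption
                .waitDuration(Duration.ofSeconds(30)) // assumption
                .retryOnResult(tooFewRows)
                .build();
        Retry retry = RetryRegistry.of(config).retry("ingestRecordService", config);
        // Stand-in for the Kusto query; the real supplier runs engineClient.execute(...).
        Supplier<Map<Object, String>> recordSearch = Collections::emptyMap;
        Map<Object, String> results = Retry.decorateSupplier(retry, recordSearch).get();
        System.out.println("rows: " + results.size());
    }
}
```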
"Properties":{"Path":"$[\'partition\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}},{"column":"date", "Properties":{"Path":"$[\'date\']"}},{"column":"event_id_1", "Properties":{"Path":"$[\'event_id\']"}},{"column":"is_priority", "Properties":{"Path":"$[\'is_priority\']"}},{"column":"process", "Properties":{"Path":"$[\'process\']"}},{"column":"offset_1", "Properties":{"Path":"$[\'offset\']"}}]' \ No newline at end of file From e80bbbcc4597cf4a7959ab1ed3d9a3027b6ab850 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Mon, 13 May 2024 20:12:04 +0530 Subject: [PATCH 2/2] * Timeout for Kafka Avro runs --- .../sink/formatwriter/FormatWriterHelper.java | 25 ++++---- .../sink/formatwriter/KustoRecordWriter.java | 21 +++++-- .../KustoRecordWriterSchemalessTests.java | 1 - .../kafka/connect/sink/it/KustoSinkIT.java | 57 ++++++++++++------- .../avro-complex-data/expected-results.txt | 8 +++ src/test/resources/it-table-setup.kql | 4 +- 6 files changed, 77 insertions(+), 39 deletions(-) create mode 100644 src/test/resources/avro-complex-data/expected-results.txt diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java index 69c7597..6365185 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/FormatWriterHelper.java @@ -12,13 +12,10 @@ import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; -import org.apache.avro.io.Decoder; -import org.apache.avro.io.DecoderFactory; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +23,7 @@ import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.microsoft.azure.kusto.ingest.IngestionProperties; @@ -91,7 +89,7 @@ public static boolean isCsv(IngestionProperties.DataFormat dataFormat) { String bytesAsJson = new String(messageBytes, StandardCharsets.UTF_8); if (isJson(dataformat)) { return isValidJson(defaultKeyOrValueField, bytesAsJson) ? - OBJECT_MAPPER.readerFor(MAP_TYPE_REFERENCE).readValue(bytesAsJson) : + parseJson(bytesAsJson) : Collections.singletonList(Collections.singletonMap(defaultKeyOrValueField, OBJECT_MAPPER.readTree(messageBytes))); } else { @@ -100,6 +98,19 @@ public static boolean isCsv(IngestionProperties.DataFormat dataFormat) { } } + private static @NotNull Collection> parseJson(String json) throws IOException { + JsonNode jsonData = OBJECT_MAPPER.readTree(json); + if(jsonData.isArray()){ + List> result = new ArrayList<>(); + for(JsonNode node : jsonData){ + result.add(OBJECT_MAPPER.convertValue(node, MAP_TYPE_REFERENCE)); + } + return result; + } else { + return Collections.singletonList(OBJECT_MAPPER.convertValue(jsonData, MAP_TYPE_REFERENCE)); + } + } + /** * Convert a given avro record to json and return the encoded bytes. 
* @@ -193,10 +204,4 @@ private static Collection> bytesToAvroRecord(String defaultK Base64.getEncoder().encodeToString(received_message))); } } - - private static @Nullable GenericRecord bytesToAvroRecord2(byte[] received_message) throws IOException { - DatumReader avroBytesReader = new GenericDatumReader<>(); - Decoder decoder = DecoderFactory.get().binaryDecoder(received_message, null); - return avroBytesReader.read(null, decoder); - } } diff --git a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java index 83bdcf8..b600c64 100644 --- a/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java +++ b/src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriter.java @@ -63,24 +63,35 @@ public void write(SinkRecord record, IngestionProperties.DataFormat dataFormat) formattedRecord.getBytes(StandardCharsets.UTF_8)); this.plainOutputStream.write(LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8)); } else { - Collection> parsedKeys = convertSinkRecordToMap(record, true, dataFormat); + Map parsedKeys = convertSinkRecordToMap(record, true, dataFormat).stream().reduce(new HashMap<>(), + (acc, map) -> { + acc.putAll(map); + return acc; + }); Collection> parsedValues = convertSinkRecordToMap(record, false, dataFormat); parsedValues.forEach(parsedValue -> { Map updatedValue = (record.value() == null) ? new HashMap<>() : new HashMap<>(parsedValue); + /* Add all the key fields */ if (record.key() != null) { - if (parsedValue.size() == 1 && parsedValue.containsKey(KEY_FIELD)) { - updatedValue.put(KEYS_FIELD, parsedValue.get(KEY_FIELD)); + if (parsedKeys.size() == 1 && parsedKeys.containsKey(KEY_FIELD)) { + updatedValue.put(KEYS_FIELD, parsedKeys.get(KEY_FIELD)); } else { - updatedValue.put(KEYS_FIELD, parsedValue); + updatedValue.put(KEYS_FIELD, parsedKeys); } } + /* End add key fields */ + /* Add record headers */ if (record.headers() != null && !record.headers().isEmpty()) { - updatedValue.put(HEADERS_FIELD, parsedValue); + updatedValue.put(HEADERS_FIELD, parsedHeaders); } + /* End record headers */ + /* Add metadata fields */ updatedValue.put(KAFKA_METADATA_FIELD, kafkaMd); + /* End metadata fields */ try { + /* Write out each value row with key and header fields */ writer.writeObject(updatedValue); writer.writeRaw(LINE_SEPARATOR); } catch (IOException e) { diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java index 70f5c3a..6689ea9 100644 --- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java +++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/formatwriter/KustoRecordWriterSchemalessTests.java @@ -12,7 +12,6 @@ import org.apache.commons.io.FileUtils; import org.apache.kafka.connect.sink.SinkRecord; import org.json.JSONException; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java index fd4bc2c..21889ba 100644 --- 
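Reviewer note: the key-handling fix above folds a key that parses into several maps into one flat map before attaching it to each value row. In isolation the fold looks like this (the values are made up):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyMergeSketch {
    public static void main(String[] args) {
        List<Map<String, Object>> parsedKeys = Arrays.asList(
                Collections.singletonMap("IterationKey", (Object) "1"),
                Collections.singletonMap("Timestamp", (Object) 100001L));
        // Fold all key maps into one, mirroring the stream().reduce(...) in the patch
        // (mutating the identity map is safe here because the stream is sequential).
        Map<String, Object> merged = parsedKeys.stream().reduce(new HashMap<>(),
                (acc, map) -> {
                    acc.putAll(map);
                    return acc;
                });
        System.out.println(merged); // {IterationKey=1, Timestamp=100001}
    }
}
```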
diff --git a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java
index fd4bc2c..21889ba 100644
--- a/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java
+++ b/src/test/java/com/microsoft/azure/kusto/kafka/connect/sink/it/KustoSinkIT.java
@@ -6,9 +6,7 @@
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.time.Clock;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.*;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -159,12 +157,11 @@ private static void refreshDm() throws Exception {
     @AfterAll
     public static void stopContainers() throws Exception {
         log.info("Finished table clean up. Dropped tables {} and {}", coordinates.table, COMPLEX_AVRO_BYTES_TABLE_TEST);
-        Thread.sleep(60_000);
         connectContainer.stop();
         schemaRegistryContainer.stop();
         kafkaContainer.stop();
         engineClient.execute(coordinates.database, String.format(".drop table %s", coordinates.table));
-        // engineClient.execute(coordinates.database, String.format(".drop table %s", COMPLEX_AVRO_BYTES_TABLE_TEST));
+        engineClient.execute(coordinates.database, String.format(".drop table %s", COMPLEX_AVRO_BYTES_TABLE_TEST));
         dmClient.close();
         engineClient.close();
     }
@@ -207,7 +204,6 @@ private static void deployConnector(@NotNull String dataFormat, String topicTabl
                 connectContainer.getConnectorTaskState(String.format("adx-connector-%s", dataFormat), 0));
     }
 
-    @Disabled
     @ParameterizedTest
     @CsvSource({"json", "avro" , "csv" , "bytes-json"})
     public void shouldHandleAllTypesOfEvents(@NotNull String dataFormat) {
@@ -239,7 +235,7 @@ public void shouldHandleAllTypesOfEvents(@NotNull String dataFormat) {
         deployConnector(dataFormat, topicTableMapping, srUrl, keyFormat, valueFormat);
         try {
             produceKafkaMessages(dataFormat);
-            Thread.sleep(30000);
+            Thread.sleep(10_000);
         } catch (IOException | InterruptedException e) {
             throw new RuntimeException(e);
         }
@@ -362,11 +358,12 @@ private void produceKafkaMessages(@NotNull String dataFormat) throws IOException
         log.info("Produced messages for format {}", dataFormat);
         String query = String.format("%s | where vtype == '%s' | project %s,vresult = pack_all()",
                 coordinates.table, dataFormat, keyColumn);
-        Map<Long, String> actualRecordsIngested = getRecordsIngested(query, maxRecords);
+        Map<Object, String> actualRecordsIngested = getRecordsIngested(query, maxRecords);
         actualRecordsIngested.keySet().parallelStream().forEach(key -> {
+            long keyLong = Long.parseLong(key.toString());
             log.debug("Actual record : {}", actualRecordsIngested.get(key));
             try {
-                JSONAssert.assertEquals(expectedRecordsProduced.get(key), actualRecordsIngested.get(key),
+                JSONAssert.assertEquals(expectedRecordsProduced.get(keyLong), actualRecordsIngested.get(key),
                         new CustomComparator(LENIENT,
                                 // there are sometimes round off errors in the double values but they are close enough to 8 precision
                                 new Customization("vdec", (vdec1,
@@ -381,10 +378,11 @@ private void produceKafkaMessages(@NotNull String dataFormat) throws IOException
     }
 
     @Test
-    public void shouldHandleComplexAvroMessage() {
+    public void shouldHandleComplexAvroMessage() throws IOException {
         String dataFormat = "bytes-avro";
+        int maxRecords = 8;
         String srUrl = String.format("http://%s:%s", schemaRegistryContainer.getContainerId().substring(0, 12), 8081);
-        String producerSrUrl = String.format("http://localhost:%s", schemaRegistryContainer.getMappedPort(8081));
+        String producerSrUrl = String.format("http://localhost:%s", schemaRegistryContainer.getMappedPort(8081));
         Map<String, Object> producerProperties = new HashMap<>();
         producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaContainer.getBootstrapServers());
         producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
@@ -398,7 +396,7 @@ public void shouldHandleComplexAvroMessage() {
         String topicTableMapping = String.format("[{'topic': '%s','db': '%s', " +
                 "'table': '%s','format':'%s','mapping':'%s_mapping'}]", topicName,
                 coordinates.database,
-                COMPLEX_AVRO_BYTES_TABLE_TEST, dataFormat.split("-")[1],COMPLEX_AVRO_BYTES_TABLE_TEST);
+                COMPLEX_AVRO_BYTES_TABLE_TEST, dataFormat.split("-")[1], COMPLEX_AVRO_BYTES_TABLE_TEST);
         deployConnector(dataFormat, topicTableMapping, srUrl,
                 AvroConverter.class.getName(),
                 "org.apache.kafka.connect.converters.ByteArrayConverter",
@@ -409,11 +407,19 @@ public void shouldHandleComplexAvroMessage() {
                 .name("IterationKey").type().stringType().noDefault()
                 .name("Timestamp").type().nullable().longType().noDefault()
                 .endRecord();
-        long keyInstantStart = Instant.now(Clock.systemUTC()).toEpochMilli();
-        for (int i = 1; i < 9; i++) {
-            try (KafkaProducer<GenericData.Record, byte[]> producer = new KafkaProducer<>(producerProperties)) {
+        long keyStart = 100000L;
+
+        InputStream expectedResultsStream = Objects.requireNonNull(this.getClass().getClassLoader()
+                .getResourceAsStream("avro-complex-data/expected-results.txt"));
+        String expectedResults = IOUtils.toString(expectedResultsStream, StandardCharsets.UTF_8);
+        Map<String, String> expectedResultMap =
+                Arrays.stream(expectedResults.split("\n"))
+                        .map(line -> line.split("~"))
+                        .collect(Collectors.toMap(arr -> arr[0], arr -> arr[1]));
+        try (KafkaProducer<GenericData.Record, byte[]> producer = new KafkaProducer<>(producerProperties)) {
+            for (int i = 1; i <= maxRecords; i++) {
                 //complex-avro-1.avro
-                long keyTick = keyInstantStart + i;
+                long keyTick = keyStart + i;
                 GenericData.Record keyRecord = new GenericData.Record(keySchema);
                 keyRecord.put("IterationKey", String.valueOf(i));
                 keyRecord.put("Timestamp", keyTick);
@@ -425,10 +431,18 @@ public void shouldHandleComplexAvroMessage() {
                 producerRecord.headers().add("iteration", String.valueOf(i).getBytes());
                 RecordMetadata rmd = producer.send(producerRecord).get();
                 log.info("Avro bytes sent to topic {} with offset {} of size {}", topicName, rmd.offset(), testData.length);
-            } catch (Exception e) {
-                log.error("Failed to send record to topic {}", topicName, e);
             }
+            Thread.sleep(30_000);
+        } catch (Exception e) {
+            log.error("Failed to send record to topic {}", topicName, e);
+            fail("Failed sending message to Kafka for testing Avro-Bytes scenario.");
         }
+
+        String countLongQuery = String.format("%s | summarize c = count() by event_id | project %s=event_id, " +
+                "vresult = bag_pack('event_id',event_id,'count',c)", COMPLEX_AVRO_BYTES_TABLE_TEST, keyColumn);
+
+        Map<Object, String> actualRecordsIngested = getRecordsIngested(countLongQuery, maxRecords);
+        assertEquals(expectedResultMap, actualRecordsIngested);
     }
 
     /**
@@ -437,7 +451,7 @@ public void shouldHandleComplexAvroMessage() {
      * @param maxRecords The maximum number of records to poll for
      * @return A map of the records ingested
      */
-    private @NotNull Map<Long, String> getRecordsIngested(String query, int maxRecords) {
+    private @NotNull Map<Object, String> getRecordsIngested(String query, int maxRecords) {
         Predicate<Object> predicate = (results) -> {
             if (results != null) {
                 log.info("Retrieved records count {}", ((Map) results).size());
@@ -452,13 +466,14 @@ public void shouldHandleComplexAvroMessage() {
                 .build();
         RetryRegistry registry = RetryRegistry.of(config);
         Retry retry = registry.retry("ingestRecordService", config);
-        Supplier<Map<Long, String>> recordSearchSupplier = () -> {
+        Supplier<Map<Object, String>> recordSearchSupplier = () -> {
             try {
                 log.debug("Executing query {} ", query);
                 KustoResultSetTable resultSet = engineClient.execute(coordinates.database, query).getPrimaryResults();
-                Map<Long, String> actualResults = new HashMap<>();
+                Map<Object, String> actualResults = new HashMap<>();
                 while (resultSet.next()) {
-                    Long key = (long) resultSet.getInt(keyColumn);
+                    Object keyObject = resultSet.getObject(keyColumn);
+                    Object key = keyObject instanceof Number ? Long.parseLong(keyObject.toString()) : keyObject.toString();
                     String vResult = resultSet.getString("vresult");
                     log.debug("Record queried from DB: {}", vResult);
                     actualResults.put(key, vResult);
diff --git a/src/test/resources/avro-complex-data/expected-results.txt b/src/test/resources/avro-complex-data/expected-results.txt
new file mode 100644
index 0000000..81551d5
--- /dev/null
+++ b/src/test/resources/avro-complex-data/expected-results.txt
@@ -0,0 +1,8 @@
+f51705d7-6e77-46e4-8e40-f8d14c0fe322~{"event_id":"f51705d7-6e77-46e4-8e40-f8d14c0fe322","count":93}
+e4d27e8a-428b-4505-8e90-35cf15e7c9f7~{"event_id":"e4d27e8a-428b-4505-8e90-35cf15e7c9f7","count":105}
+c0af9315-13f9-4647-a263-d6d26ad26e8e~{"event_id":"c0af9315-13f9-4647-a263-d6d26ad26e8e","count":111}
+82f4c300-4679-4477-8c71-9bf4cb28c465~{"event_id":"82f4c300-4679-4477-8c71-9bf4cb28c465","count":98}
+7ab92c07-daa9-4d81-9ee8-dc88e0729f30~{"event_id":"7ab92c07-daa9-4d81-9ee8-dc88e0729f30","count":109}
+76071ae6-daa7-4ffb-987e-b9766f677acc~{"event_id":"76071ae6-daa7-4ffb-987e-b9766f677acc","count":107}
+1bd15bc0-76a3-4561-9a0e-ecfe0c5b41d6~{"event_id":"1bd15bc0-76a3-4561-9a0e-ecfe0c5b41d6","count":108}
+144eadde-7484-494a-8551-232a1204e475~{"event_id":"144eadde-7484-494a-8551-232a1204e475","count":101}
\ No newline at end of file
diff --git a/src/test/resources/it-table-setup.kql b/src/test/resources/it-table-setup.kql
index 96ed841..26a5230 100644
--- a/src/test/resources/it-table-setup.kql
+++ b/src/test/resources/it-table-setup.kql
@@ -3,5 +3,5 @@
 .alter table TBL policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:05", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
 .create-or-alter table TBL ingestion json mapping "data_mapping" '[{"column":"vnum","datatype":"int","Properties":{"Path":"$.vnum"}},{"column":"vdate","datatype":"datetime","Properties":{"Path":"$.vdate"}},{"Column":"vdec","datatype":"decimal","Properties":{"Path":"$.vdec"}},{"column":"vb","datatype":"boolean","Properties":{"Path":"$.vb"}},{"column":"vreal","datatype":"real","Properties":{"Path":"$.vreal"}},{"column":"vstr","datatype":"string","Properties":{"Path":"$.vstr"}},{"column":"vlong","datatype":"long","Properties":{"Path":"$.vlong"}},{"column":"vtype","datatype":"string","Properties":{"Path":"$.vtype"}},{"column":"keys","datatype":"dynamic","Properties":{"Path":"$.keys"}},{"column":"headers","datatype":"dynamic","Properties":{"Path":"$.headers"}},{"column":"kafkamd","datatype":"dynamic","Properties":{"Path":"$.kafkamd"}}]'
 .create-or-alter table TBL ingestion csv mapping "csv_mapping" '[{"column": "vb","datatype": "boolean","Properties": {"Ordinal": "0"}},{"column": "vdate","datatype": "datetime","Properties": {"Ordinal": "1"}},{"Column": "vdec","datatype": "decimal","Properties": {"Ordinal": "2"}},{"column": "vlong","datatype": "long","Properties": {"Ordinal": "3"}},{"column": "vnum","datatype": "int","Properties": {"Ordinal": "4"}},{"column": "vreal","datatype": "real","Properties": {"Ordinal": "5"}},{"column": "vstr","datatype": "string","Properties": {"Ordinal": "6"}},{"column": "vtype","datatype": "string","Properties": {"Ordinal": "7"}},{"column": "keys","datatype": "dynamic","Properties": {"Ordinal": "8"}},{"column": "headers","datatype": "dynamic","Properties": {"Ordinal": "9"}},{"column": "kafkamd","datatype": "dynamic","Properties": {"Ordinal": "10"}}]'
-.create-merge table ['CABT'] (['event_id']:string, ['event_type']:string, ['business_event_type']:string, ['business_object']:dynamic, ['metadata']:dynamic, ['partition']:int, ['offset']:long, ['timestamp']:long, ['id']:dynamic, ['keys']:dynamic, ['headers']:dynamic, ['kafkamd']:dynamic, ['date']:string, ['event_id_1']:guid, ['is_priority']:bool, ['process']:string, ['offset_1']:int)
-.create table ['CABT'] ingestion json mapping 'CABT_mapping' '[{"column":"event_type", "Properties":{"Path":"$[\'event_type\']"}},{"column":"business_event_type", "Properties":{"Path":"$[\'business_event_type\']"}},{"column":"business_object", "Properties":{"Path":"$[\'business_object\']"}},{"column":"metadata", "Properties":{"Path":"$[\'metadata\']"}},{"column":"partition", "Properties":{"Path":"$[\'partition\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}},{"column":"date", "Properties":{"Path":"$[\'date\']"}},{"column":"event_id_1", "Properties":{"Path":"$[\'event_id\']"}},{"column":"is_priority", "Properties":{"Path":"$[\'is_priority\']"}},{"column":"process", "Properties":{"Path":"$[\'process\']"}},{"column":"offset_1", "Properties":{"Path":"$[\'offset\']"}}]'
\ No newline at end of file
+.create-merge table ['CABT'] (['event_id']:string, ['event_type']:string, ['business_event_type']:string, ['business_object']:dynamic, ['metadata']:dynamic, ['partition']:int, ['offset']:long, ['timestamp']:long, ['id']:dynamic, ['keys']:dynamic, ['headers']:dynamic, ['kafkamd']:dynamic, ['date']:string, ['is_priority']:bool, ['process']:string)
+.create table ['CABT'] ingestion json mapping 'CABT_mapping' '[{"column":"event_type", "Properties":{"Path":"$[\'event_type\']"}},{"column":"business_event_type", "Properties":{"Path":"$[\'business_event_type\']"}},{"column":"business_object", "Properties":{"Path":"$[\'business_object\']"}},{"column":"metadata", "Properties":{"Path":"$[\'metadata\']"}},{"column":"partition", "Properties":{"Path":"$[\'partition\']"}},{"column":"timestamp", "Properties":{"Path":"$[\'timestamp\']"}},{"column":"event_id", "Properties":{"Path":"$[\'event_id\']"}},{"column":"is_priority", "Properties":{"Path":"$[\'is_priority\']"}},{"column":"process", "Properties":{"Path":"$[\'process\']"}},{"column":"offset", "Properties":{"Path":"$[\'offset\']"}},{"column":"keys", "Properties":{"Path":"$[\'keys\']"}},{"column":"headers", "Properties":{"Path":"$[\'headers\']"}},{"column":"kafkamd", "Properties":{"Path":"$[\'kafkamd\']"}}]'