Feature/add headers and metadata v4 #119

Merged

@@ -23,13 +23,13 @@
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.azure.kusto.ingest.IngestionProperties;

import io.confluent.kafka.serializers.NonRecordContainer;

import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.*;
import static com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat.SINGLEJSON;

public class FormatWriterHelper {
private static final Logger LOGGER = LoggerFactory.getLogger(FormatWriterHelper.class);
@@ -72,33 +72,48 @@ public static boolean isCsv(IngestionProperties.DataFormat dataFormat) {
}

/**
* @param messageBytes Raw message bytes to transform
* @param messageBytes Raw message bytes to transform
* @param defaultKeyOrValueField Default value for Key or Value
* @param dataformat JSON or Avro
* @param dataformat JSON or Avro
* @return a collection of maps, one for each record parsed from the payload
*/
public static @NotNull Map<String, Object> convertBytesToMap(byte[] messageBytes,
String defaultKeyOrValueField,
IngestionProperties.DataFormat dataformat) throws IOException {
public static @NotNull Collection<Map<String, Object>> convertBytesToMap(byte[] messageBytes,
String defaultKeyOrValueField,
IngestionProperties.DataFormat dataformat) throws IOException {
if (messageBytes == null || messageBytes.length == 0) {
return Collections.emptyMap();
return Collections.emptyList();
}
if (isAvro(dataformat)) {
return bytesToAvroRecord(defaultKeyOrValueField,messageBytes);
return bytesToAvroRecord(defaultKeyOrValueField, messageBytes);
}
String bytesAsJson = new String(messageBytes, StandardCharsets.UTF_8);
if (isJson(dataformat)) {
return isValidJson(defaultKeyOrValueField,bytesAsJson) ?
OBJECT_MAPPER.readerFor(MAP_TYPE_REFERENCE).readValue(bytesAsJson) :
Collections.singletonMap(defaultKeyOrValueField,
OBJECT_MAPPER.readTree(messageBytes));
return isValidJson(defaultKeyOrValueField, bytesAsJson) ?
parseJson(bytesAsJson) :
Collections.singletonList(Collections.singletonMap(defaultKeyOrValueField,
OBJECT_MAPPER.readTree(messageBytes)));
} else {
return Collections.singletonMap(defaultKeyOrValueField, Base64.getEncoder().encodeToString(messageBytes));
return Collections.singletonList(Collections.singletonMap(defaultKeyOrValueField,
Base64.getEncoder().encodeToString(messageBytes)));
}
}

private static @NotNull Collection<Map<String, Object>> parseJson(String json) throws IOException {
JsonNode jsonData = OBJECT_MAPPER.readTree(json);
if(jsonData.isArray()){
List<Map<String, Object>> result = new ArrayList<>();
for(JsonNode node : jsonData){
result.add(OBJECT_MAPPER.convertValue(node, MAP_TYPE_REFERENCE));
}
return result;
} else {
return Collections.singletonList(OBJECT_MAPPER.convertValue(jsonData, MAP_TYPE_REFERENCE));
}
}
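
Taken together, convertBytesToMap now fans a JSON array payload out into one map per element instead of collapsing everything into a single map. A minimal, self-contained sketch of that behavior using plain Jackson; the class name and payloads are invented for illustration:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ParseJsonSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final TypeReference<Map<String, Object>> MAP_TYPE =
            new TypeReference<Map<String, Object>>() {};

    // Same shape as parseJson above: an array fans out into one map per
    // element, anything else becomes a singleton list.
    static List<Map<String, Object>> parse(String json) throws IOException {
        JsonNode root = MAPPER.readTree(json);
        if (root.isArray()) {
            List<Map<String, Object>> rows = new ArrayList<>();
            for (JsonNode node : root) {
                rows.add(MAPPER.convertValue(node, MAP_TYPE));
            }
            return rows;
        }
        return Collections.singletonList(MAPPER.convertValue(root, MAP_TYPE));
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parse("[{\"a\":1},{\"a\":2}]")); // [{a=1}, {a=2}]
        System.out.println(parse("{\"a\":1}"));             // [{a=1}]
    }
}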

/**
* Convert a given Avro record to a map of its field names and values.
*
* @param record The GenericRecord to convert
*/
private static Map<String, Object> avroToJson(@NotNull GenericRecord record) throws IOException {
@@ -129,7 +144,7 @@ private static boolean isValidJson(String defaultKeyOrValueField, String json) {
String defaultKeyOrValueField,
IngestionProperties.DataFormat dataFormat) throws IOException {
String objStr = (String) value;
if (isJson(dataFormat) && isValidJson(defaultKeyOrValueField,objStr)) {
if (isJson(dataFormat) && isValidJson(defaultKeyOrValueField, objStr)) {
return OBJECT_MAPPER.readerFor(MAP_TYPE_REFERENCE).readValue(objStr);
} else {
return Collections.singletonMap(defaultKeyOrValueField, objStr);
@@ -142,7 +157,7 @@ private static boolean isJson(IngestionProperties.DataFormat dataFormat) {
|| IngestionProperties.DataFormat.SINGLEJSON.equals(dataFormat);
}

private static Map<String, Object> bytesToAvroRecord(String defaultKeyOrValueField,byte[] received_message) {
private static Collection<Map<String, Object>> bytesToAvroRecord(String defaultKeyOrValueField, byte[] received_message) {
Map<String, Object> returnValue = new HashMap<>();
try {
// avro input parser
@@ -155,11 +170,16 @@ private static Map<String, Object> bytesToAvroRecord(String defaultKeyOrValueField,byte[] received_message) {
throw new ConnectException(
"Failed to parse AVRO " + "record\n" + e.getMessage());
}
List<Map<String, Object>> nodes = new ArrayList<>();
while (dataFileReader.hasNext()) {
String jsonString = dataFileReader.next().toString();
LOGGER.info("---------------------------------------------------------------------------");
LOGGER.info(jsonString);
LOGGER.info("---------------------------------------------------------------------------");
try {
Map<String, Object> nodeMap = OBJECT_MAPPER.readValue(jsonString, MAP_TYPE_REFERENCE);
returnValue.putAll(nodeMap);
nodes.add(returnValue);
} catch (IOException e) {
throw new ConnectException(
"Failed to parse JSON"
@@ -176,10 +196,12 @@ private static Map<String, Object> bytesToAvroRecord(String defaultKeyOrValueField,byte[] received_message) {
throw new ConnectException(
"Failed to parse AVRO (2) " + "record\n" + e);
}
return returnValue;
return nodes;
} catch (Exception e) {
LOGGER.error("Failed to parse AVRO record (3) \n", e);
return Collections.singletonMap(defaultKeyOrValueField, Base64.getEncoder().encodeToString(received_message));
return Collections.singletonList(
Collections.singletonMap(defaultKeyOrValueField,
Base64.getEncoder().encodeToString(received_message)));
}
}
}
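
The Avro path applies the same idea: each record in the Avro container becomes its own map, so a multi-record container no longer collapses into one merged map. A sketch of that per-record iteration with the standard Avro reader, allocating a fresh map per record; the class name is illustrative, not the connector's:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class AvroBytesSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static List<Map<String, Object>> toMaps(byte[] avroContainerBytes) throws IOException {
        List<Map<String, Object>> rows = new ArrayList<>();
        try (DataFileStream<GenericRecord> reader = new DataFileStream<>(
                new ByteArrayInputStream(avroContainerBytes), new GenericDatumReader<>())) {
            for (GenericRecord record : reader) {
                // GenericRecord#toString() renders the record as JSON, which is
                // the same trick the helper above uses before mapping it.
                rows.add(MAPPER.readValue(record.toString(),
                        new TypeReference<Map<String, Object>>() {}));
            }
        }
        return rows;
    }
}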
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -46,8 +47,9 @@ public Map<String, Object> getHeadersAsMap(@NotNull SinkRecord record) {

/**
* Convert SinkRecord to CSV
*
* @param record SinkRecord
* @param isKey boolean
* @param isKey boolean
* @return String
*/
public String convertSinkRecordToCsv(@NotNull SinkRecord record, boolean isKey) {
@@ -68,38 +70,38 @@ public String convertSinkRecordToCsv(@NotNull SinkRecord record, boolean isKey)

@NotNull
@SuppressWarnings(value = "unchecked")
public Map<String, Object> convertSinkRecordToMap(@NotNull SinkRecord record, boolean isKey,
IngestionProperties.DataFormat dataFormat) throws IOException {
public Collection<Map<String, Object>> convertSinkRecordToMap(@NotNull SinkRecord record, boolean isKey,
IngestionProperties.DataFormat dataFormat) throws IOException {
Object recordValue = isKey ? record.key() : record.value();
Schema schema = isKey ? record.keySchema() : record.valueSchema();
String defaultKeyOrValueField = isKey ? KEY_FIELD : VALUE_FIELD;
if (recordValue == null) {
return Collections.emptyMap();
return Collections.emptyList();
}
if (recordValue instanceof Struct) {
Struct recordStruct = (Struct) recordValue;
return FormatWriterHelper.structToMap(recordStruct);
return Collections.singletonList(FormatWriterHelper.structToMap(recordStruct));
}
// Is Avro Data
if (recordValue instanceof GenericData.Record || recordValue instanceof NonRecordContainer) {
return FormatWriterHelper.convertAvroRecordToMap(schema, recordValue);
return Collections.singletonList(FormatWriterHelper.convertAvroRecordToMap(schema, recordValue));
}
// String or JSON
if (recordValue instanceof String) {
return FormatWriterHelper.convertStringToMap(recordValue, defaultKeyOrValueField, dataFormat);
return Collections.singletonList(FormatWriterHelper.convertStringToMap(recordValue,
defaultKeyOrValueField, dataFormat));
}
// Map
if (recordValue instanceof Map) {
return (Map<String, Object>) recordValue;
return Collections.singletonList((Map<String, Object>) recordValue);
}
// is a byte array
if(isSchemaFormat(dataFormat)){
if (isSchemaFormat(dataFormat)) {
if (recordValue instanceof byte[]) {
return FormatWriterHelper.convertBytesToMap((byte[]) recordValue, defaultKeyOrValueField, dataFormat);
}
else {
} else {
String fieldName = isKey ? KEY_FIELD : VALUE_FIELD;
return Collections.singletonMap(fieldName, recordValue);
return Collections.singletonList(Collections.singletonMap(fieldName, recordValue));
}
} else {
String errorMessage = String.format("DataFormat %s is not supported in the connector though " +
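
With the new signature every branch of convertSinkRecordToMap returns a collection, so callers handle single-row and multi-row records uniformly. A hypothetical caller; the interface below only stands in for whichever class in this PR actually declares the method:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;

import com.microsoft.azure.kusto.ingest.IngestionProperties;

public class ConverterUsageSketch {
    // Stand-in for the PR's converter; only the method shape matters here.
    interface RecordConverter {
        Collection<Map<String, Object>> convertSinkRecordToMap(
                SinkRecord record, boolean isKey,
                IngestionProperties.DataFormat dataFormat) throws IOException;
    }

    static void demo(RecordConverter converter) throws IOException {
        byte[] value = "[{\"a\":1},{\"a\":2}]".getBytes(StandardCharsets.UTF_8);
        SinkRecord record = new SinkRecord("events", 0, null, null, null, value, 42L);
        // A byte[] JSON-array value takes the convertBytesToMap path and yields
        // two rows; Struct, Avro, String, and Map values come back as
        // singleton collections.
        Collection<Map<String, Object>> rows = converter.convertSinkRecordToMap(
                record, false, IngestionProperties.DataFormat.JSON);
        rows.forEach(System.out::println);
    }
}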
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

@@ -37,53 +38,67 @@ public KustoRecordWriter(String filename, OutputStream out) {
}

/**
* @param record the record to persist.
* @param record the record to persist.
* @param dataFormat the data format to use.
* @throws IOException if an error occurs while writing the record.
*/
@Override
public void write(SinkRecord record, IngestionProperties.DataFormat dataFormat) throws IOException {
try {
if (schema == null) {
schema = record.valueSchema();
LOGGER.debug("Opening record writer for: {}", filename);
}
Map<String, Object> parsedHeaders = getHeadersAsMap(record);
Map<String, String> kafkaMd = getKafkaMetaDataAsMap(record);
if (isCsv(dataFormat)) {
String serializedKeys = StringEscapeUtils.escapeCsv(convertSinkRecordToCsv(record, true));
String serializedValues = convertSinkRecordToCsv(record, false);
String serializedHeaders = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(parsedHeaders));
String serializedMetadata = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(kafkaMd));
String formattedRecord = String.format("%s,%s,%s,%s", serializedValues, serializedKeys,
serializedHeaders, serializedMetadata);
LOGGER.trace("Writing record to file: Keys {} , Values {} , Headers {} , OverallRecord {}",
serializedKeys, serializedValues, serializedHeaders, formattedRecord);
this.plainOutputStream.write(
formattedRecord.getBytes(StandardCharsets.UTF_8));
this.plainOutputStream.write(LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8));
} else {
Map<String, Object> parsedKeys = convertSinkRecordToMap(record, true, dataFormat);
Map<String, Object> parsedValues = convertSinkRecordToMap(record, false, dataFormat);
if (schema == null) {
schema = record.valueSchema();
LOGGER.debug("Opening record writer for: {}", filename);
}
Map<String, Object> parsedHeaders = getHeadersAsMap(record);
Map<String, String> kafkaMd = getKafkaMetaDataAsMap(record);
if (isCsv(dataFormat)) {
String serializedKeys = StringEscapeUtils.escapeCsv(convertSinkRecordToCsv(record, true));
String serializedValues = convertSinkRecordToCsv(record, false);
String serializedHeaders = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(parsedHeaders));
String serializedMetadata = StringEscapeUtils.escapeCsv(OBJECT_MAPPER.writeValueAsString(kafkaMd));
String formattedRecord = String.format("%s,%s,%s,%s", serializedValues, serializedKeys,
serializedHeaders, serializedMetadata);
LOGGER.trace("Writing record to file: Keys {} , Values {} , Headers {} , OverallRecord {}",
serializedKeys, serializedValues, serializedHeaders, formattedRecord);
this.plainOutputStream.write(
formattedRecord.getBytes(StandardCharsets.UTF_8));
this.plainOutputStream.write(LINE_SEPARATOR.getBytes(StandardCharsets.UTF_8));
} else {
Map<String, Object> parsedKeys = convertSinkRecordToMap(record, true, dataFormat).stream().reduce(new HashMap<>(),
(acc, map) -> {
acc.putAll(map);
return acc;
});
Collection<Map<String, Object>> parsedValues = convertSinkRecordToMap(record, false, dataFormat);

parsedValues.forEach(parsedValue -> {
Map<String, Object> updatedValue = (record.value() == null) ? new HashMap<>() :
new HashMap<>(parsedValues);
new HashMap<>(parsedValue);
/* Add all the key fields */
if (record.key() != null) {
if (parsedKeys.size() == 1 && parsedKeys.containsKey(KEY_FIELD)) {
updatedValue.put(KEYS_FIELD, parsedKeys.get(KEY_FIELD));
} else {
updatedValue.put(KEYS_FIELD, parsedKeys);
}
}
/* End add key fields */
/* Add record headers */
if (record.headers() != null && !record.headers().isEmpty()) {
updatedValue.put(HEADERS_FIELD, parsedHeaders);
}
/* End record headers */
/* Add metadata fields */
updatedValue.put(KAFKA_METADATA_FIELD, kafkaMd);
writer.writeObject(updatedValue);
writer.writeRaw(LINE_SEPARATOR);
}
} catch (IOException e) {
LOGGER.error("Error writing record to file: {}", filename, e);
throw new ConnectException(e);
/* End metadata fields */
try {
/* Write out each value row with key and header fields */
writer.writeObject(updatedValue);
writer.writeRaw(LINE_SEPARATOR);
} catch (IOException e) {
LOGGER.error("Error writing record to file: {}", filename, e);
throw new ConnectException(e);
}
});
}
}

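Two details of the reworked write() deserve a note: the key side is folded back into a single map before being attached to each row, and every parsed value then becomes its own JSON line carrying the keys, headers, and Kafka metadata fields (the CSV branch keeps its fixed values,keys,headers,metadata column order). A standalone illustration of the key merge, with invented inputs:

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class KeyMergeSketch {
    public static void main(String[] args) {
        Map<String, Object> first = new HashMap<>();
        first.put("id", 1);
        Map<String, Object> second = new HashMap<>();
        second.put("region", "eu");
        Collection<Map<String, Object>> parsedKeys = Arrays.asList(first, second);

        // Same fold as the diff's stream().reduce(new HashMap<>(), ...):
        // later maps overwrite earlier entries on key collisions.
        Map<String, Object> merged = parsedKeys.stream()
                .reduce(new HashMap<>(), (acc, m) -> {
                    acc.putAll(m);
                    return acc;
                });
        System.out.println(merged); // {id=1, region=eu}
    }
}

For a sequential stream this mutable-identity reduce behaves like a plain loop; a for loop or Collectors.toMap would express the same merge more idiomatically.
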
@@ -27,7 +27,7 @@
import io.confluent.connect.avro.AvroData;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

public class KustoKustoRecordWriterSchemaTests extends KustoRecordWriterBase {
public class KustoRecordWriterSchemaTests extends KustoRecordWriterBase {
private static @NotNull Stream<Arguments> testMapSchemaJson() {
// Key schema, value schema, expectedKey, expectedValue
Schema intToIntSchema = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).name("IntToIntMap").build();