Skip to content

Commit 6e8d00b

Browse files
authored
[improve]support string type record in one topic to multiple tables (#59)
1 parent e36b3ff commit 6e8d00b

File tree

2 files changed

+80
-7
lines changed

2 files changed

+80
-7
lines changed

src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
package org.apache.doris.kafka.connector.service;
2121

2222
import com.codahale.metrics.MetricRegistry;
23+
import com.fasterxml.jackson.core.JsonProcessingException;
24+
import com.fasterxml.jackson.databind.ObjectMapper;
2325
import com.google.common.annotations.VisibleForTesting;
2426
import java.util.Collection;
27+
import java.util.Collections;
2528
import java.util.HashMap;
2629
import java.util.List;
2730
import java.util.Map;
@@ -39,6 +42,7 @@
3942
import org.apache.doris.kafka.connector.writer.load.LoadModel;
4043
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
4144
import org.apache.kafka.common.TopicPartition;
45+
import org.apache.kafka.connect.data.Struct;
4246
import org.apache.kafka.connect.sink.SinkRecord;
4347
import org.slf4j.Logger;
4448
import org.slf4j.LoggerFactory;
@@ -60,9 +64,11 @@ public class DorisDefaultSinkService implements DorisSinkService {
6064
private final DorisOptions dorisOptions;
6165
private final MetricsJmxReporter metricsJmxReporter;
6266
private final DorisConnectMonitor connectMonitor;
67+
private final ObjectMapper objectMapper;
6368

6469
DorisDefaultSinkService(Map<String, String> config) {
6570
this.dorisOptions = new DorisOptions(config);
71+
this.objectMapper = new ObjectMapper();
6672
this.writer = new HashMap<>();
6773
this.conn = new JdbcConnectionProvider(dorisOptions);
6874
MetricRegistry metricRegistry = new MetricRegistry();
@@ -202,19 +208,36 @@ public String getSinkDorisTableName(SinkRecord record) {
202208
if (StringUtils.isEmpty(field)) {
203209
return defaultTableName;
204210
}
205-
if (!(record.value() instanceof Map)) {
211+
return parseRecordTableName(defaultTableName, field, record);
212+
}
213+
214+
private String parseRecordTableName(
215+
String defaultTableName, String tableNameField, SinkRecord record) {
216+
Object recordValue = record.value();
217+
Map<String, Object> recordMap = Collections.emptyMap();
218+
if (recordValue instanceof Struct) {
206219
LOG.warn(
207-
"Only Map objects supported for The 'record.tablename.field' configuration, field={}, record type={}",
208-
field,
209-
record.value().getClass().getName());
220+
"The Struct type record not supported for The 'record.tablename.field' configuration, field={}",
221+
tableNameField);
210222
return defaultTableName;
223+
} else if (recordValue instanceof Map) {
224+
recordMap = (Map<String, Object>) recordValue;
225+
} else if (recordValue instanceof String) {
226+
try {
227+
recordMap = objectMapper.readValue((String) recordValue, Map.class);
228+
} catch (JsonProcessingException e) {
229+
LOG.warn(
230+
"The String type record failed to parse record value to map. record={}, field={}",
231+
recordValue,
232+
tableNameField,
233+
e);
234+
}
211235
}
212-
Map<String, Object> map = (Map<String, Object>) record.value();
213236
// if the field is not found in the record, use the table name in the config
214-
if (map.get(field) == null) {
237+
if (!recordMap.containsKey(tableNameField)) {
215238
return defaultTableName;
216239
}
217-
return map.get(field).toString();
240+
return recordMap.get(tableNameField).toString();
218241
}
219242

220243
private static String getNameIndex(String topic, int partition) {

src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,16 @@
2121

2222
import java.io.IOException;
2323
import java.io.InputStream;
24+
import java.nio.charset.StandardCharsets;
2425
import java.util.HashMap;
2526
import java.util.Map;
2627
import java.util.Properties;
2728
import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
2829
import org.apache.kafka.connect.data.Schema;
30+
import org.apache.kafka.connect.data.SchemaAndValue;
2931
import org.apache.kafka.connect.data.SchemaBuilder;
32+
import org.apache.kafka.connect.data.Struct;
33+
import org.apache.kafka.connect.json.JsonConverter;
3034
import org.apache.kafka.connect.sink.SinkRecord;
3135
import org.junit.Assert;
3236
import org.junit.Before;
@@ -35,6 +39,7 @@
3539
public class TestDorisSinkService {
3640

3741
private DorisDefaultSinkService dorisDefaultSinkService;
42+
private JsonConverter jsonConverter = new JsonConverter();
3843

3944
@Before
4045
public void init() throws IOException {
@@ -49,6 +54,7 @@ public void init() throws IOException {
4954
props.put("name", "sink-connector-test");
5055
props.put("record.tablename.field", "table_name");
5156
dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
57+
jsonConverter.configure(new HashMap<>(), false);
5258
}
5359

5460
@Test
@@ -81,5 +87,49 @@ public void getSinkDorisTableName() {
8187
1);
8288
Assert.assertEquals(
8389
"appoint_table", dorisDefaultSinkService.getSinkDorisTableName(record2));
90+
91+
String recordValue3 = "{\"id\":1,\"name\":\"bob\",\"age\":12}";
92+
SinkRecord record3 =
93+
new SinkRecord(
94+
"topic_test",
95+
0,
96+
Schema.OPTIONAL_STRING_SCHEMA,
97+
"key",
98+
Schema.OPTIONAL_STRING_SCHEMA,
99+
recordValue3,
100+
3);
101+
Assert.assertEquals(
102+
"test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record3));
103+
104+
String recordValue4 =
105+
"{\"id\":12,\"name\":\"jack\",\"age\":13,\"table_name\":\"appoint_table2\"}";
106+
SinkRecord record4 =
107+
new SinkRecord(
108+
"topic_test",
109+
0,
110+
Schema.OPTIONAL_STRING_SCHEMA,
111+
"key",
112+
Schema.OPTIONAL_STRING_SCHEMA,
113+
recordValue4,
114+
3);
115+
Assert.assertEquals(
116+
"appoint_table2", dorisDefaultSinkService.getSinkDorisTableName(record4));
117+
118+
String structMsg =
119+
"{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.test_sink_normal.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":true,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"server_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"gtid\"},{\"type\":\"string\",\"optional\":false,\"field\":\"file\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"pos\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"row\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"thread\"},{\"type\":\"string\",\"optional\":true,\"field\":\"query\"}],\"optional\":false,\"name\":\"io.debezium.connector.mysql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"name\":\"event.block\",\"version\":1,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"normal.test.test_sink_normal.Envelope\",\"version\":1},\"payload\":{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}}";
120+
SchemaAndValue schemaAndValue =
121+
jsonConverter.toConnectData(
122+
"topic_test", structMsg.getBytes(StandardCharsets.UTF_8));
123+
SinkRecord record5 =
124+
new SinkRecord(
125+
"topic_test",
126+
0,
127+
Schema.OPTIONAL_STRING_SCHEMA,
128+
"key",
129+
Schema.OPTIONAL_STRING_SCHEMA,
130+
new Struct(schemaAndValue.schema()),
131+
3);
132+
Assert.assertEquals(
133+
"test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record5));
84134
}
85135
}

0 commit comments

Comments
 (0)