
Commit 0813cb5

[improve] support combine flush async (#74)
Enabled with enable.combine.flush=true. Currently the stream load is synchronous, which blocks consumption of Kafka data while a write is in flight; this change makes it asynchronous to improve throughput.
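Combine flush is switched on through the sink connector configuration. A minimal sketch for illustration — only enable.combine.flush comes from this commit; the connector name, class, and topic values are placeholders, and the class name is assumed rather than shown on this page:

    # Illustrative sink config. enable.combine.flush is the switch this commit targets;
    # the connector class name is assumed, not shown on this page.
    name=doris-sink-connector
    connector.class=org.apache.doris.kafka.connector.DorisSinkConnector
    topics=my_topic
    enable.combine.flush=true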
1 parent f5ea699 commit 0813cb5

15 files changed: +602 −79 lines

src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java

Lines changed: 2 additions & 0 deletions
@@ -109,6 +109,8 @@ public void put(final Collection<SinkRecord> records) {
                         remainingRetries);
                 remainingRetries--;
                 context.timeout(options.getRetryIntervalMs());
+                // When writing asynchronously, the asynchronous thread needs to be restarted
+                sink.init();
                 throw new RetriableException(ex);
             }
             throw ex;
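A condensed sketch of put()'s retry branch for orientation — the surrounding try/catch shape is inferred from the context lines, not shown verbatim in the hunk:

    try {
        sink.insert(records);
    } catch (Exception ex) {
        if (remainingRetries > 0) {
            remainingRetries--;
            context.timeout(options.getRetryIntervalMs());
            sink.init(); // restart async stream-load threads that died with the failed flush
            throw new RetriableException(ex); // Kafka Connect redelivers the whole batch
        }
        throw ex;
    }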

src/main/java/org/apache/doris/kafka/connector/service/DorisCombinedSinkService.java

Lines changed: 16 additions & 12 deletions
@@ -22,8 +22,8 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.doris.kafka.connector.writer.AsyncStreamLoadWriter;
 import org.apache.doris.kafka.connector.writer.DorisWriter;
-import org.apache.doris.kafka.connector.writer.StreamLoadWriter;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -34,14 +34,24 @@
 /** Combined all partitions and write once. */
 public class DorisCombinedSinkService extends DorisDefaultSinkService {
     private static final Logger LOG = LoggerFactory.getLogger(DorisCombinedSinkService.class);
-
     private final Map<String, HashMap<Integer, Long>> topicPartitionOffset;

     DorisCombinedSinkService(Map<String, String> config, SinkTaskContext context) {
         super(config, context);
         this.topicPartitionOffset = new HashMap<>();
     }

+    @Override
+    public void init() {
+        for (DorisWriter wr : writer.values()) {
+            if (wr instanceof AsyncStreamLoadWriter) {
+                // When the stream load asynchronous thread is down,
+                // it needs to be restarted on retry
+                ((AsyncStreamLoadWriter) wr).start();
+            }
+        }
+    }
+
     /**
      * Create new task
      *
@@ -60,8 +70,9 @@ public void startTask(final String tableName, final TopicPartition topicPartitio
         // Only by topic
         int partition = -1;
         DorisWriter dorisWriter =
-                new StreamLoadWriter(
+                new AsyncStreamLoadWriter(
                         tableName, topic, partition, dorisOptions, conn, connectMonitor);
+
         writer.put(writerKey, dorisWriter);
         metricsJmxReporter.start();
     }
@@ -86,14 +97,6 @@ public void insert(final Collection<SinkRecord> records) {
             // Might happen a count of record based flushing,buffer
             insert(record);
         }
-
-        // check all sink writer to see if they need to be flushed
-        for (DorisWriter writer : writer.values()) {
-            // Time based flushing
-            if (writer.shouldFlush()) {
-                writer.flushBuffer();
-            }
-        }
     }

     @Override
@@ -122,8 +125,9 @@ public long getOffset(final TopicPartition topicPartition) {
     public void commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
         // Here we force flushing the data in memory once to
         // ensure that the offsets recorded in topicPartitionOffset have been flushed to doris
+        LOG.info("trigger flush by commit, topic {}", topicPartitionOffset.keySet());
         for (DorisWriter writer : writer.values()) {
-            writer.flushBuffer();
+            writer.commitFlush();
         }
     }
src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java

Lines changed: 3 additions & 0 deletions
@@ -87,6 +87,9 @@ public class DorisDefaultSinkService implements DorisSinkService {
                         this.metricsJmxReporter);
     }

+    @Override
+    public void init() {}
+
     @Override
     public void startTask(TopicPartition topicPartition) {
         startTask(dorisOptions.getTopicMapTable(topicPartition.topic()), topicPartition);

src/main/java/org/apache/doris/kafka/connector/service/DorisSinkService.java

Lines changed: 3 additions & 0 deletions
@@ -28,6 +28,9 @@
 /** Background service of data sink, responsible to create/drop table and insert/delete files */
 public interface DorisSinkService {

+    /** init task for writer */
+    void init();
+
     /**
      * Start the Task.
      *
src/main/java/org/apache/doris/kafka/connector/writer/AsyncStreamLoadWriter.java

Lines changed: 139 additions & 0 deletions (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.doris.kafka.connector.writer;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.doris.kafka.connector.cfg.DorisOptions;
import org.apache.doris.kafka.connector.connection.ConnectionProvider;
import org.apache.doris.kafka.connector.metrics.DorisConnectMonitor;
import org.apache.doris.kafka.connector.utils.BackendUtils;
import org.apache.doris.kafka.connector.writer.load.AsyncDorisStreamLoad;
import org.apache.doris.kafka.connector.writer.load.DefaultThreadFactory;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsyncStreamLoadWriter extends DorisWriter {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncStreamLoadWriter.class);
    private final LabelGenerator labelGenerator;
    private AsyncDorisStreamLoad dorisStreamLoad;
    private final transient ScheduledExecutorService scheduledExecutorService;

    public AsyncStreamLoadWriter(
            String tableName,
            String topic,
            int partition,
            DorisOptions dorisOptions,
            ConnectionProvider connectionProvider,
            DorisConnectMonitor connectMonitor) {
        super(tableName, topic, partition, dorisOptions, connectionProvider, connectMonitor);
        this.labelGenerator = new LabelGenerator(topic, partition, tableIdentifier);
        BackendUtils backendUtils = BackendUtils.getInstance(dorisOptions, LOG);
        this.dorisStreamLoad =
                new AsyncDorisStreamLoad(backendUtils, dorisOptions, topic, this.tableName);
        this.scheduledExecutorService =
                new ScheduledThreadPoolExecutor(
                        1, new DefaultThreadFactory("stream-load-flush-interval"));
        // when uploading data in streaming mode, we need to regularly detect whether there are
        // exceptions.
        scheduledExecutorService.scheduleWithFixedDelay(
                this::intervalFlush,
                dorisOptions.getFlushTime(),
                dorisOptions.getFlushTime(),
                TimeUnit.SECONDS);
    }

    /** start async thread stream load */
    public void start() {
        this.dorisStreamLoad.start();
    }

    public void insert(final SinkRecord dorisRecord) {
        checkFlushException();
        putBuffer(dorisRecord);
        if (buffer.getBufferSizeBytes() >= dorisOptions.getFileSize()
                || (dorisOptions.getRecordNum() != 0
                        && buffer.getNumOfRecords() >= dorisOptions.getRecordNum())) {
            LOG.info(
                    "trigger flush by buffer size or count, buffer size: {}, num of records: {}, lastoffset: {}",
                    buffer.getBufferSizeBytes(),
                    buffer.getNumOfRecords(),
                    buffer.getLastOffset());
            bufferFullFlush();
        }
    }

    private void bufferFullFlush() {
        doFlush(false, true);
    }

    private void intervalFlush() {
        LOG.debug("interval flush trigger");
        doFlush(false, false);
    }

    public void commitFlush() {
        doFlush(true, false);
    }

    private synchronized void doFlush(boolean waitUtilDone, boolean bufferFull) {
        if (waitUtilDone || bufferFull) {
            flushBuffer(waitUtilDone);
        } else if (dorisStreamLoad.hasCapacity()) {
            flushBuffer(false);
        }
    }

    public synchronized void flushBuffer(boolean waitUtilDone) {
        if (!buffer.isEmpty()) {
            RecordBuffer tmpBuff = buffer;

            String label = labelGenerator.generateLabel(tmpBuff.getLastOffset());
            dorisStreamLoad.flush(label, tmpBuff);
            this.buffer = new RecordBuffer();
        }

        if (waitUtilDone) {
            dorisStreamLoad.forceFlush();
        }
    }

    private void checkFlushException() {
        dorisStreamLoad.checkException();
    }

    @Override
    public void commit(int partition) {
        // Won't go here
    }

    @Override
    public long getOffset() {
        // Won't go here
        return 0;
    }

    @Override
    public void fetchOffset() {
        // Won't go here
    }
}
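AsyncDorisStreamLoad itself is among the changed files but not shown in this excerpt. Its call sites above (start, flush, hasCapacity, forceFlush, checkException) imply a bounded queue drained by a background load thread; the following is a minimal sketch under that assumption — names and internals here are hypothetical, not the committed code:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicReference;

    /** Hypothetical sketch of an async stream-load worker matching the call sites above. */
    public class AsyncStreamLoadSketch {

        private static final int QUEUE_CAPACITY = 4; // assumed bound; real value unknown
        private final BlockingQueue<Batch> pending = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        private final AtomicReference<Throwable> flushException = new AtomicReference<>();

        /** start(): spawn (or respawn after a failure) the background load thread. */
        public void start() {
            Thread worker = new Thread(this::loadLoop, "async-stream-load");
            worker.setDaemon(true);
            worker.start();
        }

        /** flush(label, data): hand a sealed batch to the worker; blocks only if the queue is full. */
        public void flush(String label, String data) throws InterruptedException {
            pending.put(new Batch(label, data));
        }

        /** hasCapacity(): lets the interval flush skip a cycle instead of blocking the put() path. */
        public boolean hasCapacity() {
            return pending.remainingCapacity() > 0;
        }

        /** forceFlush(): block until every queued batch is loaded (used at offset commit). */
        public void forceFlush() throws InterruptedException {
            while (!pending.isEmpty() && flushException.get() == null) {
                Thread.sleep(50);
            }
            checkException();
        }

        /** checkException(): rethrow a worker failure on the Connect task thread. */
        public void checkException() {
            Throwable t = flushException.get();
            if (t != null) {
                throw new RuntimeException("async stream load failed", t);
            }
        }

        private void loadLoop() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    Batch batch = pending.take();
                    streamLoad(batch); // HTTP stream load against a Doris backend, elided here
                }
            } catch (InterruptedException ignored) {
                // shutting down
            } catch (Throwable t) {
                // park the failure and let the thread die; init() restarts it on retry
                flushException.set(t);
            }
        }

        private void streamLoad(Batch batch) { /* elided */ }

        private static final class Batch {
            final String label;
            final String data;
            Batch(String label, String data) {
                this.label = label;
                this.data = data;
            }
        }
    }

Under this reading, a load failure parks the exception and kills the worker thread; checkException() surfaces it on the Connect task thread, and DorisCombinedSinkService.init() calls start() again on retry — which matches the restart comments in the diffs above.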

src/main/java/org/apache/doris/kafka/connector/writer/DorisWriter.java

Lines changed: 8 additions & 6 deletions
@@ -42,7 +42,7 @@ public abstract class DorisWriter {
     protected String dbName;
     protected final String tableIdentifier;
     protected List<String> fileNames;
-    private RecordBuffer buffer;
+    protected RecordBuffer buffer;
     protected final AtomicLong committedOffset; // loaded offset + 1
     protected final AtomicLong flushedOffset; // flushed offset
     protected final AtomicLong processedOffset; // processed offset
@@ -116,11 +116,8 @@ protected void initRecord(final SinkRecord record) {

     protected void insertRecord(final SinkRecord record) {
         // discard the record if the record offset is smaller or equal to server side offset
-        // when enable.combine.flush=true, No verification is required because the offsets of
-        // multiple partitions cannot be compared.
-        if (dorisOptions.isEnableCombineFlush()
-                || (record.kafkaOffset() > this.offsetPersistedInDoris.get()
-                        && record.kafkaOffset() > processedOffset.get())) {
+        if (record.kafkaOffset() > this.offsetPersistedInDoris.get()
+                && record.kafkaOffset() > processedOffset.get()) {
             SinkRecord dorisRecord = record;
             RecordBuffer tmpBuff = null;

@@ -175,6 +172,11 @@ public boolean shouldFlush() {
                 >= (dorisOptions.getFlushTime() * 1000);
     }

+    // for combine flush
+    public void commitFlush() {
+        flushBuffer();
+    }
+
     public void flushBuffer() {
         if (buffer.isEmpty()) {
             return;
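Two things happen here for the async path: buffer becomes protected so that AsyncStreamLoadWriter can swap it out directly in flushBuffer(boolean), and the enable.combine.flush special case disappears from insertRecord() — the async writer's insert() goes straight to putBuffer() and never passes through this offset check. The new base commitFlush() keeps the old synchronous flushBuffer() behavior for non-async writers, while AsyncStreamLoadWriter overrides it with a blocking drain.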

src/main/java/org/apache/doris/kafka/connector/writer/RecordBuffer.java

Lines changed: 9 additions & 0 deletions
@@ -29,6 +29,7 @@ public class RecordBuffer extends PartitionBuffer<String> {
     private static final Logger LOG = LoggerFactory.getLogger(RecordBuffer.class);
     public static final String LINE_SEPARATOR = "\n";
     private final StringJoiner buffer;
+    private String label;

     public RecordBuffer() {
         super();
@@ -52,4 +53,12 @@ public String getData() {
                 getLastOffset());
         return result;
     }
+
+    public String getLabel() {
+        return label;
+    }
+
+    public void setLabel(String label) {
+        this.label = label;
+    }
 }
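The new label field lets a sealed RecordBuffer carry its stream-load label through the async queue — presumably set inside AsyncDorisStreamLoad.flush(label, buffer), which is not shown in this excerpt — so that a batch retried by the background thread can reuse the same label and Doris can deduplicate the load.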
