src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
@@ -47,6 +47,7 @@ public class ClickHouseSinkConfig {
public static final String DB_TOPIC_SPLIT_CHAR = "dbTopicSplitChar";
public static final String KEEPER_ON_CLUSTER = "keeperOnCluster";
public static final String DATE_TIME_FORMAT = "dateTimeFormats";
public static final String TOLERATE_STATE_MISMATCH = "tolerateStateMismatch";

public static final int MILLI_IN_A_SEC = 1000;
private static final String databaseDefault = "default";
@@ -92,6 +93,7 @@ public enum StateStores {
private final String keeperOnCluster;
private final Map<String, DateTimeFormatter> dateTimeFormats;
private final String clientVersion;
private final boolean tolerateStateMismatch;

public enum InsertFormats {
NONE,
@@ -263,6 +265,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
}
}
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1");
this.tolerateStateMismatch = Boolean.parseBoolean(props.getOrDefault(TOLERATE_STATE_MISMATCH, "false"));

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
@@ -558,6 +561,16 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.SHORT,
"Client version"
);
configDef.define(TOLERATE_STATE_MISMATCH,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.LOW,
"Tolerate state mismatch. default: false",
group,
++orderInGroup,
ConfigDef.Width.SHORT,
"Tolerate state mismatch."
);
return configDef;
}
}
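
For reference, a minimal sketch of switching the new option on, assuming only what this diff shows: the empty-map constructor used in the tests below, Boolean.parseBoolean handling of the value, and the isTolerateStateMismatch() getter called from Processing. The wrapper class name TolerateStateMismatchExample is hypothetical.

import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

import java.util.HashMap;
import java.util.Map;

public class TolerateStateMismatchExample {
    public static void main(String[] args) {
        // Property absent: Boolean.parseBoolean("false") -> false.
        ClickHouseSinkConfig defaults = new ClickHouseSinkConfig(new HashMap<>());
        System.out.println(defaults.isTolerateStateMismatch()); // false

        // Explicit opt-in through the connector properties map.
        Map<String, String> props = new HashMap<>();
        props.put(ClickHouseSinkConfig.TOLERATE_STATE_MISMATCH, "true");
        System.out.println(new ClickHouseSinkConfig(props).isTolerateStateMismatch()); // true
    }
}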
src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeContainer.java
@@ -66,6 +66,9 @@ public RangeState getOverLappingState(RangeContainer rangeContainer) {
// ZEROED [10, 20] Actual [0, 10]
if (actualMinOffset == 0)
return RangeState.ZERO;
// PREVIOUS [10, 20] Actual [5, 8]
if (actualMaxOffset < minOffset)
return RangeState.PREVIOUS;
// ERROR [10, 20] Actual [8, 19]
return RangeState.ERROR;
}
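
To make the new state concrete: with a stored range of [10, 20], an incoming batch [5, 8] ends entirely before the stored minimum, so every record in it was already written by an earlier committed batch. Below is a standalone sketch of just the checks visible in this hunk (not the full getOverLappingState ordering):

public class RangeStateSketch {
    public static void main(String[] args) {
        long minOffset = 10;                            // stored state covers [10, 20]
        long actualMinOffset = 5, actualMaxOffset = 8;  // incoming batch covers [5, 8]

        if (actualMinOffset == 0) {
            System.out.println("ZERO: offsets restarted, topic likely deleted/recreated");
        } else if (actualMaxOffset < minOffset) {
            System.out.println("PREVIOUS: batch ends before the stored range, already processed");
        } else {
            System.out.println("ERROR: e.g. [8, 19] partially overlaps [10, 20]");
        }
    }
}

Running it prints the PREVIOUS line, matching the [5, 8] example in the comment above.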
src/main/java/com/clickhouse/kafka/connect/sink/kafka/RangeState.java
@@ -1,15 +1,15 @@
package com.clickhouse.kafka.connect.sink.kafka;

public enum RangeState {

ZERO(0), //This is for when it seems like the topic has been deleted/recreated
SAME(1),
PREFIX(2),
SUFFIX(3),
CONTAINS(4),
OVER_LAPPING(5),
NEW(6),
ERROR(7);
ERROR(7),
PREVIOUS(8);


private int rangeState;
src/main/java/com/clickhouse/kafka/connect/sink/processing/Processing.java
@@ -34,12 +34,6 @@ public class Processing {

private ErrorReporter errorReporter = null;

public Processing(StateProvider stateProvider, DBWriter dbWriter) {
this.stateProvider = stateProvider;
this.dbWriter = dbWriter;
this.errorReporter = null;
}

public Processing(StateProvider stateProvider, DBWriter dbWriter, ErrorReporter errorReporter, ClickHouseSinkConfig clickHouseSinkConfig) {
this.stateProvider = stateProvider;
this.dbWriter = dbWriter;
@@ -177,8 +171,8 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException, InterruptedException {
doInsert(rightRecords, rightRangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case ERROR:
throw new RuntimeException(String.format("State MISMATCH given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
default: //case ERROR:
throw new RuntimeException(String.format("ERROR State given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()));
}
break;
@@ -211,8 +205,17 @@ public void doLogic(List<Record> records) throws IOException, ExecutionException, InterruptedException {
doInsert(rightRecords, rightRangeContainer);
stateProvider.setStateRecord(new StateRecord(topic, partition, rightRangeContainer.getMaxOffset(), rightRangeContainer.getMinOffset(), State.AFTER_PROCESSING));
break;
case ERROR:
throw new RuntimeException(String.format("State MISMATCH given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
case PREVIOUS:
if (clickHouseSinkConfig.isTolerateStateMismatch()) {
LOGGER.warn("State MISMATCH as batch already processed - skipping [{}] records for topic: [{}], partition: [{}], minOffset: [{}], maxOffset: [{}], storedMinOffset: [{}], storedMaxOffset: [{}]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset());
} else {
throw new RuntimeException(String.format("State MISMATCH as batch already processed - skipping [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], storedMinOffset: [%s], storedMaxOffset: [%s]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()));
}
break;
default: //case ERROR:
throw new RuntimeException(String.format("ERROR State given [%s] records for topic: [%s], partition: [%s], minOffset: [%s], maxOffset: [%s], expectedMinOffset: [%s], expectedMaxOffset: [%s]",
records.size(), topic, partition, rangeContainer.getMinOffset(), rangeContainer.getMaxOffset(), stateRecord.getMinOffset(), stateRecord.getMaxOffset()));
}
}
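
The operational effect of the new PREVIOUS branch, condensed as a sketch (how Kafka Connect proceeds after doLogic returns normally is an assumption, not shown in this diff):

public class PreviousBranchSketch {
    // 'tolerate' stands in for isTolerateStateMismatch(); this mirrors the
    // branch above for a batch classified as RangeState.PREVIOUS.
    static void onPreviousBatch(boolean tolerate) {
        if (tolerate) {
            // Warn and return: nothing is inserted, the task keeps running,
            // and the stale batch's offsets are (presumably) committed past.
            System.out.println("skipping already-processed batch");
        } else {
            // Default: fail loudly rather than silently drop a mismatched batch.
            throw new RuntimeException("State MISMATCH as batch already processed");
        }
    }

    public static void main(String[] args) {
        onPreviousBatch(true); // logs and continues; 'false' would throw
    }
}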
src/test/java/com/clickhouse/kafka/connect/sink/processing/ProcessingTest.java
@@ -1,4 +1,5 @@
package com.clickhouse.kafka.connect.sink.processing;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -18,6 +19,7 @@
import com.clickhouse.kafka.connect.sink.state.provider.InMemoryState;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import static org.junit.jupiter.api.Assertions.assertThrows;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

@@ -58,7 +60,7 @@ public void ProcessAllAtOnceNewTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
}
@@ -73,7 +75,7 @@ public void ProcessSplitNewTest() throws IOException, ExecutionException, InterruptedException {
assertEquals(records.size(), recordsHead.size() + recordsTail.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
processing.doLogic(recordsTail);
@@ -86,7 +88,7 @@ public void ProcessAllNewTwiceTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
processing.doLogic(records);
@@ -102,7 +104,7 @@ public void ProcessAllNewFailedSetStateAfterProcessingTest() throws IOException, ExecutionException, InterruptedException {
//List<Record> recordsTail = records.subList(splitPoint, records.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -118,7 +120,7 @@ public void ProcessContainsBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -133,7 +135,7 @@ public void ProcessContainsAfterProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
processing.doLogic(containsRecords);
@@ -148,7 +150,7 @@ public void ProcessOverlappingBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> containsRecords = records.subList(345,850);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
processing.doLogic(containsRecords);
@@ -166,7 +168,7 @@ public void ProcessSplitNewWithBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
assertEquals(records.size(), recordsHead.size() + recordsTail.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -183,7 +185,7 @@ public void ProcessDeletedTopicBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> containsRecords = records.subList(0,150);
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(records);
assertEquals(records.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -218,7 +220,7 @@ public void ProcessPartialOverlappingBeforeProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> recordsTail = records.subList(splitPointLow, records.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
StateRecord stateRecord = stateProvider.getStateRecord("test", 1);
@@ -237,11 +239,30 @@ public void ProcessPartialOverlappingAfterProcessingTest() throws IOException, ExecutionException, InterruptedException {
List<Record> recordsTail = records.subList(splitPointLow, records.size());
StateProvider stateProvider = new InMemoryState();
DBWriter dbWriter = new InMemoryDBWriter();
Processing processing = new Processing(stateProvider, dbWriter);
Processing processing = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
processing.doLogic(recordsHead);
assertEquals(recordsHead.size(), dbWriter.recordsInserted());
processing.doLogic(recordsTail);
assertEquals(records.size(), dbWriter.recordsInserted());
}

@Test
@DisplayName("ProcessOldRecordsTest")
public void ProcessOldRecordsTest() throws IOException, ExecutionException, InterruptedException {
List<Record> records = createRecords("test", 1);
List<Record> recordsHead = records.subList(1, 2);
StateProvider stateProvider = new InMemoryState();
stateProvider.setStateRecord(new StateRecord("test", 1, 5000, 4000, State.AFTER_PROCESSING));
DBWriter dbWriter = new InMemoryDBWriter();
Processing processingWithoutConfig = new Processing(stateProvider, dbWriter, null, new ClickHouseSinkConfig(new HashMap<>()));
assertThrows(RuntimeException.class, () -> processingWithoutConfig.doLogic(recordsHead));

HashMap<String, String> config = new HashMap<>();
config.put(ClickHouseSinkConfig.TOLERATE_STATE_MISMATCH, "true");
ClickHouseSinkConfig clickHouseConfig = new ClickHouseSinkConfig(config);
Processing processing = new Processing(stateProvider, dbWriter, null, clickHouseConfig);
processing.doLogic(recordsHead);
assertEquals(0, dbWriter.recordsInserted());
}

}