Skip to content

Commit 5efa349

Browse files
PatrickRenleonardBang
authored andcommitted
[mysql] Improve error message under the case that start reading from earliest but schema change happened before (#1724)
1 parent f1ea59b commit 5efa349

File tree

6 files changed

+138
-17
lines changed

6 files changed

+138
-17
lines changed

docs/content/connectors/mysql-cdc(ZH).md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,8 +589,10 @@ CREATE TABLE mysql_source (...) WITH (
589589
)
590590
```
591591

592-
**注意**MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"
592+
**注意**
593+
1. MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"
593594
该日志可以帮助将作业从某个 checkpoint 的位点开始启动的场景。
595+
2. 如果捕获变更的表曾经发生过表结构变化,从最早位点、特定位点或时间戳启动可能会发生错误,因为 Debezium 读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。
594596

595597

596598
### DataStream Source

docs/content/connectors/mysql-cdc.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,8 +597,11 @@ CREATE TABLE mysql_source (...) WITH (
597597
)
598598
```
599599

600-
**Note:** MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
600+
**Notes:**
601+
1. MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
601602
"Binlog offset on checkpoint {checkpoint-id}". It could be useful if you want to restart the job from a specific checkpointed position.
603+
2. If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
604+
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.
602605

603606
### DataStream Source
604607

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/MySqlErrorHandler.java

Lines changed: 60 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616

1717
package com.ververica.cdc.connectors.mysql.debezium.task.context;
1818

19+
import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
20+
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
21+
import com.ververica.cdc.connectors.mysql.table.StartupMode;
1922
import io.debezium.DebeziumException;
2023
import io.debezium.connector.base.ChangeEventQueue;
2124
import io.debezium.connector.mysql.MySqlConnector;
2225
import io.debezium.connector.mysql.MySqlTaskContext;
2326
import io.debezium.pipeline.ErrorHandler;
2427
import io.debezium.relational.TableId;
28+
import org.apache.commons.lang3.exception.ExceptionUtils;
29+
import org.apache.kafka.connect.errors.ConnectException;
2530
import org.slf4j.Logger;
2631
import org.slf4j.LoggerFactory;
2732

@@ -38,12 +43,17 @@ public class MySqlErrorHandler extends ErrorHandler {
3843
Pattern.compile(
3944
"Encountered change event for table (.+)\\.(.+) whose schema isn't known to this connector");
4045

41-
MySqlTaskContext context;
46+
private final MySqlTaskContext context;
47+
private final MySqlSourceConfig sourceConfig;
4248

4349
public MySqlErrorHandler(
44-
String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext context) {
50+
String logicalName,
51+
ChangeEventQueue<?> queue,
52+
MySqlTaskContext context,
53+
MySqlSourceConfig sourceConfig) {
4554
super(MySqlConnector.class, logicalName, queue);
4655
this.context = context;
56+
this.sourceConfig = sourceConfig;
4757
}
4858

4959
@Override
@@ -53,20 +63,56 @@ protected boolean isRetriable(Throwable throwable) {
5363

5464
@Override
5565
public void setProducerThrowable(Throwable producerThrowable) {
56-
if (producerThrowable.getCause() instanceof DebeziumException) {
57-
DebeziumException e = (DebeziumException) producerThrowable.getCause();
58-
String detailMessage = e.getMessage();
59-
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
60-
if (matcher.find()) {
61-
String databaseName = matcher.group(1);
62-
String tableName = matcher.group(2);
63-
TableId tableId = new TableId(databaseName, null, tableName);
64-
if (context.getSchema().schemaFor(tableId) == null) {
65-
LOG.warn("Schema for table " + tableId + " is null");
66-
return;
67-
}
66+
if (isTableNotFoundException(producerThrowable)) {
67+
Matcher matcher =
68+
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
69+
String databaseName = matcher.group(1);
70+
String tableName = matcher.group(2);
71+
TableId tableId = new TableId(databaseName, null, tableName);
72+
if (context.getSchema().schemaFor(tableId) == null) {
73+
LOG.warn("Schema for table " + tableId + " is null");
74+
return;
6875
}
6976
}
77+
78+
if (isSchemaOutOfSyncException(producerThrowable)) {
79+
super.setProducerThrowable(
80+
new SchemaOutOfSyncException(
81+
"Internal schema representation is probably out of sync with real database schema. "
82+
+ "The reason could be that the table schema was changed after the starting "
83+
+ "binlog offset, which is not supported when startup mode is set to "
84+
+ sourceConfig.getStartupOptions().startupMode,
85+
producerThrowable));
86+
return;
87+
}
88+
7089
super.setProducerThrowable(producerThrowable);
7190
}
91+
92+
private boolean isTableNotFoundException(Throwable t) {
93+
if (!(t.getCause() instanceof DebeziumException)) {
94+
return false;
95+
}
96+
DebeziumException e = (DebeziumException) t.getCause();
97+
String detailMessage = e.getMessage();
98+
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
99+
return matcher.find();
100+
}
101+
102+
private boolean isSchemaOutOfSyncException(Throwable t) {
103+
Throwable rootCause = ExceptionUtils.getRootCause(t);
104+
return rootCause instanceof ConnectException
105+
&& rootCause
106+
.getMessage()
107+
.endsWith(
108+
"internal schema representation is probably out of sync with real database schema")
109+
&& isSettingStartingOffset();
110+
}
111+
112+
private boolean isSettingStartingOffset() {
113+
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
114+
return startupMode == StartupMode.EARLIEST_OFFSET
115+
|| startupMode == StartupMode.TIMESTAMP
116+
|| startupMode == StartupMode.SPECIFIC_OFFSETS;
117+
}
72118
}

flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,8 @@ public void configure(MySqlSplit mySqlSplit) {
165165
changeEventSourceMetricsFactory.getStreamingMetrics(
166166
taskContext, queue, metadataProvider);
167167
this.errorHandler =
168-
new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, taskContext);
168+
new MySqlErrorHandler(
169+
connectorConfig.getLogicalName(), queue, taskContext, sourceConfig);
169170
}
170171

171172
private void validateAndLoadDatabaseHistory(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2022 Ververica Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.ververica.cdc.connectors.mysql.debezium.task.context.exception;
18+
19+
/**
20+
* A wrapper class for clearly show the possible reason of a schema-out-of-sync exception thrown
21+
* inside Debezium.
22+
*/
23+
public class SchemaOutOfSyncException extends Exception {
24+
public SchemaOutOfSyncException(String message, Throwable cause) {
25+
super(message, cause);
26+
}
27+
}

flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
import org.apache.flink.core.testutils.CommonTestUtils;
2020
import org.apache.flink.table.api.DataTypes;
2121
import org.apache.flink.table.types.DataType;
22+
import org.apache.flink.util.ExceptionUtils;
2223

2324
import com.github.shyiko.mysql.binlog.BinaryLogClient;
2425
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
2526
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
2627
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
28+
import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
2729
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
2830
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
2931
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
@@ -65,7 +67,9 @@
6567

6668
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
6769
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
70+
import static org.junit.Assert.assertEquals;
6871
import static org.junit.Assert.assertNotNull;
72+
import static org.junit.Assert.assertThrows;
6973
import static org.junit.Assert.assertTrue;
7074

7175
/** Tests for {@link BinlogSplitReader}. */
@@ -363,6 +367,43 @@ public void testReadBinlogFromEarliestOffset() throws Exception {
363367
assertEqualsInOrder(Arrays.asList(expected), actual);
364368
}
365369

370+
@Test
371+
public void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception {
372+
customerDatabase.createAndInitialize();
373+
MySqlSourceConfig sourceConfig =
374+
getConfig(StartupOptions.earliest(), new String[] {"customers"});
375+
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
376+
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
377+
String tableId = customerDatabase.qualifiedTableName("customers");
378+
DataType dataType =
379+
DataTypes.ROW(
380+
DataTypes.FIELD("id", DataTypes.BIGINT()),
381+
DataTypes.FIELD("name", DataTypes.STRING()),
382+
DataTypes.FIELD("address", DataTypes.STRING()),
383+
DataTypes.FIELD("phone_number", DataTypes.STRING()));
384+
385+
// Add a column to the table
386+
addColumnToTable(mySqlConnection, tableId);
387+
388+
// Create reader and submit splits
389+
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
390+
BinlogSplitReader reader = createBinlogReader(sourceConfig);
391+
reader.submitSplit(split);
392+
393+
// An exception is expected here because the table schema is changed, which is not allowed
394+
// under earliest startup mode.
395+
Throwable throwable =
396+
assertThrows(Throwable.class, () -> readBinlogSplits(dataType, reader, 1));
397+
Optional<SchemaOutOfSyncException> schemaOutOfSyncException =
398+
ExceptionUtils.findThrowable(throwable, SchemaOutOfSyncException.class);
399+
assertTrue(schemaOutOfSyncException.isPresent());
400+
assertEquals(
401+
"Internal schema representation is probably out of sync with real database schema. "
402+
+ "The reason could be that the table schema was changed after the starting "
403+
+ "binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET",
404+
schemaOutOfSyncException.get().getMessage());
405+
}
406+
366407
@Test
367408
public void testReadBinlogFromBinlogFilePosition() throws Exception {
368409
// Preparations
@@ -1002,5 +1043,6 @@ private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
10021043
private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception {
10031044
connection.execute(
10041045
"ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213");
1046+
connection.commit();
10051047
}
10061048
}

0 commit comments

Comments
 (0)