-
Notifications
You must be signed in to change notification settings - Fork 2k
FAQ(ZH)
有的,直接看官网文档:https://ververica.github.io/flink-cdc-connectors/release-2.1/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/index.html 。
看文档时,大家可以看已经 release 过的版本,不要看 master 的文档。master 分支上的文档和 master 代码对应,都是处于开发状态的,暂未发布的。
和主流的 maven 项目版本管理相同,xxx-SNAPSHOT 版本都是对应开发分支的代码,需要用户自己下载源码并编译对应的jar, 用户应该使用已经 release 过的版本,比如 flink-sql-connector-mysql-cdc-2.1.0.jar,release 过的版本maven中心仓库才会有。
Flink CDC 项目中各个connector的依赖管理和Flink 项目中 connector 保持一致。flink-sql-connector-xx 是胖包,除了connector的代码外,还把 connector 依赖的所有三方包 shade 后打入,提供给 SQL 作业使用,用户只需要在 lib目录下添加该胖包即可。flink-connector-xx 只有该 connector 的代码,不包含其所需的依赖,提供 datastream 作业使用,用户需要自己管理所需的三方包依赖,有冲突的依赖需要自己做 exclude, shade 处理。
Flink CDC 项目 从 2.0.0 版本将 group id 从com.alibaba.ververica
改成 com.ververica
,这是为了让项目更加社区中立,让各个公司的开发者共建时更方便。所以在maven仓库找 2.x 的包时,路径是 /com/ververica
。
CDC 2.0 支持了无锁算法,支持并发读取,为了保证全量数据 + 增量数据的顺序性,依赖Flink 的 checkpoint机制,所以作业需要配置 checkpoint。 SQL 作业中配置方式:
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
DataStream 作业配置方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
在解析binlog数据中的timestamp字段时,cdc 会使用到作业里配置的server-time-zone信息,也就是MySQL服务器的时区,如果这个时区没有和你的MySQL服务器时区一致,就会出现这个问题。
此外,如果是在DataStream作业中自定义列化器如 MyDeserializer implements DebeziumDeserializationSchema, 自定义的序列化器在解析 timestamp 类型数据时,需要参考下 RowDataDebeziumDeserializeSchema 中对 timestamp 类型的解析,用时给定的时区信息。
private TimestampData convertToTimestamp(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
支持的,从库需要配置 log-slave-updates = 1 使从实例也能将从主实例同步的数据写入从库的 binlog 文件中,如果主库开启了gtid mode,从库也需要开启。
log-slave-updates = 1
gtid_mode = on
enforce_gtid_consistency = on
通过 mysql cdc 表的with参数中,表名和库名均支持正则配置,比如 'table-name' ='user_.' 可以匹配表名 user_1, user_2,user_a表,注意正则匹配任意字符是'.' 而不是 '*', 其中点号表示任意字符,星号表示0个或多个,database-name也如此。
在 mysql cdc 表的 with 参数中指定 'scan.startup.mode' = 'latest-offset' 即可。
CDC 2.1 版本提供了 DataStream API: MysqlSource, 用户可以配置 includeSchemaChanges 表示是否需要DDL 事件,获取到 DDL 事件后自己写代码处理。
public void consumingAllEvents() throws Exception {
inventoryDatabase.createAndInitialize();
MySqlSource<String> mySqlSource =
MySqlSource.<String>builder()
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.databaseList(inventoryDatabase.getDatabaseName())
.tableList(inventoryDatabase.getDatabaseName() + ".products")
.username(inventoryDatabase.getUsername())
.password(inventoryDatabase.getPassword())
.serverId("5401-5404")
.deserializer(new JsonDebeziumDeserializationSchema())
.includeSchemaChanges(true) // 这里配置,输出DDL事件
.build();
... // 其他处理逻辑
}
Flink CDC 支持的,Q6 中 提供的 DataStream API 已经可以让用户获取 DDL 变更事件和数据变更事件,用户需要在此基础上,根据自己的业务逻辑和下游存储进行 DataStream 作业开发。
这个问题是因为 mysql 服务器 可以配置 binlog 过滤器,忽略了某些库的 binlog。用户可以通过 show master status 命令查看 Binlog_Ignore_DB 和 Binlog_Do_DB。
mysql> show master status;
+------------------+----------+--------------+------------------+----------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+----------------------+
| mysql-bin.000006 | 4594 | | | xxx:1-15 |
+------------------+----------+--------------+------------------+----------------------+
Q9: 作业报错 The connector is trying to read binlog starting at GTIDs xxx and binlog file 'binlog.000064', pos=89887992, skipping 4 events plus 1 rows, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed,怎么办呢?
出现这种错误是 作业正在读取的binlog文件在 MySQL 服务器已经被清理掉,这种情况一般是 MySQL 服务器上保留的 binlog 文件过期时间太短,可以将该值设置大一点,比如7天。
mysql> show variables like 'expire_logs_days';
mysql> set global expire_logs_days=7;
还有种情况是 flink cdc 作业消费binlog 太慢,这种一般分配足够的资源即可。
Q10: 作业报错 ConnectException: A slave with the same server_uuid/server_id as this slave has connected to the master,怎么办呢?
出现这种错误是 作业里使用的 server id 和其他作业或其他同步工具使用的server id 冲突了,server id 需要全局唯一,server id 是一个int类型整数。 在 CDC 2.x 版本中,source 的每个并发都需要一个server id,建议合理规划好server id,比如作业的 source 设置成了四个并发,可以配置 'server-id' = '5001-5004', 这样每个 source task 就不会冲突了。
Q11: 作业报错 ConnectException: Received DML ‘…’ for processing, binlog probably contains events generated with statement or mixed based replication format,怎么办呢?
出现这种错误是 MySQL 服务器配置不对,需要检查下 binlog_format 是不是 ROW? 可以通过下面的命令查看
mysql> show variables like '%binlog_format%';
这是因为用户配置的 MySQL 用户 使用的是 sha256 密码认证,需要 TLS 等协议传输密码。一种简单的方法是使允许 MySQL用户 支持原始密码方式访问。
mysql> ALTER USER 'username'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';
mysql> FLUSH PRIVILEGES;
Q13: 作业报错 EventDataDeserializationException: Failed to deserialize data of EventHeaderV4 .... Caused by: java.net.SocketException: Connection reset, 怎么办呢 ?
这个问题一般是网络原因引起,首先排查flink 集群 到 数据库之间的网络情况,其次可以调大 MySQL 服务器的网络参数。
mysql> set global slave_net_timeout = 120;
mysql> set global thread_pool_idle_timeout = 120;
Q14: 作业报错 The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. 怎么办呢 ?
出现这个问题的原因是的作业全量阶段读取太慢,在全量阶段读完后,之前记录的全量阶段开始时的 gtid 位点已经被 mysql 清理掉了。这种可以增大 mysql 服务器上 binlog 文件的保存时间,也可以调大 source 的并发,让全量阶段读取更快。
Flink Postgres CDC 只会在 checkpoint 完成的时候更新 Postgres slot 中的 LSN。因此如果发现磁盘使用率高的情况下,请先确认 checkpoint 是否开启。
Flink 中如果收到数据的 precision 大于在 Flink 中声明的类型的 precision 时,会将数据处理成 NULL。此时可以配置相应'debezium.decimal.handling.mode' = 'string'
将读取的数据用 STRING 类型 来处理。
请先确保 REPLICA IDENTITY 是 FULL。
TOAST 的数据比较大,为了节省 wal 的大小,如果 TOAST 数据没有变更,那么 wal2json plugin 就不会在更新后的数据中带上 toast 数据。为了避免这个问题,可以通过 'debezium.schema.refresh.mode'='columns_diff_exclude_unchanged_toast'
来解决。
当前 Flink Postgres CDC 在作业退出后并不会手动释放 slot,有两种方式可以解决该问题
- 前往 Postgres 中手动执行以下命令
select pg_drop_replication_slot('rep_slot');
ERROR: replication slot "rep_slot" is active for PID 162564
select pg_terminate_backend(162564); select pg_drop_replication_slot('rep_slot');
- pg source with 参数中添加
'debezium.slot.drop.on.stop' = 'true'
,在作业停止后自动清理 slot
可以的,可以在 Flink CDC 表的with 参数里 加下 'debezium.event.deserialization.failure.handling.mode'='warn' 参数,跳过脏数据,将脏数据打印到WARN日志里。 也可以配置 'debezium.event.deserialization.failure.handling.mode'='ignore', 直接跳过脏数据,不打印脏数据到日志。
支持,默认为 全量+增量 读取;使用copy.existing=false
参数设置为只读增量。
支持,checkpoint 会记录 ChangeStream 的 resumeToken,恢复的时候可以通过resumeToken重新恢复ChangeStream。其中 resumeToken 对应 oplog.rs
(MongoDB 变更日志collection) 的位置,oplog.rs
是一个固定容量的 collection。当 resumeToken 对应的记录在 oplog.rs
中不存在的时候,可能会出现 Invalid resumeToken 的异常。这种情况,在使用时可以设置合适oplog.rs
的集合大小,避免oplog.rs
保留时间过短,可以参考 https://docs.mongodb.com/manual/tutorial/change-oplog-size/ 另外,resumeToken 可以通过新到的变更记录和 heartbeat 记录来刷新。
MongoDB 原始的 oplog.rs
只有 INSERT, UPDATE, REPLACE, DELETE 这几种操作类型,没有保留更新前的信息,不能输出-U 消息,在 Flink 中只能实现 UPSERT 语义。在使用MongoDBTableSource 时,Flink planner 会自动进行 ChangelogNormalize 优化,补齐缺失的 -U 消息,输出完整的 +I, -U, +U, -D 四种消息, 代价是 ChangelogNormalize 优化的代价是该节点会保存之前所有 key 的状态。所以,如果是 DataStream 作业直接使用 MongoDBSource,如果没有 Flink planner 的优化,将不会自动进行 ChangelogNormalize,所以不能直接获取 —U 消息。想要获取更新前镜像值,需要自己管理状态,如果不希望自己管理状态,可以将 MongoDBTableSource 转换为
ChangelogStream 或者 RetractStream,借助 Flink planner 的优化能力补齐更新前镜像值,示例如下:
tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector'='mongodb-cdc',... )");
Table table = tEnv.from("orders")
.select($("*"));
tEnv.toChangelogStream(table)
.print()
.setParallelism(1);
env.execute();
仅支持订阅整库的 collection,筛选部分 collection 功能还不支持,例如配置 database
为 'mgdb',collection
为空字符串,则会订阅 'mgdb' 库下所有 collection。
目前还不支持。
MongoDB CDC 基于 ChangeStream 特性实现,ChangeStream 是 MongoDB 3.6 推出的新特性。MongoDB CDC 理论上支持 3.6 以上版本,建议运行版本 >= 4.0, 在低于3.6版本执行时,会出现错误: Unrecognized pipeline stage name: '$changeStream' 。
ChangeStream 需要 MongoDB 以副本集或者分片模式运行,本地测试可以使用单机版副本集 rs.initiate()
。在 standalone 模式下会出现错误:The $changestage is only supported on replica sets.
如果用户是创建在需要连接的db 下,需要在with参数里加下 'connection.options' = 'authSource=用户所在的db'。
不支持的,因为 MongoDB CDC 连接器是在 Flink CDC 项目中独立开发,并不依赖Debezium项目,所以不支持。
MongoDB CDC 全量读取阶段是不做 checkpoint 的,直到全量阶段读取完后才开始作 checkpoint,如果在全量读取阶段失败,MongoDB CDC 会重新读取存量数据。
可以使用在线挖掘的模式,不写入数据字典到 redo log 中,但是这样无法处理 DDL 语句。 生产环境默认策略读取 log 较慢,且默认策略会写入数据字典信息到 redo log 中导致日志量增加较多,可以添加如下 debezium 的配置项。 'log.mining.strategy' = 'online_catalog','log.mining.continuous.mine' = 'true'。如果使用 SQL 的方式,则需要在配置项中加上前缀 'debezium.',即:
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
Q2: 作业报错 Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table xxx. Use command: ALTER TABLE xxx ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS, 怎么办呢?
对于 oracle11 版本,debezium 会默认把 tableIdCaseInsensitive 设置为true, 导致表名被更新为小写,因此在oracle中查询不到 这个表补全日志设置,导致误报这个Supplemental logging not configured for table 错误”。 添加 debezium 的配置项 'database.tablename.case.insensitive' = 'false', 如果使用 SQL 的方式,则在表的 option 中添加配置项 'debezium.database.tablename.case.insensitive' = 'false'
添加 debezium 的配置项 'database.connection.adpter' = 'xstream', 如果使用 SQL 的方式,则在表的 option 中添加配置项 'debezium.database.connection.adpter' = 'xstream'
database-name 是数据库示例的名字,也就是 Oracle 的 SID schema-name 是表对应的 schema,一般而言,一个用户就对应一个 schema, 该用户的 schema 名等于用户名,并作为该用户缺省 schema。所以 schema-name 一般都是创建这个表的用户名,但是如果创建表的时候指定了 schema,则指定的 schema 则为 schema-name。 比如用 CREATE TABLE aaaa.testtable(xxxx) 的方式成功创建了表 testtable, 则 aaaa 为 schema-name。