MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。
source:
type: mysql
name: MySQL Source
hostname: 127.0.0.1
port: 3306
username: admin
password: pass
tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
server-id: 5401-5404
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: pass
pipeline:
name: MySQL to Doris Pipeline
parallelism: 4
连接器配置项
String
需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.*
schema-change.enabled
optional
Boolean
是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。
server-id
optional
(none)
String
读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408',
建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。
scan.incremental.snapshot.chunk.size
optional
Integer
表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。
scan.snapshot.fetch.size
optional
Integer
读取表快照时每次读取数据的最大条数。
scan.startup.mode
optional
initial
String
MySQL CDC 消费者可选的启动模式,
合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。
请查阅 启动模式 章节了解更多详细信息。
scan.startup.specific-offset.file
optional
(none)
String
在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。
scan.startup.specific-offset.pos
optional
(none)
在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。
scan.startup.specific-offset.gtid-set
optional
(none)
String
在 "specific-offset" 启动模式下,启动位点的 GTID 集合。
scan.startup.specific-offset.skip-events
optional
(none)
在指定的启动位点后需要跳过的事件数量。
scan.startup.specific-offset.skip-rows
optional
(none)
在指定的启动位点后需要跳过的数据行数量。
connect.timeout
optional
Duration
连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。
connect.max-retries
optional
Integer
连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。
connection.pool.size
optional
Integer
连接池大小。
jdbc.properties.*
optional
String
传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'.
heartbeat.interval
optional
Duration
用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。
debezium.*
optional
(none)
String
将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。
例如: 'debezium.snapshot.mode' = 'never'
.
查看更多关于 Debezium 的 MySQL 连接器属性
scan.incremental.close-idle-reader.enabled
optional
false
Boolean
是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。
配置选项scan.startup.mode
指定 MySQL CDC 使用者的启动模式。有效枚举包括:
initial
(默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
earliest-offset
:跳过快照阶段,从可读取的最早 binlog 位点开始读取
latest-offset
:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
specific-offset
:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
timestamp
:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
数据类型映射
MySQL中除GEOMETRYCOLLECTION
之外的空间数据类型都会转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
字段srid
标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。
由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid
将始终为 0。
字段type
标识空间数据类型,例如POINT
/LINESTRING
/POLYGON
。
字段coordinates
表示空间数据的坐标
。
对于GEOMETRYCOLLECTION
,它将转换为 Json 字符串,格式固定,如:
{"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
Geometrics
字段是一个包含所有空间数据的数组。
不同空间数据类型映射的示例如下:
LINESTRING(3 0, 3 3, 3 5)
{"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0}
POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))
{"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0}
MULTIPOINT((1 1),(2 2))
{"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0}
MultiLineString((1 1,2 2,3 3),(4 4,5 5))
{"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0}
MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))
{"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0}
GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))
{"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}