添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

MySQL CDC Pipeline 连接器允许从 MySQL 数据库读取快照数据和增量数据,并提供端到端的整库数据同步能力。 本文描述了如何设置 MySQL CDC Pipeline 连接器。

从 MySQL 读取数据同步到 Doris 的 Pipeline 可以定义如下:

source:
   type: mysql
   name: MySQL Source
   hostname: 127.0.0.1
   port: 3306
   username: admin
   password: pass
   tables: adb.\.*, bdb.user_table_[0-9]+, [app|web].order_\.*
   server-id: 5401-5404
sink:
  type: doris
  name: Doris Sink
  fenodes: 127.0.0.1:8030
  username: root
  password: pass
pipeline:
   name: MySQL to Doris Pipeline
   parallelism: 4
  连接器配置项
      String
      需要监视的 MySQL 数据库的表名。表名支持正则表达式,以监视满足正则表达式的多个表。
需要注意的是,点号(.)被视为数据库和表名的分隔符。 如果需要在正则表达式中使用点(.)来匹配任何字符,必须使用反斜杠对点进行转义。
例如,db0.\.*, db1.user_table_[0-9]+, db[1-2].[app|web]order_\.* schema-change.enabled optional Boolean 是否发送模式更改事件,下游 sink 可以响应模式变更事件实现表结构同步,默认为true。 server-id optional (none) String 读取数据使用的 server id,server id 可以是个整数或者一个整数范围,比如 '5400' 或 '5400-5408', 建议在 'scan.incremental.snapshot.enabled' 参数为启用时,配置成整数范围。因为在当前 MySQL 集群中运行的所有 slave 节点,标记每个 salve 节点的 id 都必须是唯一的。 所以当连接器加入 MySQL 集群作为另一个 slave 节点(并且具有唯一 id 的情况下),它就可以读取 binlog。 默认情况下,连接器会在 5400 和 6400 之间生成一个随机数,但是我们建议用户明确指定 Server id。 scan.incremental.snapshot.chunk.size optional Integer 表快照的块大小(行数),读取表的快照时,捕获的表被拆分为多个块。 scan.snapshot.fetch.size optional Integer 读取表快照时每次读取数据的最大条数。 scan.startup.mode optional initial String MySQL CDC 消费者可选的启动模式, 合法的模式为 "initial","earliest-offset","latest-offset","specific-offset" 和 "timestamp"。 请查阅 启动模式 章节了解更多详细信息。 scan.startup.specific-offset.file optional (none) String 在 "specific-offset" 启动模式下,启动位点的 binlog 文件名。 scan.startup.specific-offset.pos optional (none) 在 "specific-offset" 启动模式下,启动位点的 binlog 文件位置。 scan.startup.specific-offset.gtid-set optional (none) String 在 "specific-offset" 启动模式下,启动位点的 GTID 集合。 scan.startup.specific-offset.skip-events optional (none) 在指定的启动位点后需要跳过的事件数量。 scan.startup.specific-offset.skip-rows optional (none) 在指定的启动位点后需要跳过的数据行数量。 connect.timeout optional Duration 连接器在尝试连接到 MySQL 数据库服务器后超时前应等待的最长时间。 connect.max-retries optional Integer 连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。 connection.pool.size optional Integer 连接池大小。 jdbc.properties.* optional String 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'. heartbeat.interval optional Duration 用于跟踪最新可用 binlog 偏移的发送心跳事件的间隔。 debezium.* optional (none) String 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 MySQL 服务器捕获数据更改。 例如: 'debezium.snapshot.mode' = 'never'. 查看更多关于 Debezium 的 MySQL 连接器属性 scan.incremental.close-idle-reader.enabled optional false Boolean 是否在快照结束后关闭空闲的 Reader。 此特性需要 flink 版本大于等于 1.14 并且 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 需要设置为 true。
若 flink 版本大于等于 1.15,'execution.checkpointing.checkpoints-after-tasks-finish.enabled' 默认值变更为 true,可以不用显式配置 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = true。

配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

  • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog。
  • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取
  • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog 的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改。
  • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。
  • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。
  • 数据类型映射

    MySQL中除GEOMETRYCOLLECTION之外的空间数据类型都会转换为 Json 字符串,格式固定,如:

    {"srid": 0 , "type": "xxx", "coordinates": [0, 0]}
    

    字段srid标识定义几何体的 SRS,如果未指定 SRID,则 SRID 0 是新几何体值的默认值。 由于 MySQL 8+ 在定义空间数据类型时只支持特定的 SRID,因此在版本较低的MySQL中,字段srid将始终为 0。

    字段type标识空间数据类型,例如POINT/LINESTRING/POLYGON

    字段coordinates表示空间数据的坐标

    对于GEOMETRYCOLLECTION,它将转换为 Json 字符串,格式固定,如:

    {"srid": 0 , "type": "GeometryCollection", "geometries": [{"type":"Point","coordinates":[10,10]}]}
    

    Geometrics字段是一个包含所有空间数据的数组。

    不同空间数据类型映射的示例如下:

    LINESTRING(3 0, 3 3, 3 5) {"coordinates":[[3,0],[3,3],[3,5]],"type":"LineString","srid":0} POLYGON((1 1, 2 1, 2 2, 1 2, 1 1)) {"coordinates":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],"type":"Polygon","srid":0} MULTIPOINT((1 1),(2 2)) {"coordinates":[[1,1],[2,2]],"type":"MultiPoint","srid":0} MultiLineString((1 1,2 2,3 3),(4 4,5 5)) {"coordinates":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"type":"MultiLineString","srid":0} MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5))) {"coordinates":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],"type":"MultiPolygon","srid":0} GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20)) {"geometries":[{"type":"Point","coordinates":[10,10]},{"type":"Point","coordinates":[30,30]},{"type":"LineString","coordinates":[[15,15],[20,20]]}],"type":"GeometryCollection","srid":0}