Connector
No-lock Read
Parallel Read
Exactly-once Read
Incremental Snapshot Read
Setting up a Flink cluster with the provided connector takes several steps.
-- Creates a MySQL CDC table source over inventory.products.
-- Replace the connection options (hostname, port, username, password) with your own.
CREATE TABLE mysql_binlog (
    id INT NOT NULL,
    name STRING,
    description STRING,
    weight DECIMAL(10, 3),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'inventory',
    'table-name' = 'products'
);

-- Reads snapshot and binlog data from MySQL, applies a simple transformation,
-- and shows the result in the SQL client.
SELECT
    id,
    UPPER(name) AS name,
    description,
    weight
FROM mysql_binlog;
Usage for DataStream API
Include the following Maven dependency (available through Maven Central):
<dependency>
    <groupId>org.apache.flink</groupId>
    <!-- Pick the connector artifact that matches your database. -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <!-- Only stable releases are published; SNAPSHOT versions must be built yourself from the master or a release branch. -->
    <version>3.0-SNAPSHOT</version>
</dependency>
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
.tableList("yourDatabaseName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute("Print MySQL Snapshot + Binlog");
Deserialization
The following JSON data shows a change event in JSON format.
{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  },
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  },
  "source": {...},
  "op": "u", // the operation type; "u" means this is an update event
  "ts_ms": 1589362330904, // the time at which the connector processed the event
  "transaction": null
}
Note: Please refer to the Debezium documentation for the meaning of each field.
In some cases, users may want each message to carry its schema. Use the JsonDebeziumDeserializationSchema(true) constructor for this. The Debezium JSON message then looks like this:
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "default": "flink",
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "description"
          },
          {
            "type": "double",
            "optional": true,
            "field": "weight"
          }
        ],
        "optional": true,
        "name": "mysql_binlog_source.inventory_1pzxhca.products.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "default": "flink",
            "field": "name"
          },
          {
            "type": "string",
            "optional": true,
            "field": "description"
          },
          {
            "type": "double",
            "optional": true,
            "field": "weight"
          }
        ],
        "optional": true,
        "name": "mysql_binlog_source.inventory_1pzxhca.products.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": {...},
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "mysql_binlog_source.inventory_1pzxhca.products.Envelope"
  },
  "payload": {
    "before": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.18
    },
    "after": {
      "id": 111,
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": 5.15
    },
    "source": {...},
    "op": "u", // the operation type; "u" means this is an update event
    "ts_ms": 1589362330904, // the time at which the connector processed the event
    "transaction": null
  }
}
Excluding the schema is usually recommended. The schema fields make messages very verbose, which reduces parsing performance.