添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
流式计算 Flink版

流式计算 Flink版

复制全文
Formats 参考
Debezium-JSON
复制全文
Debezium-JSON
文档反馈
问问助手

Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如。

  • 将增量数据从数据库同步到其他系统。
  • 日志审计
  • 数据库的实时物化视图。
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 消息,输出到 Kafka 等存储中。但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。

如何使用 Debezium-JSON

Debezium 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例::

{
  "before": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.18
  "after": {
    "id": 111,
    "name": "scooter",
    "description": "Big 2-wheel scooter",
    "weight": 5.15
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904,
  "transaction": null

MySQL 产品表有 4 列( id name description weight )。上面的 JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15 。假设此消息已同步到 Kafka 主题 products_binlog ,则可以使用以下 DDL 来使用此主题并解析更改事件:

CREATE TABLE topic_products (
  -- schema 与 MySQL 的 products 表完全相同
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
 'connector' = 'kafka',
 'topic' = 'products_binlog',
 'properties.bootstrap.servers' = 'localhost:9092',
 'properties.group.id' = 'testGroup',
  -- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息
 'format' = 'debezium-json'

在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 'value.converter.schemas.enable' ,用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:
::为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 'debezium-json.schema-include' = 'true' (默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。
在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源:

-- MySQL "products" 的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;
-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引,供将来查找
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

支持的元数据 (Available Metadata)

注意:仅适用于 Flink1.16 版本。

以下格式元数据可在表定义中作为只读( VIRTUAL ) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。

Key

数据类型

描述

schema

STRING NULL

描述有效载荷模式的 JSON 字符串。如果模式未包含在 Debezium 记录中,则为空。

ingestion-timestamp

TIMESTAMP_LTZ(3) NULL

连接器处理事件的时间戳。对应于 Debezium 记录中的 ts_ms 字段。

source.timestamp

TIMESTAMP_LTZ(3) NULL

源系统创建事件的时间戳。与 Debezium 记录中的 source.ts_ms 字段相对应。

source.database

STRING NULL

源数据库。对应 Debezium 记录中的 source.db 字段(如果有)。

source.schema

STRING NULL

源数据库模式。对应 Debezium 记录中的 source.schema 字段(如果有)。

source.table

STRING NULL

源数据库表。与 Debezium 记录中的 source.table source.collection (如果有)相对应。

source.properties

MAP<STRING, STRING> NULL

各种源属性的映射。对应于 Debezium 记录中的 source 字段。

下面的示例展示了如何在 Kafka 中访问 Debezium 元数据字段:

CREATE TABLE KafkaTable (
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
  origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
  origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
  origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',