|
|
星星上的苦瓜 · 代码块 | Docusaurus· 2 月前 · |
|
|
叛逆的烈酒 · CDC 的已知问题、限制和错误 - SQL ...· 2 月前 · |
|
|
讲道义的烈酒 · Debezium-JSON--流式计算 ...· 3 天前 · |
|
|
叛逆的猴子 · 在Ubuntu上使用python ...· 2 年前 · |
|
|
英俊的大熊猫 · 在抖音直播ChatGPT,“不赚钱”交个朋友 ...· 2 年前 · |
|
|
正直的芹菜 · 基于Intel 集成显卡的 FFmpeg ...· 2 年前 · |
|
|
心软的茄子 · 错误代码及案例(二十六) - 知乎· 2 年前 · |
|
|
不敢表白的企鹅 · GitHub清除敏感信息操作 - 知乎· 2 年前 · |
流式计算 Flink版
Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。
Flink 支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如。
Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 消息,输出到 Kafka 等存储中。但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。
Debezium 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例::
{ "before": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18 "after": { "id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15 "source": {...}, "op": "u", "ts_ms": 1589362330904, "transaction": null
MySQL 产品表有 4 列(
id
、
name
、
description
、
weight
)。上面的 JSON 消息是
products
表上的一条更新事件,其中
id = 111
的行的
weight
值从
5.18
更改为
5.15
。假设此消息已同步到 Kafka 主题
products_binlog
,则可以使用以下 DDL 来使用此主题并解析更改事件:
CREATE TABLE topic_products ( -- schema 与 MySQL 的 products 表完全相同 id BIGINT, name STRING, description STRING, weight DECIMAL(10, 2) ) WITH ( 'connector' = 'kafka', 'topic' = 'products_binlog', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', -- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息 'format' = 'debezium-json'
在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置
'value.converter.schemas.enable'
,用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:
::为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项
'debezium-json.schema-include' = 'true'
(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。
在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源:
-- MySQL "products" 的实时物化视图 -- 计算相同产品的最新平均重量 SELECT name, AVG(weight) FROM topic_products GROUP BY name; -- 将 MySQL "products" 表的所有数据和增量更改同步到 -- Elasticsearch "products" 索引,供将来查找 INSERT INTO elasticsearch_products SELECT * FROM topic_products;
注意:仅适用于 Flink1.16 版本。
以下格式元数据可在表定义中作为只读(
VIRTUAL
) 列公开。
注意:格式元数据字段只有在相应的连接器支持相应的格式元数据时才可用。目前,只有 Kafka 连接器能够公开格式元数据字段。
|
Key |
数据类型 |
描述 |
|---|---|---|
|
|
|
描述有效载荷模式的 JSON 字符串。如果模式未包含在 Debezium 记录中,则为空。 |
|
|
|
连接器处理事件的时间戳。对应于 Debezium 记录中的
|
|
|
|
源系统创建事件的时间戳。与 Debezium 记录中的
|
|
|
|
源数据库。对应 Debezium 记录中的
|
|
|
|
源数据库模式。对应 Debezium 记录中的
|
|
|
|
源数据库表。与 Debezium 记录中的
|
|
|
|
各种源属性的映射。对应于 Debezium 记录中的
|
下面的示例展示了如何在 Kafka 中访问 Debezium 元数据字段:
CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
|
|
星星上的苦瓜 · 代码块 | Docusaurus 2 月前 |
|
|
心软的茄子 · 错误代码及案例(二十六) - 知乎 2 年前 |
|
|
不敢表白的企鹅 · GitHub清除敏感信息操作 - 知乎 2 年前 |