添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
相关文章推荐
豪情万千的上铺  ·  mysql ...·  2 周前    · 
曾经爱过的松树  ·  批量 kill mysql ...·  1 周前    · 
爱旅游的遥控器  ·  背景 · Bootstrap v5.3·  4 月前    · 
眼睛小的铁链  ·  Singapore - 2024 ...·  4 月前    · 
苦闷的水龙头  ·  ASP.NET Core .NET 8 ...·  4 月前    · 
踏实的紫菜汤  ·  Detailed Guide: Setup ...·  5 月前    · 
搜索

5.5. 部署 Debezium MySQL 连接器

download PDF

您可以使用以下任一方法部署 Debezium MySQL 连接器: 使用 AMQ Streams 自动创建包含连接器插件的镜像 。 这是首选的方法。 从 Dockerfile 构建自定义 Kafka Connect 容器镜像 第 5.5.5 节 “Debezium MySQL 连接器配置属性的描述”

5.5.1. 使用 AMQ Streams 部署 MySQL 连接器

从 Debezium 1.7 开始,部署 Debezium 连接器的首选方法是使用 AMQ Streams 来构建包含连接器插件的 Kafka Connect 容器镜像。 在部署过程中,您可以创建并使用以下自定义资源(CR): 定义 Kafka Connect 实例的 KafkaConnect CR,并包含有关镜像中需要包含连接器工件的信息。 提供连接器用来访问源数据库的信息的 KafkaConnector CR。在 AMQ Streams 启动 Kafka Connect pod 后,您可以通过应用 KafkaConnector CR 来启动连接器。 在 Kafka Connect 镜像的构建规格中,您可以指定可用于部署的连接器。对于每个连接器插件,您还可以指定您的部署可以使用的其他组件。例如,您可以添加 Apicurio Registry 工件或 Debezium 脚本组件。当 AMQ Streams 构建 Kafka Connect 镜像时,它会下载指定的工件,并将其合并到镜像中。 KafkaConnect CR 中的 spec.build.output 参数指定存储生成的 Kafka Connect 容器镜像的位置。容器镜像可以存储在 Docker registry 中,也可以存储在 OpenShift ImageStream 中。要将镜像存储在 ImageStream 中,您必须先创建 ImageStream,然后才能部署 Kafka Connect。镜像流不会被自动创建。 如果使用 KafkaConnect 资源创建集群,之后您无法使用 Kafka Connect REST API 创建或更新连接器。您仍然可以使用 REST API 来检索信息。 在 OpenShift 上使用 AMQ Streams 配置 Kafka 连接 在 OpenShift 中部署和升级 AMQ Streams 中的使用 AMQ Streams 自动创建新容器镜像

5.5.2. 使用 AMQ Streams 部署 Debezium MySQL 连接器

使用早期版本的 AMQ Streams 时,要在 OpenShift 上部署 Debezium 连接器,首先需要为连接器构建 Kafka Connect 镜像。在 OpenShift 上部署连接器的当前首选的方法是使用 AMQ Streams 中的构建配置来自动构建 Kafka Connect 容器镜像,其中包含您要使用的 Debezium 连接器插件。 在构建过程中,AMQ Streams Operator 会将 KafkaConnect 自定义资源中的输入参数(包括 Debezium 连接器定义)转换为 Kafka Connect 容器镜像。构建从 Red Hat Maven 存储库或其他配置的 HTTP 服务器下载必要的工件。 新创建的容器被推送到 .spec.build.output 中指定的容器 registry,用于部署 Kafka Connect 集群。AMQ Streams 构建 Kafka Connect 镜像后,您可以创建 KafkaConnector 自定义资源来启动构建中包含的连接器。 您可以访问安装了集群 Operator 的 OpenShift 集群。 AMQ Streams Operator 正在运行。 部署 Apache Kafka 集群,如在 OpenShift 中部署和升级 AMQ Streams 所述。 Kafka Connect 部署在 AMQ Streams 上 您有红帽构建的 Debezium 许可证。 已安装 OpenShift oc CLI 客户端,或者您可以访问 OpenShift Container Platform Web 控制台。 根据您要存储 Kafka Connect 构建镜像的方式,您需要 registry 权限或您必须创建 ImageStream 资源:

要将构建镜像存储在镜像 registry 中,如 Red Hat Quay.io 或 Docker Hub
  • 在 registry 中创建和管理镜像的帐户和权限。
将构建镜像存储为原生 OpenShift ImageStream
  • ImageStream 资源已部署到集群中,以存储新的容器镜像。您必须为集群明确创建 ImageStream。镜像流默认不可用。如需有关 ImageStreams 的更多信息,请参阅在 OpenShift Container Platform 上管理镜像流 。 登录 OpenShift 集群。 为连接器创建 Debezium KafkaConnect 自定义资源(CR),或修改现有的资源。例如,创建一个名为 dbz-connect.yaml KafkaConnect CR,用于指定 metadata.annotations spec.build 属性。以下示例显示了来自 dbz-connect.yaml 文件的摘录,该文件描述了 KafkaConnect 自定义资源。

    例 5.1. dbz-connect.yaml 文件,该文件定义包含 Debezium 连接器的 KafkaConnect 自定义资源

    在以下示例中,自定义资源被配置为下载以下工件: Debezium MySQL 连接器存档。 红帽构建的 Apicurio Registry 存档。Apicurio Registry 是一个可选组件。只有在打算在连接器中使用 Avro 序列化时,才添加 Apicurio Registry 组件。 Debezium 脚本 SMT 归档以及您要与 Debezium 连接器一起使用的关联脚本引擎。SMT 归档和脚本语言依赖项是可选组件。只有在打算使用 Debezium 是 基于内容的路由 SMT 或 过滤 SMT 时,才添加这些组件。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true" 1
spec:
  version: 3.3.1
  build: 2
    output: 3
      type: imagestream  4
      image: debezium-streams-connect:latest
    plugins: 5
      - name: debezium-connector-mysql
        artifacts:
          - type: zip 6
            url: https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.1.4.Final-redhat-00001/debezium-connector-mysql-2.1.4.Final-redhat-00001-plugin.zip  7
          - type: zip
            url: https://maven.repository.redhat.com/ga/io/apicurio/apicurio-registry-distro-connect-converter/2.3.0.Final-redhat-<build-number>/apicurio-registry-distro-connect-converter-2.3.0.Final-redhat-<build-number>.zip  8
          - type: zip
            url: https://maven.repository.redhat.com/ga/io/debezium/debezium-scripting/2.1.4.Final-redhat-00001/debezium-scripting-2.1.4.Final-redhat-00001.zip 9
          - type: jar
            url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.11/groovy-3.0.11.jar  10
          - type: jar
            url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.11/groovy-jsr223-3.0.11.jar
          - type: jar
            url: https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json3.0.11/groovy-json-3.0.11.jar
  bootstrapServers: debezium-kafka-cluster-kafka-bootstrap:9093
  ...
表 5.24. Kafka Connect 配置设置的描述
描述

strimzi.io/use-connector-resources 注解设置为 "true" ,以便 Cluster Operator 使用 KafkaConnector 资源在此 Kafka Connect 集群中配置连接器。 spec.build 配置指定构建镜像的位置,并列出要在镜像中包含的插件,以及插件工件的位置。 build.output 指定存储新构建镜像的 registry。 指定镜像输出的名称和镜像名称。 output.type 的有效值是 docker ,可推送到容器 registry,如 Docker Hub 或 Quay,或将镜像推送到内部 OpenShift ImageStream 的镜像流 。要使用 ImageStream,必须将 ImageStream 资源部署到集群中。有关在 KafkaConnect 配置中指定 build.output 的更多信息,请参阅在 OpenShift 中配置 AMQ Streams 中的 AMQ Streams Build schema 参考 插件配置 列出了您要包含在 Kafka Connect 镜像中的所有连接器。对于列表中的每个条目,指定一个插件 名称 ,以及有关构建连接器所需的工件的信息。另外,对于每个连接器插件,您还可以包含您要与连接器一起使用的其他组件。例如,您可以添加 Service Registry 工件或 Debezium 脚本组件。 artifacts.type 的值指定 artifacts.url 中指定的工件的文件类型。有效类型是 zip tgz jar 。Debezium 连接器存档以 .zip 文件格式提供。 type 值必须与 url 字段中引用的文件类型匹配。 artifacts.url 的值指定 HTTP 服务器的地址,如 Maven 存储库,用于存储连接器工件的文件。Debezium 连接器工件位于 Red Hat Maven 存储库中。OpenShift 集群必须有权访问指定的服务器。 (可选)指定用于下载 Apicurio Registry 组件的工件 类型和 url 。包括 Apicurio Registry 工件,只有在您希望连接器使用 Apache Avro 与 Red Hat build of Apicurio Registry 序列化事件键和值时,而不是使用默认的 JSON converter。 (可选)指定 Debezium 脚本 SMT 归档的工件 类型和 url ,以用于 Debezium 连接器。只有在打算使用 Debezium 基于内容的路由 SMT 或 过滤 SMT 时使用脚本 SMT 时,才包含脚本 SMT,您必须部署 JSR 223 兼容脚本实施,如 groovy。 (可选)为 JSR 223 兼容脚本实现的 JAR 文件指定工件 类型和 url ,这是 Debezium 脚本 SMT 所需的。 如果您使用 AMQ Streams 将连接器插件合并到 Kafka Connect 镜像中,每个所需脚本语言组件 artifacts.url 必须指定 JAR 文件的位置,而 artifacts.type 的值还必须设置为 jar 。无效的值会导致连接器在运行时失败。 要启用使用带有脚本 SMT 的 Apache Groovy 语言,示例中的自定义资源会检索以下库的 JAR 文件: groovy Groovy-jsr223 (协调代理) Groovy-json (用于解析 JSON 字符串的模块) 作为替代方案,Debebe 脚本 SMT 还支持使用 GraalVM JavaScript 的 JSR 223 实现。 输入以下命令将 KafkaConnect 构建规格应用到 OpenShift 集群:

oc create -f dbz-connect.yaml

根据自定义资源中指定的配置,Streams Operator 会准备要部署的 Kafka Connect 镜像。
构建完成后,Operator 将镜像推送到指定的 registry 或 ImageStream,并启动 Kafka Connect 集群。您配置中列出的连接器工件在集群中可用。 创建一个 KafkaConnector 资源来定义您要部署的每个连接器的实例。
例如,创建以下 KafkaConnector CR,并将它保存为 mysql-inventory-connector.yaml

例 5.2. 为 Debezium 连接器定义 KafkaConnector 自定义资源的 mysql-inventory-connector.yaml 文件

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      labels:
        strimzi.io/cluster: debezium-kafka-connect-cluster
      name: inventory-connector-mysql 1
    spec:
      class: io.debezium.connector.mysql.MySqlConnector 2
      tasksMax: 1  3
      config:  4
        schema.history.internal.kafka.bootstrap.servers: debezium-kafka-cluster-kafka-bootstrap.debezium.svc.cluster.local:9092
        schema.history.internal.kafka.topic: schema-changes.inventory
        database.hostname: mysql.debezium-mysql.svc.cluster.local 5
        database.port: 3306   6
        database.user: debezium  7
        database.password: dbz  8
        database.server.id: 184054 9
        topic.prefix: inventory-connector-mysql 10
        table.include.list: inventory.*  11
        ...
表 5.25. 连接器配置设置的描述
描述

使用 Kafka Connect 集群注册的连接器名称。 连接器类的名称。 可同时运行的任务数量。 连接器的配置。 主机数据库实例的地址。 数据库实例的端口号。 Debezium 用于连接到数据库的帐户名称。 Debezium 用于连接到数据库用户帐户的密码。 连接器的唯一数字 ID。 数据库实例或集群的主题前缀。
指定的名称只能从字母数字字符或下划线括起。
由于主题前缀用作从这个连接器接收更改事件的任何 Kafka 主题的前缀,所以名称必须在集群中的连接器之间唯一。
此命名空间也用于相关的 Kafka Connect 模式的名称,如果您将连接器与 Avro 连接器集成,则对应的 Avro 模式的命名空间也用于。 连接器从中捕获更改事件的表列表。 运行以下命令来创建连接器资源:

oc create -n <namespace> -f <kafkaConnector>.yaml

oc create -n debezium -f mysql-inventory-connector.yaml

连接器注册到 Kafka Connect 集群,并开始针对 KafkaConnector CR 中的 spec.config.database.dbname 指定的数据库运行。连接器 pod 就绪后,Debezium 正在运行。 您现在已准备好 验证 Debezium MySQL 部署

5.5.3. 通过从 Dockerfile 构建自定义 Kafka Connect 容器镜像来部署 Debezium MySQL 连接器

要部署 Debezium MySQL 连接器,您必须构建包含 Debezium 连接器存档的自定义 Kafka Connect 容器镜像,然后将此容器镜像推送到容器 registry。然后,您需要创建以下自定义资源(CR): 定义 Kafka Connect 实例的 KafkaConnect CR。CR 中的 image 属性指定您创建用来运行 Debezium 连接器的容器镜像的名称。您可以将此 CR 应用到部署了 Red Hat AMQ Streams 的 OpenShift 实例。AMQ Streams 提供将 Apache Kafka 引入到 OpenShift 的 operator 和镜像。 定义 Debezium MySQL 连接器的 KafkaConnector CR。将此 CR 应用到应用 KafkaConnect CR 的同一 OpenShift 实例。 MySQL 正在运行,您完成了 设置 MySQL 以用于 Debezium 连接器 的步骤。 AMQ Streams 部署在 OpenShift 上,并运行 Apache Kafka 和 Kafka Connect。如需更多信息, 请参阅在 OpenShift 中部署和升级 AMQ Streams 。 podman 或 Docker 已安装。 您有在容器 registry 中创建和管理容器(如 quay.io docker.io )的帐户和权限,您要向其添加将运行 Debezium 连接器的容器。 为 Kafka Connect 创建 Debezium MySQL 容器: 创建一个 Dockerfile,它使用 registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12 作为基础镜像。例如,在终端窗口中输入以下命令:

cat <<EOF >debezium-container-for-mysql.yaml 1
FROM registry.redhat.io/amq7/amq-streams-kafka-32-rhel8:2.2.0-12
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium 2
RUN cd /opt/kafka/plugins/debezium/ \
&& curl -O https://maven.repository.redhat.com/ga/io/debezium/debezium-connector-mysql/2.1.4.Final-redhat-00001/debezium-connector-mysql-2.1.4.Final-redhat-00001-plugin.zip \
&& unzip debezium-connector-mysql-2.1.4.Final-redhat-00001-plugin.zip \
&& rm debezium-connector-mysql-2.1.4.Final-redhat-00001-plugin.zip
RUN cd /opt/kafka/plugins/debezium/
USER 1001
EOF
描述

您可以指定您想要的任何文件名。 指定 Kafka Connect 插件目录的路径。如果您的 Kafka Connect 插件目录位于不同的位置,请将这个路径替换为您的目录的实际路径。 该命令在当前目录中创建一个名为 debezium-container-for-mysql.yaml 的 Dockerfile。 从您在上一步中创建的 debezium-container-for-mysql.yaml Docker 文件中构建容器镜像。在包含该文件的目录中,打开终端窗口并输入以下命令之一:

podman build -t debezium-container-for-mysql:latest .
docker build -t debezium-container-for-mysql:latest .

上述命令使用名称 debezium-container-for-mysql 构建容器镜像。 将自定义镜像推送到容器 registry,如 quay.io 或内部容器 registry。容器 registry 必须可供部署镜像的 OpenShift 实例使用。输入以下命令之一:

podman push <myregistry.io>/debezium-container-for-mysql:latest
docker push <myregistry.io>/debezium-container-for-mysql:latest
  • 创建新的 Debezium MySQL KafkaConnect 自定义资源(CR)。例如,创建一个名为 dbz-connect.yaml KafkaConnect CR,用于指定 注解和 镜像 属性。以下示例显示了来自 dbz-connect.yaml 文件的摘录,该文件描述了 KafkaConnect 自定义资源。

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnect
    metadata:
      name: my-connect-cluster
      annotations:
        strimzi.io/use-connector-resources: "true" 1
    spec:
      image: debezium-container-for-mysql  2
      ...
    描述

    metadata.annotations 表示 KafkaConnector 资源用于配置在这个 Kafka Connect 集群中使用的 Cluster Operator。 spec.image 指定您为运行 Debezium 连接器而创建的镜像名称。此属性覆盖 Cluster Operator 中的 STRIMZI_DEFAULT_KAFKA_CONNECT_IMAGE 变量。 输入以下命令将 KafkaConnect CR 应用到 OpenShift Kafka Connect 环境:

    oc create -f dbz-connect.yaml

    命令添加一个 Kafka Connect 实例,用于指定您为运行 Debezium 连接器而创建的镜像名称。 创建一个 KafkaConnector 自定义资源来配置 Debezium MySQL 连接器实例。 您可以在 .yaml 文件中配置 Debezium MySQL 连接器,该文件指定连接器的配置属性。连接器配置可能会指示 Debezium 为 schema 和表的子集生成事件,或者可能会设置属性,以便 Debezium 忽略敏感、太大或不需要的指定栏中的值。 以下示例配置了一个 Debezium 连接器,它连接到 MySQL 主机 192.168.99.100 ,在端口 192.168.0.1 上捕获对 inventory 数据库的更改。 dbserver1 是服务器的逻辑名称。

    MySQL inventory-connector.yaml

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        name: inventory-connector-mysql  1
        labels:
          strimzi.io/cluster: my-connect-cluster
      spec:
        class: io.debezium.connector.mysql.MySqlConnector
        tasksMax: 1  2
        config:  3
          database.hostname: mysql  4
          database.port: 3306
          database.user: debezium
          database.password: dbz
          database.server.id: 184054  5
          topic.prefix: inventory-connector-mysql 6
          table.include.list: inventory  7
          schema.history.internal.kafka.bootstrap.servers: my-cluster-kafka-bootstrap:9092  8
          schema.history.internal.kafka.topic: schema-changes.inventory  9

    表 5.26. 连接器配置设置的描述
    描述

    连接器的名称。 任何时候只能有一个任务。由于 MySQL 连接器读取 MySQL 服务器的 binlog ,因此使用单一连接器任务可以确保正确的顺序和事件处理。Kafka Connect 服务使用连接器启动一个或多个可以正常工作的任务,并在 Kafka Connect 服务集群中自动分发运行的任务。如果有任何服务停止或崩溃,这些任务将重新分发到运行的服务。 连接器的配置。 数据库主机,它是运行 MySQL 服务器的容器的名称( mysql )。 连接器的唯一 ID。 MySQL 服务器或集群的主题前缀。此名称用作接收更改事件记录的所有 Kafka 主题的前缀。 连接器只捕获 inventory 表中的更改。 此连接器用来向数据库架构历史记录主题写入和恢复 DDL 语句的 Kafka 代理列表。重启后,连接器会在连接器开始读取时恢复 binlog 中存在的数据库的模式。 数据库架构历史记录主题的名称。本主题仅用于内部使用,不应供消费者使用。 使用 Kafka Connect 创建连接器实例。例如,如果您将 KafkaConnector 资源保存到 inventory-connector.yaml 文件中,您将运行以下命令:

    oc apply -f inventory-connector.yaml

    上述命令注册 inventory-connector ,连接器开始针对 KafkaConnector CR 中定义的 inventory 数据库运行。 有关您可以为 Debezium MySQL 连接器设置的配置属性的完整列表,请参阅 MySQL 连接器配置属性 。 连接器启动后,它会为连接器配置 MySQL 数据库 执行一致的快照 。然后,连接器开始为行级操作生成数据更改事件,并将事件记录流传输到 Kafka 主题。

    5.5.4. 验证 Debezium MySQL 连接器是否正在运行

    如果连接器正确启动且没有错误,它会为每个连接器配置为捕获的表创建一个主题。下游应用程序可以订阅这些主题以检索源数据库中发生的信息事件。 要验证连接器是否正在运行,您可以从 OpenShift Container Platform Web 控制台或 OpenShift CLI 工具(oc)执行以下操作: 验证连接器状态。 验证连接器是否生成主题。 验证主题是否填充了每个表初始快照过程中生成的读操作("op":"r")的事件。 Debezium 连接器部署到 OpenShift 上的 AMQ Streams。 已安装 OpenShift oc CLI 客户端。 访问 OpenShift Container Platform web 控制台。 使用以下方法之一检查 KafkaConnector 资源的状态: 在 OpenShift Container Platform Web 控制台中: 导航到 Home Search 。 在 Search 页面中,点 Resources 以打开 Select Resource 复选框,然后键入 KafkaConnector 。 在 KafkaConnectors 列表中,点您要检查的连接器的名称,如 inventory-connector-mysql 。 在 Conditions 部分中,验证 Type Status 列中的值是否已设置为 Ready True 。 在一个终端窗口中: 使用以下命令:

    oc describe KafkaConnector <connector-name> -n <project>

    oc describe KafkaConnector inventory-connector-mysql -n debezium

    该命令返回类似以下输出的状态信息:

    例 5.3. KafkaConnector 资源状态

    Name:         inventory-connector-mysql
    Namespace:    debezium
    Labels:       strimzi.io/cluster=debezium-kafka-connect-cluster
    Annotations:  <none>
    API Version:  kafka.strimzi.io/v1beta2
    Kind:         KafkaConnector
    Status:
      Conditions:
        Last Transition Time:  2021-12-08T17:41:34.897153Z
        Status:                True
        Type:                  Ready
      Connector Status:
        Connector:
          State:      RUNNING
          worker_id:  10.131.1.124:8083
        Name:         inventory-connector-mysql
        Tasks:
          Id:               0
          State:            RUNNING
          worker_id:        10.131.1.124:8083
        Type:               source
      Observed Generation:  1
      Tasks Max:            1
      Topics:
        inventory-connector-mysql.inventory
        inventory-connector-mysql.inventory.addresses
        inventory-connector-mysql.inventory.customers
        inventory-connector-mysql.inventory.geom
        inventory-connector-mysql.inventory.orders
        inventory-connector-mysql.inventory.products
        inventory-connector-mysql.inventory.products_on_hand
    Events:  <none>
  • 验证连接器是否已创建 Kafka 主题: 通过 OpenShift Container Platform Web 控制台。 导航到 Home Search 。 在 Search 页面中,点 Resources 打开 Select Resource 复选框,然后键入 KafkaTopic 。 从 KafkaTopics 列表中,点您要检查的主题的名称,例如 inventory-connector-mysql.inventory.orders--ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d 。 在 Conditions 部分中,验证 Type Status 列中的值是否已设置为 Ready True 。 在一个终端窗口中: 使用以下命令:

    oc get kafkatopics

    该命令返回类似以下输出的状态信息:

    例 5.4. KafkaTopic 资源状态

    NAME                                                                    CLUSTER               PARTITIONS   REPLICATION FACTOR   READY
    connect-cluster-configs                                                 debezium-kafka-cluster   1            1                    True
    connect-cluster-offsets                                                 debezium-kafka-cluster   25           1                    True
    connect-cluster-status                                                  debezium-kafka-cluster   5            1                    True
    consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a             debezium-kafka-cluster   50           1                    True
    inventory-connector-mysql--a96f69b23d6118ff415f772679da623fbbb99421                               debezium-kafka-cluster   1            1                    True
    inventory-connector-mysql.inventory.addresses---1b6beaf7b2eb57d177d92be90ca2b210c9a56480          debezium-kafka-cluster   1            1                    True
    inventory-connector-mysql.inventory.customers---9931e04ec92ecc0924f4406af3fdace7545c483b          debezium-kafka-cluster   1            1                    True
    inventory-connector-mysql.inventory.geom---9f7e136091f071bf49ca59bf99e86c713ee58dd5               debezium-kafka-cluster   1            1                    True
    inventory-connector-mysql.inventory.orders---ac5e98ac6a5d91e04d8ec0dc9078a1ece439081d             debezium-kafka-cluster   1            1                    True
    inventory-connector-mysql.inventory.products---df0746db116844cee2297fab611c21b56f82dcef           debezium-kafka-cluster   1            1                    True
    inventory-connector-mysql.inventory.products_on_hand---8649e0f17ffcc9212e266e31a7aeea4585e5c6b5   debezium-kafka-cluster   1            1                    True
    schema-changes.inventory                                                debezium-kafka-cluster   1            1                    True
    strimzi-store-topic---effb8e3e057afce1ecf67c3f5d8e4e3ff177fc55          debezium-kafka-cluster   1            1                    True
    strimzi-topic-operator-kstreams-topic-store-changelog---b75e702040b99be8a9263134de3507fc0cc4017b  debezium-kafka-cluster  1   1    True
  • 检查主题内容。 在终端窗口中输入以下命令:

  • oc exec -n <project>  -it <kafka-cluster> -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=<topic-name>

    oc exec -n debezium  -it debezium-kafka-cluster-kafka-0 -- /opt/kafka/bin/kafka-console-consumer.sh \
    >     --bootstrap-server localhost:9092 \
    >     --from-beginning \
    >     --property print.key=true \
    >     --topic=inventory-connector-mysql.inventory.products_on_hand

    指定主题名称的格式与 oc describe 命令在第 1 步中返回的格式相同,例如 inventory-connector-mysql.inventory.addresses 。 对于主题中的每个事件,命令会返回类似以下输出的信息:

    例 5.5. Debezium 更改事件的内容

    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"}],"optional":false,"name":"inventory-connector-mysql.inventory.products_on_hand.Key"},"payload":{"product_id":101}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mysql.inventory.products_on_hand.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"product_id"},{"type":"int32","optional":false,"field":"quantity"}],"optional":true,"name":"inventory-connector-mysql.inventory.products_on_hand.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"inventory-connector-mysql.inventory.products_on_hand.Envelope"},"payload":{"before":null,"after":{"product_id":101,"quantity":3},"source":{"version":"2.1.4.Final-redhat-00001","connector":"mysql","name":"inventory-connector-mysql","ts_ms":1638985247805,"snapshot":"true","db":"inventory","sequence":null,"table":"products_on_hand","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":156,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1638985247805,"transaction":null}}

    在上例中, 有效负载 值显示连接器快照从表 inventory.products_on_hand 中生成一个读取( "op" ="r" )事件。 product_id 记录的 "before" 状态为 null ,这表示记录没有之前的值。 "after" 状态对于 product_id 101 的项目的 quantity 显示为 3

    5.5.5. Debezium MySQL 连接器配置属性的描述

    Debezium MySQL 连接器有许多配置属性,您可以使用它们来实现应用程序的正确连接器行为。许多属性具有默认值。有关属性的信息按如下方式进行组织: 所需的连接器配置属性 高级连接器配置属性 数据库架构历史记录配置属性 ,用于控制 Debezium 如何处理从数据库架构历史记录主题读取的事件。 透传数据库模式历史记录属性 控制 数据库驱动程序行为的直通数据库驱动程序属性 。 除非默认值可用 否则需要以下配置属性。

    表 5.27. 所需的 Debezium MySQL 连接器配置属性
    属性 默认 描述

    没有默认值 连接器的唯一名称。尝试再次使用相同的名称注册失败。所有 Kafka Connect 连接器都需要此属性。 connector.class 没有默认值 连接器的 Java 类的名称。始终为 MySQL 连接器指定 io.debezium.connector.mysql.MySqlConnector tasks.max 应该为此连接器创建的最大任务数量。MySQL 连接器始终使用单个任务,因此不要使用这个值,因此始终可以接受默认值。 database.hostname 没有默认值 MySQL 数据库服务器的 IP 地址或主机名。 database.port MySQL 数据库服务器的整数端口号。 database.user 没有默认值 连接到 MySQL 数据库服务器时使用的 MySQL 用户的名称。 database.password 没有默认值 连接到 MySQL 数据库服务器时要使用的密码。 topic.prefix 没有默认值 为 Debezium 捕获更改的特定 MySQL 数据库服务器/集群提供命名空间的主题前缀。主题前缀应该在所有其他连接器中唯一,因为它用作接收此连接器发送的事件的所有 Kafka 主题名称的前缀。数据库服务器逻辑名称中只能使用字母数字字符、连字符、句点和下划线。

    不要更改此属性的值。如果您更改了 name 值,重启后,而不是继续向原始主题发出事件,连接器会将后续事件发送到名称基于新值的主题。连接器也无法恢复其数据库架构历史记录主题。 database.server.id 没有默认值 此数据库客户端的数字 ID,必须在 MySQL 集群中所有当前运行的数据库进程之间唯一。此连接器将 MySQL 数据库集群加入为另一个服务器(使用此唯一 ID),以便读取 binlog。 database.include.list 可选的、以逗号分隔的正则表达式列表,与要捕获更改的数据库的名称匹配。连接器不会捕获任何名称不在 database.include.list 中的更改。默认情况下,连接器捕获所有数据库中的更改。
    要匹配数据库的名称,Debebe 应用 您指定的正则表达式 。也就是说,指定的表达式与数据库的整个名称字符串匹配,它与数据库名称中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,不要设置 database.exclude.list 属性。 database.exclude.list 可选的、以逗号分隔的正则表达式列表,与您不想捕获更改的数据库名称匹配。连接器捕获名称不在 database.exclude.list 中的任何数据库中的更改。
    要匹配数据库的名称,Debebe 应用 您指定的正则表达式 。也就是说,指定的表达式与数据库的整个名称字符串匹配,它与数据库名称中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,不要设置 database.include.list 属性。 table.include.list 可选的、以逗号分隔的正则表达式列表,与您要捕获更改的表的完全限定表标识符匹配。连接器不会捕获没有在 table.include.list 中包含的任何表中的更改。每个标识符的格式都是 databaseName tableName 。默认情况下,连接器捕获捕获更改的每个数据库中的每个非系统表中的更改。
    要匹配表的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与表的整个名称字符串匹配,它与表名称中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,不要设置 table.exclude.list 属性。 table.exclude.list 可选的、以逗号分隔的正则表达式列表,与您不想捕获更改的表的完全限定表标识符匹配。连接器捕获 table.exclude.list 中没有包含的任何更改。每个标识符的格式都是 databaseName tableName
    要匹配列的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与表的整个名称字符串匹配,它与表名称中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,不要设置 table.include.list 属性。 column.exclude.list 可选的、以逗号分隔的正则表达式列表,与列的完全限定名称匹配,以便从更改事件记录值中排除。列的完全限定域名格式为 databaseName tableName . columnName
    要匹配列的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与列的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。如果您在配置中包含此属性,不要设置 column.include.list 属性。 column.include.list 可选的、以逗号分隔的正则表达式列表,与更改事件记录值中包含的列的完全限定名称匹配。列的完全限定域名格式为 databaseName tableName . columnName
    要匹配列的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与列的整个名称字符串匹配,它与列名称中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,请不要设置 column.exclude.list 属性。 column.truncate.to. length .chars 一个可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定名称匹配。如果您要在一组列中超过属性名称中指定的字符数时,设置此属性。 length 设置为正整数值,例如 column.truncate.to.20.chars 。 列的完全限定域名会观察以下格式: databaseName . tableName . columnName 。要匹配列的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。 您可以在单个配置中指定多个长度不同的属性。 column.mask.with. length .chars 一个可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定名称匹配。如果您希望连接器屏蔽一组列的值,例如,如果它们包含敏感数据,则设置此属性。将 length 设置为正整数,将指定列中的数据替换为属性名称中 长度 指定的星号(DSL)字符数。 length 设为 0 ( 零)将指定列中的数据替换为空字符串。 列的完全限定域名会观察以下格式: databaseName . tableName . columnName 。要匹配列的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。 您可以在单个配置中指定多个长度不同的属性。 column.mask.hash. hashAlgorithm .with.salt. salt ; hash- hash.v2. hashAlgorithm .with.salt. salt 一个可选的、以逗号分隔的正则表达式列表,与基于字符的列的完全限定名称匹配。列的完全限定域名格式为 < databaseName>。< tableName & gt; . < columnName&gt ;。
    要匹配 column Debezium 的名称,请应用您指定为 正则表达式的正则表达式。也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。在生成的更改事件记录中,指定列的值替换为 pseudonyms。
    一个 pseudonym,它包括了通过应用指定的 hashAlgorithm salt 的结果的哈希值。根据使用的 hash 功能,会维护引用完整性,而列值则替换为伪nyms。支持的哈希功能在 Java Cryptography 架构标准算法名称文档中的 MessageDigest 部分 进行了描述。

    在以下示例中, CzQMA0cB5K 是一个随机选择的 salt。

    column.mask.hash.SHA-256.with.salt.CzQMA0cB5K = inventory.orders.customerName, inventory.shipment.customerName
    如有必要,伪的nym 会自动缩短到列的长度。连接器配置可以包含多个指定不同哈希算法和 salt 的属性。

    根据使用的 hashAlgorithm 、选择 salt 以及实际数据集,生成的数据集可能无法完全屏蔽。

    如果该值在不同的位置或系统中被哈希化,则应使用哈希策略版本 2 来确保光纤。 column.propagate.source.type 可选的、以逗号分隔的正则表达式列表,它与您希望连接器发送代表列元数据的完全限定名称匹配。当设置此属性时,连接器会将以下字段添加到事件记录的架构中: __debezium.source.column.type
    __debezium.source.column.length
    __debezium.source.column.scale
    这些参数分别传播列的原始类型和长度(用于变量带宽类型)。
    启用连接器发送这个额外数据有助于正确调整接收器数据库中的特定数字或基于字符的列。 列的完全限定域名会观察以下格式之一: databaseName . tableName . columnName , 或 databaseName . schemaName . tableName . columnName .
    要匹配列的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与列的整个名称字符串匹配;表达式不匹配列名称中可能存在的子字符串。 datatype.propagate.source.type 可选的、以逗号分隔的正则表达式列表,用于指定为数据库列定义的数据类型的完全限定名称。当设置此属性时,对于具有匹配数据类型的列,连接器会发出事件记录,该记录在 schema 中包含以下额外字段: __debezium.source.column.type
    __debezium.source.column.length
    __debezium.source.column.scale
    这些参数分别传播列的原始类型和长度(用于变量带宽类型)。
    启用连接器发送这个额外数据有助于正确调整接收器数据库中的特定数字或基于字符的列。 列的完全限定域名会观察以下格式之一: databaseName . tableName . typeName , 或 databaseName . schemaName . tableName . typeName .
    要匹配数据类型的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与数据类型的整个名称字符串匹配;表达式不匹配类型名称中可能存在的子字符串。 有关 MySQL 特定数据类型名称的列表,请查看 MySQL 数据类型映射 time.precision.mode adaptive_time_microseconds 时间、日期和时间戳可以通过不同类型的精度表示,包括:

    adaptive_time_microseconds (默认值)捕获日期、日期和时间戳值,与使用 millisecond、microsecond 或 nanosecond 精度值的数据库完全相同。 它们始终被捕获为微秒。


    connect 始终代表使用 Kafka Connect 的内置表示 Time, Date, 和 Timestamp,无论数据库列的精度是什么。 decimal.handling.mode 指定连接器如何处理 DECIMAL NUMERIC 列:

    准确 (默认值)代表它们使用二进制格式更改事件中代表的 java.math.BigDecimal 值。

    加倍 表示它们, 这可能会导致精度丢失,但更易于使用。

    字符串 对格式的字符串进行编码,这容易使用,但提供有关实际类型的语义信息会丢失。 bigint.unsigned.handling.mode 指定在更改事件中应该如何表示 BIGINT UNSIGNED 列。可能的设置是:

    long 代表 Java 的 值,它可能无法提供精度,但在消费者中容易使用。 是首选的设置。

    精确 使用 java.math.BigDecimal 来代表值,它们通过使用二进制表示和 Kafka Connect 的 org.apache.kafka.connect.data.Decimal 类型在更改事件中编码。在处理大于 2^63 的值时,请使用此设置,因为无法使用 long 来转换这些值。 include.schema.changes 指定连接器是否应该将数据库模式中的更改发布到与数据库服务器 ID 的名称相同的 Kafka 主题的布尔值。每个架构更改都会使用包含数据库名称且值包含 DDL 语句的键来记录。这独立于连接器内部记录数据库架构历史记录。 include.schema.comments false 指定连接器是否应该解析和发布元数据对象上的表和列注释的布尔值。启用此选项会对内存用量造成影响。逻辑架构对象的数量和大小严重影响 Debezium 连接器消耗的内存量,并为每一个字符串添加潜在的大型字符串数据可能非常昂贵。 include.query false 指定连接器是否应该包含生成更改事件的原始 SQL 查询的布尔值。

    如果将此选项设置为 true ,则必须使用 binlog_rows_query_log_events 选项设置为 ON 来配置 MySQL。当 include.query true 时,快照进程生成的事件不会存在查询。

    include.query 设置为 true 可能会公开通过更改事件中包含原始 SQL 语句来明确排除或屏蔽的表或字段。因此,默认设置为 false event.deserialization.failure.handling.mode 指定连接器在对 binlog 事件进行反序列化时应如何响应异常。

    会失败 传播异常,这表示有问题的事件及其 binlog 偏移,并导致连接器停止。

    会警告 日志中有问题的事件及其 binlog 偏移,然后跳过 event.

    忽略 通过有问题的事件传递,且不会记录任何问题的事件。 inconsistent.schema.handling.mode 指定连接器应如何响应与内部架构表示中不存在的表相关的 binlog 事件。也就是说,内部表示与数据库不一致。

    会失败 抛出一个异常,表示有问题的事件及其 binlog 偏移,并导致连接器停止。

    会记录 有问题的事件及其 binlog 偏移并跳过事件。

    跳过 通过有问题的事件,且不会记录任何问题的事件。 max.batch.size 正整数值,用于指定在每个连接器迭代过程中应处理的每个批处理事件的最大大小。默认值为 2048。 max.queue.size 正整数值,用于指定阻塞队列可以保存的最大记录数。当 Debezium 从数据库读取事件时,它会将事件放置在阻塞队列中,然后再将它们写入 Kafka。当连接器将消息写入 Kafka 或 Kafka 不可用时,阻塞队列可以提供从数据库读取更改事件的后端。当连接器定期记录偏移时,队列中保存的事件会被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。 max.queue.size.in.bytes 较长的整数值,指定块队列的最大卷(以字节为单位)。默认情况下,不会为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正长值。
    如果还设置了 max.queue.size ,当队列的大小达到任一属性指定的限制时,写入队列会被阻断。例如,如果您设置了 max.queue.size=1000 , 和 max.queue.size.in.bytes=5000 ,则在队列包含 1000 记录后,或者队列中记录的卷达到 5000 字节后会被阻断。 poll.interval.ms 正整数值,用于指定连接器在处理批处理事件前应该等待出现新更改事件的毫秒数。默认值为 500 毫秒,或 0.5 秒。 connect.timeout.ms 30000 一个正整数值,用于指定这个连接器在超时前应该等待的最大时间(毫秒)。默认值为 30 秒。 gtid.source.includes 没有默认值 以逗号分隔的正则表达式列表,与连接器用来在 MySQL 服务器上查找 binlog 位置的 GTID 集中的源 UUID 匹配。当设置此属性时,连接器只使用具有与指定 include 模式匹配的源 UUID 的 GTID 范围。 要匹配 GTID 的值,Debezium 应用 您指定的正则表达式 。也就是说,指定的表达式与整个 UUID 字符串匹配;它与 UUID 中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,不要设置 gtid.source.excludes 属性。 gtid.source.excludes 没有默认值 以逗号分隔的正则表达式列表,与连接器用来在 MySQL 服务器上查找 binlog 位置的 GTID 集中的源 UUID 匹配。当设置此属性时,连接器只使用具有与任何指定 排除 模式不匹配的源 UUID 的 GTID 范围。 要匹配 GTID 的值,Debezium 应用 您指定的正则表达式 。也就是说,指定的表达式与整个 UUID 字符串匹配;它与 UUID 中可能存在的子字符串不匹配。
    如果您在配置中包含此属性,不要设置 gtid.source.includes 属性。 tombstones.on.delete 控制 删除 事件是否随后是 tombstone 事件。

    true - 删除操作由 delete 事件和后续 tombstone 事件表示。

    false - 仅发出 delete 事件。

    删除源记录后,发出 tombstone 事件(默认行为)后,如果为主题启用了 日志压缩 ,则 Kafka 可以完全删除与已删除行键相关的所有事件。 message.key.columns 指定连接器用来组成自定义消息键的表达式列表,以更改它发布到指定表的 Kafka 主题的事件记录。 默认情况下,Debezium 使用表的主键列作为它发出的记录的消息键。对于缺少主密钥的表,或者指定缺少主密钥的表的密钥,您可以根据一个或多个列配置自定义消息密钥。

    要为表建立自定义消息键,请列出表,后跟要用作消息键的列。每个列表条目的格式都是:

    < fully-qualified_tableName > : & lt;keyColumn > , <keyColumn >

    to a table key on multiple 列名称,在列名称之间插入逗号。 每个完全限定表名称都是正则表达式,格式为:

    < databaseName >。& lt;tableName >

    属性可以包含多个表的条目。使用分号分隔列表中的表条目。

    以下示例为表 inventory.customers purchase.orders :

    inventory 设置了消息键。customers:pk1,pk2; (rhacm).purchaseorders:pk3,pk4

    用于表 清单。客户 ,列 pk1 pk2 被指定为 message 键。对于任意 数据库中的订购 表,列 pk3 pk4 服务器作为消息键。 对您用来创建自定义消息键的列数没有限制。但是,最好使用指定唯一密钥所需的最小数量。 binary.handling.mode bytes 指定在更改事件中应该如何表示 二进制 列,如 blob 、二进制、 varbinary 。可能的设置:

    字节 表示二进制数据作为一个字节数组。

    base64 代表二进制数据作为一个 base64 编码的字符串。

    base64-url-safe 代表二进制数据作为 base64-url-safe-encoded 字符串。

    hex 代表二进制数据作为一个十六进制编码(base16)字符串。 schema.name.adjustment.mode 指定如何调整架构名称,以便与连接器使用的消息转换器兼容。可能的设置:
    none 不适用任何调整。
    Avro 将 Avro 类型名称中使用的字符替换为下划线。

    高级 MySQL 连接器配置属性

    下表描述了 高级 MySQL 连接器属性 。这些属性的默认值很少需要更改。因此,您不需要在连接器配置中指定它们。

    表 5.28. MySQL 连接器高级配置属性的描述
    属性 默认 描述

    connect.keep.alive 指定是否应使用单独的线程来确保与 MySQL 服务器/集群的连接保持处于活动状态的布尔值。 converters 没有默认值 枚举连接器可以使用 的自定义转换器 实例的符号名称的逗号分隔列表。
    例如, 布尔值
    需要此属性才能使连接器使用自定义转换器。 对于您为连接器配置的每个转换器,还必须添加一个 .type 属性,它指定了实现转换器接口的类的完全限定域名。 .type 属性使用以下格式:
    <converterSymbolicName> .type

    boolean.type: io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
    如果要进一步控制配置的转换器的行为,您可以添加一个或多个配置参数将值传递给转换器。要将这些额外配置参数与转换器关联,请为半语名称添加转换器的符号链接名称前缀。

    例如,要定义 一个选择器 参数,用于指定 布尔值 转换器进程的列子集,请添加以下属性:
    boolean.selector=db1.table1.*, db1.table2.column1
    table.ignore.builtin 指定是否应忽略内置系统表的布尔值。无论表包含和排除列表是什么,这个适用。默认情况下,系统表不包括在捕获其变化时,在更改任何系统表时不会生成任何事件。 database.ssl.mode disabled 指定是否使用加密的连接。可能的设置是:

    ,代表使用未加密的连接。

    如果服务器支持安全连接, 首选 建立加密连接。 如果服务器不支持安全连接,请回退到未加密的连接。

    需要 建立加密的连接;如果因为任何原因无法进行更新,

    verify_ca 的行为与 required 相同,但它还会根据配置的证书颁发机构(CA)证书验证服务器 TLS 证书,如果服务器 TLS 证书与任何有效的证书不匹配,则会失败。 CA 证书。

    verify_identity 的行为类似于 verify_ca ,但还要验证服务器证书是否与远程连接的主机匹配。 snapshot.mode Initial 指定在连接器启动时运行快照的条件。可能的设置为:

    初始 - 只有在没有为逻辑服务器名称记录偏移时,连接器才会运行快照。

    initial_only - 连接器仅在没有为逻辑服务器名称记录偏移时运行快照,然后停止;例如,它不会从 binlog.

    when_needed - 连接器 时读取更改事件。也就是说,如果没有偏移可用,或者当之前记录的偏移指定了服务器中不可用的 binlog 位置或 GTID 时。

    never 不会使用快照。首次使用逻辑服务器名称启动时,连接器会从 binlog 的开头读取。请小心配置此行为。只有在 binlog 保证包含数据库的整个历史记录时才有效。

    schema_only - 连接器运行模式的快照而不是数据。当您不需要主题包含数据的一致性快照时,此设置很有用,但只需要更改,因为连接器已启动。

    schema_only_recovery - 这是已捕获更改的连接器的恢复设置。当您重启连接器时,此设置启用了恢复损坏的或丢失的数据库架构历史记录主题。您可以定期将其设置为"clean up"一个意外增长的数据库架构历史记录主题。数据库架构历史记录主题需要无限保留。 snapshot.locking.mode minimal 控制连接器保存全局 MySQL 读锁定的时长,这会阻止对数据库的任何更新,同时执行快照。可能的设置有:

    最小 - 连接器仅包含全局读取锁定,用于连接器读取数据库模式和其他元数据的快照的初始部分。快照中的其余工作涉及从每个表中选择所有行。连接器可以使用 REPEATABLE READ 事务以一致的方式执行此操作。即使不再保存全局读取锁定,其他 MySQL 客户端正在更新数据库。

    minimal_percona - 连接器只保存连接器读取数据库模式和其他元数据 的全局备份锁定 。快照中的其余工作涉及从每个表中选择所有行。连接器可以使用 REPEATABLE READ 事务以一致的方式执行此操作。即使不再保留全局备份锁定,其他 MySQL 客户端也会更新数据库,也是如此。此模式不会将表刷新到磁盘,不受长时间运行的读取阻止,且仅适用于 Percona Server。

    extended - 延长 - 快照持续时间的所有写入。如果有客户端正在提交 MySQL 从 REPEATABLE READ 语义中排除的操作。

    none - 防止连接器在快照期间获取任何表锁定。虽然所有快照模式都允许使用此设置, 但只有在 快照运行时没有发生任何架构更改时,才能安全地使用。对于使用 MyISAM 引擎定义的表,表仍然会被锁定,尽管此属性被设置为 MyISAM 获取表锁定。这个行为与 InnoDB 引擎不同,它会获取行级别锁定。 snapshot.include.collection.list table.include.list 中指定的所有表 可选的、以逗号分隔的正则表达式列表,与表的完全限定名称(< databaseName>.<tableName&gt ;)匹配,以便包括在快照中。指定的项目必须在连接器的 table.include.list 属性中命名。只有在连接器的 snapshot.mode 属性设置为除 never 以外的值时,此属性才会生效。
    此属性不会影响增量快照的行为。
    要匹配表的名称,Debebe 应用您指定的正则表达式。 也就是说,指定的表达式与表的整个名称字符串匹配,它与表名称中可能存在的子字符串不匹配。 snapshot.select.statement.overrides 没有默认值 指定要包含在快照中的表行。如果您希望快照仅在表中包括行的子集,请使用此属性。此属性仅影响快照。它不适用于连接器从日志中读取的事件。 属性包含一个以逗号分隔的表名称列表,格式为 < databaseName>.<tableName&gt ;。例如,

    "snapshot.select.statement.overrides": "inventory.products,customers.orders"

    For the list 中的每个表,添加一个进一步的配置属性,用于指定连接器在进行快照时要在表上运行的 SELECT 语句。指定 SELECT 语句决定快照中包含的表行的子集。使用以下格式指定此 SELECT 语句属性的名称:

    snapshot.select.statement.overrides. < databaseName> . &lt ; tableName&gt ;。例如, snapshot.select.statement.overrides.customers.orders

    Example: 在包含 soft-delete 列 delete_flag customers.orders 表中,如果您希望快照只包含不是软删除的记录,请添加以下属性:
    "snapshot.select.statement.overrides": "customer.orders",
    "snapshot.select.statement.overrides.customer.orders": "SELECT * FROM [customers].[orders] WHERE delete_flag = 0 ORDER BY id DESC"
    在生成的快照中,连接器仅包含 delete_flag = 0 的记录。 min.row.count.to.stream.results 在快照过程中,连接器会查询配置连接器的每个表来捕获更改。连接器使用每个查询结果生成一个读取事件,其中包含该表中所有行的数据。此属性决定了 MySQL 连接器是否将表的结果置于内存中,但需要大量内存,或者流传输结果,但对非常大的表来说可能较慢,但不适用于非常大的表。此属性的设置指定表在连接器流结果前必须包含的最小行数。

    要跳过所有表大小检查,并且始终在快照期间流传输所有结果,请 将此属性设置为 0。 heartbeat.interval.ms 控制连接器将心跳信息发送到 Kafka 主题的频率。默认行为是连接器不会发送心跳信息。

    心跳消息可用于监控连接器是否从数据库接收更改事件。心跳消息可能会帮助减少连接器重启时需要重新发送的更改事件数量。要发送心跳消息,请将此属性设置为正整数,这表示心跳消息之间的毫秒数。 heartbeat.action.query 没有默认值 指定连接器发送心跳消息时连接器在源数据库上执行的查询。

    例如,这可以用来定期捕获源数据库中设置的已执行 GTID 的状态。

    INSERT INTO gtid_history_table (选择 mysql.gtid_executed) database.initial.statements 没有默认值 当 JDBC 连接(而不是读取事务日志的连接)与数据库建立时,要执行的 SQL 语句的分号分隔列表。要将分号指定为 SQL 语句中的字符而不是分隔符,请使用两个分号( ;; )。

    连接器可能会自行自行建立 JDBC 连接,因此此属性可以自行配置会话参数。它不用于执行 DML 语句。 snapshot.delay.ms 没有默认值 连接器在连接器启动时应等待的时间(毫秒)。如果您要在集群中启动多个连接器,此属性对于避免快照中断非常有用,这可能会导致连接器重新平衡。 snapshot.fetch.size 没有默认值 在快照中,连接器以行批处理形式读取表内容。此属性指定批处理中的最大行数。 snapshot.lock.timeout.ms 10000 正整数,用于指定在执行快照时等待获取表锁定的最长时间(以毫秒为单位)。如果连接器无法在这个时间段内获取表锁定,则快照会失败。请参阅 MySQL 连接器如何执行数据库快照 enable.time.adjuster 指示连接器是否将 2 位规格转换为 4 位数字的布尔值。当转换完全委派给数据库时,设置为 false

    MySQL 允许用户使用 2 位或 4 位插入年值。对于 2 位值,该值在 1970 - 2069 之间被映射到一年。默认行为是连接器进行转换。 sanitize.field.names 如果连接器配置将 key.converter value.converter 属性设置为 Avro converter,则为 true
    false 如果没有。 指明是否清理字段名称以遵循 Avro 命名要求 skipped.operations 以逗号分隔的操作类型列表,这些类型将在流期间跳过。操作包括: c 用于插入/创建, u 用于更新, d 用于删除, t 用于 truncates, none 用于不跳过任何操作。默认情况下跳过截断的操作。 signal.data.collection 没有默认值 用于向连接器发送信号的数据收集的完全限定名称。 https://access.redhat.com/documentation/zh-cn/red_hat_build_of_debezium/2.1.4/html-single/debezium_user_guide/index#debezium-signaling-enabling-signaling
    使用以下格式指定集合名称:
    < databaseName> . < tableName> incremental.snapshot.allow.schema.changes false 在增量快照期间允许架构更改。启用连接器时,会在增量快照期间检测模式更改,并重新选择当前的块以避免锁定 DDL。

    请注意,不支持对主密钥的更改,如果增量快照执行,可能会导致错误的结果。另一个限制是,如果架构更改只影响列的默认值,则不会检测到更改,直到 DDL 从 binlog 流处理。这不会影响快照事件的值,但快照事件的 schema 可能具有过时的默认值。 incremental.snapshot.chunk.size 连接器在增量快照块期间获取并读取内存的最大行数。增加块大小可提高效率,因为快照会运行更大的快照查询。但是,较大的块大小还需要更多内存来缓冲快照数据。将块大小调整为在您的环境中提供最佳性能的值。 provide.transaction.metadata false 确定连接器是否生成带有事务边界的事件,并使用事务元数据增强更改事件。如果您希望连接器进行此操作,请指定 true 。详情请参阅 事务元数据 topic.naming.strategy io.debezium.schema.DefaultTopicNamingStrategy 应该用来确定数据更改的主题名称、模式更改、事务、心跳事件等的 TopicNamingStrategy 类的名称,默认为 DefaultTopicNamingStrategy topic.delimiter 指定主题名称的分隔符,默认为 . topic.cache.size 10000 用于在绑定并发哈希映射中保存主题名称的大小。此缓存有助于确定与给定数据收集对应的主题名称。 topic.heartbeat.prefix __debezium-heartbeat 控制连接器向发送心跳消息的主题名称。主题名称具有此模式:

    topic.heartbeat.prefix . topic.prefix

    ,如果主题前缀是 fulfillment ,则默认主题名称为 __debezium-heartbeat.fulfillment topic.transaction 控制连接器向发送事务元数据消息的主题名称。主题名称具有此模式:

    topic.prefix . topic.transaction

    ,例如,如果主题前缀是 fulfillment ,则默认主题名称为 fulfillment.transaction

    Debezium 连接器数据库模式历史记录配置属性

    Debezium 提供了一组 schema.history.internal114 属性,用于控制连接器如何与 schema 历史记录主题交互。 下表描述了用于配置 Debezium 连接器的 schema.history.internal 属性。

    表 5.29. 连接器数据库模式历史记录配置属性
    属性 默认 描述

    schema.history.internal.kafka.topic 没有默认值 连接器存储数据库架构历史记录的 Kafka 主题的完整名称。 schema.history.internal.kafka.bootstrap.servers 没有默认值 连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。此连接用于检索之前由连接器存储的数据库架构历史记录,并编写从源数据库读取的每个 DDL 语句。每个对都应该指向 Kafka Connect 进程使用的相同 Kafka 集群。 schema.history.internal.kafka.recovery.poll.interval.ms 整数值,用于指定连接器在轮询保留数据时应该等待的最大毫秒数。默认值为 100ms。 schema.history.internal.kafka.query.timeout.ms 指定连接器在使用 Kafka admin 客户端获取集群信息时应等待的最大毫秒数。 schema.history.internal.kafka.create.timeout.ms 30000 指定连接器在使用 Kafka admin 客户端创建 kafka 历史记录主题时应等待的最大毫秒数。 schema.history.internal.kafka.recovery.attempts 连接器在连接器恢复失败前读取保留历史记录数据的次数上限。在收到数据后等待的最长时间为 recovery.attempts microseconds recovery.poll.interval.ms schema.history.internal.skip.unparseable.ddl false 指定连接器是否应该忽略不正确的或未知数据库语句或停止处理等布尔值,以便人可以解决这个问题。安全默认值为 false 。skip 仅应谨慎使用,因为它可能会在处理 binlog 时导致数据丢失或强制使用。 schema.history.internal.store.only.captured.tables.ddl false 指定连接器是否应该记录所有 DDL 语句的布尔值
    true 仅记录那些与 Debezium 捕获更改的表相关的 DDL 语句。请小心地设置为 true ,因为如果您更改了哪些表已捕获更改,则缺少数据可能会成为必要的。
    安全默认值为 false

    直通数据库模式历史记录属性,用于配置制作者和消费者客户端


    Debezium 依赖于 Kafka producer 将模式更改写入数据库架构历史记录主题。同样,它依赖于 Kafka 使用者在连接器启动时从数据库 schema 历史记录主题中读取。您可以通过将值分配给以 schema.history.internal.consumer 前缀开头的一组直通配置属性来定义 Kafka producer 消费者 客户端的配置。直通制作者和消费者数据库模式历史记录属性控制一系列行为,如这些客户端如何与 Kafka 代理安全连接,如下例所示:

    schema.history.internal.producer.security.protocol=SSL
    schema.history.internal.producer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    schema.history.internal.producer.ssl.keystore.password=test1234
    schema.history.internal.producer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    schema.history.internal.producer.ssl.truststore.password=test1234
    schema.history.internal.producer.ssl.key.password=test1234
    schema.history.internal.consumer.security.protocol=SSL
    schema.history.internal.consumer.ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
    schema.history.internal.consumer.ssl.keystore.password=test1234
    schema.history.internal.consumer.ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
    schema.history.internal.consumer.ssl.truststore.password=test1234
    schema.history.internal.consumer.ssl.key.password=test1234

    Debezium 在将属性传递给 Kafka 客户端之前,从属性名称中分离前缀。 如需有关 Kafka producer 配置属性 Kafka 使用者配置属性 的更多详情,请参阅 Kafka 文档。

    Debezium 连接器 Kafka 信号配置属性

    当 MySQL 连接器配置为只读时,信号表的替代选择是信号 Kafka 主题。 Debezium 提供了一组 信号 192.168.1.0/24 属性,用于控制连接器如何与 Kafka 信号主题交互。 下表描述了 信号 属性。

    表 5.30. Kafka 信号配置属性
    属性 默认 描述

    signal.kafka.topic 没有默认值 连接器监控用于临时信号的 Kafka 主题的名称。 signal.kafka.bootstrap.servers 没有默认值 连接器用来建立到 Kafka 集群的初始连接的主机/端口对列表。每个对都应该指向 Kafka Connect 进程使用的相同 Kafka 集群。 signal.kafka.poll.timeout.ms 整数值,用于指定连接器在轮询信号时应等待的最大毫秒数。默认值为 100ms。

    Debezium 连接器传递信号 Kafka 使用者客户端配置属性

    Debezium 连接器提供传递配置信号 Kafka 使用者。透传信号属性以 signals.consumer.* 前缀开始。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 使用者。 与 数据库架构历史记录客户端的直通属性 一样,Debebe 会将属性中的前缀从属性分离,然后再将它们传递给 Kafka 信号消费者。

  •