将 Azure 事件中心上的 Apache Kafka Connect 支持与 Debezium 集成以进行变更数据捕获

变更数据捕获 (CDC) 是一项技术,用来跟踪为响应创建、更新和删除操作而在数据库表中进行的行级更改。 Debezium 是一个基于不同数据库中提供的变更数据捕获功能(例如,PostgreSQL 中的逻辑解码)构建的分布式平台。 它提供了一组 Kafka Connect 连接器,它们可以深入探索数据库表中的行级更改,并将它们转换为事件流,这些事件流随后会发送到 Apache Kafka

本教程逐步介绍如何使用事件中心(适用于 Kafka)、Azure Database for PostgreSQL 和 Debezium 在 Azure 上设置基于变更数据捕获的系统。 该系统使用 Debezium PostgreSQL 连接器将数据库修改从 PostgreSQL 流式传输到事件中心内的 Kafka 主题。

注意

本文包含一个对 Azure 不再使用的术语的引用。 在从软件中删除该术语后,我们会将其从本文中删除。

在本教程中,我们将执行以下步骤:

  • 创建事件中心命名空间
  • 设置和配置 Azure Database for PostgreSQL
  • 使用 Debezium PostgreSQL 连接器配置并运行 Kafka Connect
  • 测试变更数据捕获
  • (可选)通过 FileStreamSink 连接器使用变更数据事件

先决条件

要完成本演练,需要:

创建事件中心命名空间

要从事件中心服务进行发送和接收,需要使用事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 获取事件中心连接字符串和完全限定域名 (FQDN) 供以后使用。 有关说明,请参阅获取事件中心连接字符串

设置和配置 Azure Database for PostgreSQL

Azure Database for PostgreSQL 是一项基于开放源代码 PostgreSQL 数据库引擎社区版本的关系数据库服务,提供了三种部署选项:单一服务器、灵活服务器和 Cosmos DB for PostgreSQL。 请按照这些说明使用 Azure 门户创建 Azure Database for PostgreSQL 服务器。

设置并运行 Kafka Connect

本部分涵盖了以下主题:

  • Debezium 连接器安装
  • 为事件中心配置 Kafka Connect
  • 使用 Debezium 连接器启动 Kafka Connect 群集

下载并设置 Debezium 连接器

按照 Debezium 文档中的最新说明下载并设置连接器。

为事件中心配置 Kafka Connect

将 Kafka Connect 吞吐量从 Kafka 重定向到事件中心时,必须进行最低限定的重新配置。 以下 connect-distributed.properties 示例演示了如何配置 Connect,以便进行身份验证并与事件中心的 Kafka 终结点通信:

重要

  • Debezium 会为每个表自动创建一个主题,并自动创建一组元数据主题。 Kafka 主题对应于事件中心实例(事件中心)。 有关 Apache Kafka 到 Azure 事件中心的映射,请参阅 Kafka 和事件中心概念映射
  • 事件中心命名空间中的事件中心数量有不同的限制,具体取决于层级(基本、标准、高级或专用)。 有关这些限制,请参阅配额
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.chinacloudapi.cn:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

运行 Kafka Connect

此步骤在本地以分布式模式启动了一个 Kafka Connect 辅助角色,使用事件中心来保留群集状态。

  1. 在本地保存上述 connect-distributed.properties 文件。 请务必替换大括号中的所有值。
  2. 在本地计算机上导航到 Kafka 发行版的位置。
  3. 运行 ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties 并等待群集启动。

注意

Kafka Connect 使用 Kafka AdminClient API 自动创建具有建议配置(包括压缩)的主题。 在 Azure 门户中快速查看命名空间就可以发现,Connect 辅助角色的内部主题已自动创建。

Kafka Connect 内部主题必须使用压缩。 如果未正确配置内部连接主题,事件中心团队不负责修复不正确的配置。

配置并启动 Debezium PostgreSQL 源连接器

为 PostgreSQL 源连接器创建配置文件 (pg-source-connector.json) - 根据你的 Azure PostgreSQL 实例替换这些值。

{
    "name": "todo-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.chinacloudapi.cn",
        "database.port": "5432",
        "database.user": "<replace with database user name>",
        "database.password": "<replace with database password>",
        "database.dbname": "postgres",
        "database.server.name": "my-server",
        "plugin.name": "wal2json",
        "table.whitelist": "public.todos"
    }
}

提示

database.server.name 属性是一个逻辑名称,可标识所监视的特定 PostgreSQL 数据库服务器/群集并为其提供命名空间。

若要创建连接器的实例,请使用 Kafka Connect REST API 终结点:

curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors

若要检查连接器的状态,请执行以下命令:

curl -s http://localhost:8083/connectors/todo-connector/status

测试变更数据捕获

要查看变更数据捕获的操作方式,需要在 Azure PostgreSQL 数据库中创建/更新/删除记录。

首先连接到 Azure PostgreSQL 数据库(以下示例使用了 psql)。

psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.chinacloudapi.cn -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require

e.g. 

psql -h my-postgres.postgres.database.chinacloudapi.cn -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require

创建一个表并插入记录

CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));

INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');

连接器现在应当发挥作用,并将变更数据事件发送到具有以下名称 my-server.public.todos 的事件中心主题,前提是你已将 my-server 用作 database.server.name 的值,并且 public.todos 是要跟踪其变更的表(根据 table.whitelist 配置)。

检查事件中心主题

让我们对该主题的内容自检一下,确保一切符合预期。 下面的示例使用了 kafkacat,但你也可以使用此处列出的任何选项来创建使用者

创建包含以下内容的名为 kafkacat.conf 的文件:

metadata.broker.list=<enter event hubs namespace>.servicebus.chinacloudapi.cn:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>

注意

根据事件中心信息,更新 kafkacat.conf 中的 metadata.broker.listsasl.password 属性。

在另一个终端中,启动一个使用者:

export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.chinacloudapi.cn:9093
export TOPIC=my-server.public.todos

kafkacat -b $BROKER -t $TOPIC -o beginning

你应该会看到 JSON 有效负载,它们表示为响应你已添加到 todos 表中的行而在 PostgreSQL 中生成的变更数据事件。 下面是有效负载的片段:

{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fullfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }

该事件包含 payload 及其 schema(为简洁起见,此处进行了省略)。 在 payload 节中,请注意 create 操作 ("op": "c") 是如何表示的 - "before": null 表示它是一个新 INSERT 的行,after 提供行中各个列的值,source 提供从中选取此事件的 PostgreSQL 实例元数据,等等。

你可以对 update 或 delete 操作进行同样的尝试,并对变更数据事件进行自检。 例如,若要更新 configure and install connector 的任务状态(假设其 id3),可以执行以下命令:

UPDATE todos SET todo_status = 'complete' WHERE id = 3;

(可选)安装 FileStreamSink 连接器

现在,你已在事件中心主题中捕获所有 todos 表更改,你将通过 FileStreamSink 连接器(在 Kafka Connect 中默认提供)来使用这些事件。

为连接器创建配置文件 (file-sink-connector.json) - 根据你的文件系统替换 file 属性。

{
    "name": "cdc-file-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
        "tasks.max": "1",
        "topics": "my-server.public.todos",
        "file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
    }
}

若要创建连接器并检查其状态,请执行以下命令:

curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors

curl http://localhost:8083/connectors/cdc-file-sink/status

插入/更新/删除数据库记录,并在配置的输出接收器文件中监视记录:

tail -f /Users/foo/todos-cdc.txt

清理

Kafka Connect 创建的事件中心主题可用于存储配置、偏移量和状态,即使在 Connect 群集关闭后,仍会保留这些内容。 除非需要此持久性,否则建议删除这些主题。 还可以删除在本演练中创建的 my-server.public.todos 事件中心。

后续步骤

若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: