将 Azure 事件中心上的 Apache Kafka Connect 支持与 Debezium 集成以进行变更数据捕获
变更数据捕获 (CDC) 是一项技术,用来跟踪为响应创建、更新和删除操作而在数据库表中进行的行级更改。 Debezium 是一个基于不同数据库中提供的变更数据捕获功能(例如,PostgreSQL 中的逻辑解码)构建的分布式平台。 它提供了一组 Kafka Connect 连接器,它们可以深入探索数据库表中的行级更改,并将它们转换为事件流,这些事件流随后会发送到 Apache Kafka。
本教程逐步介绍如何使用事件中心(适用于 Kafka)、Azure Database for PostgreSQL 和 Debezium 在 Azure 上设置基于变更数据捕获的系统。 该系统使用 Debezium PostgreSQL 连接器将数据库修改从 PostgreSQL 流式传输到事件中心内的 Kafka 主题。
注意
本文包含一个对 Azure 不再使用的术语的引用。 在从软件中删除该术语后,我们会将其从本文中删除。
在本教程中,我们将执行以下步骤:
- 创建事件中心命名空间
- 设置和配置 Azure Database for PostgreSQL
- 使用 Debezium PostgreSQL 连接器配置并运行 Kafka Connect
- 测试变更数据捕获
- (可选)通过
FileStreamSink
连接器使用变更数据事件
先决条件
要完成本演练,需要:
- Azure 订阅。 如果没有,请创建一个试用版订阅。
- Linux/MacOS
- Kafka 发行版(版本为 1.1.1,Scala 版本为 2.11),通过 kafka.apache.org 提供
- 通读用于 Apache Kafka 的事件中心简介文章
创建事件中心命名空间
要从事件中心服务进行发送和接收,需要使用事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 获取事件中心连接字符串和完全限定域名 (FQDN) 供以后使用。 有关说明,请参阅获取事件中心连接字符串。
设置和配置 Azure Database for PostgreSQL
Azure Database for PostgreSQL 是一项基于开放源代码 PostgreSQL 数据库引擎社区版本的关系数据库服务,提供了三种部署选项:单一服务器、灵活服务器和 Cosmos DB for PostgreSQL。 请按照这些说明使用 Azure 门户创建 Azure Database for PostgreSQL 服务器。
设置并运行 Kafka Connect
本部分涵盖了以下主题:
- Debezium 连接器安装
- 为事件中心配置 Kafka Connect
- 使用 Debezium 连接器启动 Kafka Connect 群集
下载并设置 Debezium 连接器
按照 Debezium 文档中的最新说明下载并设置连接器。
- 下载连接器的插件存档。 例如,若要下载连接器的
1.2.0
版本,请使用此链接 - https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz - 提取 JAR 文件并将其复制到 Kafka Connect plugin.path。
为事件中心配置 Kafka Connect
将 Kafka Connect 吞吐量从 Kafka 重定向到事件中心时,必须进行最低限定的重新配置。 以下 connect-distributed.properties
示例演示了如何配置 Connect,以便进行身份验证并与事件中心的 Kafka 终结点通信:
重要
- Debezium 会为每个表自动创建一个主题,并自动创建一组元数据主题。 Kafka 主题对应于事件中心实例(事件中心)。 有关 Apache Kafka 到 Azure 事件中心的映射,请参阅 Kafka 和事件中心概念映射。
- 事件中心命名空间中的事件中心数量有不同的限制,具体取决于层级(基本、标准、高级或专用)。 有关这些限制,请参阅配额。
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.chinacloudapi.cn:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
重要
将 {YOUR.EVENTHUBS.CONNECTION.STRING}
替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
运行 Kafka Connect
此步骤在本地以分布式模式启动了一个 Kafka Connect 辅助角色,使用事件中心来保留群集状态。
- 在本地保存上述
connect-distributed.properties
文件。 请务必替换大括号中的所有值。 - 在本地计算机上导航到 Kafka 发行版的位置。
- 运行
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
并等待群集启动。
注意
Kafka Connect 使用 Kafka AdminClient API 自动创建具有建议配置(包括压缩)的主题。 在 Azure 门户中快速查看命名空间就可以发现,Connect 辅助角色的内部主题已自动创建。
Kafka Connect 内部主题必须使用压缩。 如果未正确配置内部连接主题,事件中心团队不负责修复不正确的配置。
配置并启动 Debezium PostgreSQL 源连接器
为 PostgreSQL 源连接器创建配置文件 (pg-source-connector.json
) - 根据你的 Azure PostgreSQL 实例替换这些值。
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.chinacloudapi.cn",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
提示
database.server.name
属性是一个逻辑名称,可标识所监视的特定 PostgreSQL 数据库服务器/群集并为其提供命名空间。
若要创建连接器的实例,请使用 Kafka Connect REST API 终结点:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
若要检查连接器的状态,请执行以下命令:
curl -s http://localhost:8083/connectors/todo-connector/status
测试变更数据捕获
要查看变更数据捕获的操作方式,需要在 Azure PostgreSQL 数据库中创建/更新/删除记录。
首先连接到 Azure PostgreSQL 数据库(以下示例使用了 psql)。
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.chinacloudapi.cn -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
e.g.
psql -h my-postgres.postgres.database.chinacloudapi.cn -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
创建一个表并插入记录
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
连接器现在应当发挥作用,并将变更数据事件发送到具有以下名称 my-server.public.todos
的事件中心主题,前提是你已将 my-server
用作 database.server.name
的值,并且 public.todos
是要跟踪其变更的表(根据 table.whitelist
配置)。
检查事件中心主题
让我们对该主题的内容自检一下,确保一切符合预期。 下面的示例使用了 kafkacat
,但你也可以使用此处列出的任何选项来创建使用者。
创建包含以下内容的名为 kafkacat.conf
的文件:
metadata.broker.list=<enter event hubs namespace>.servicebus.chinacloudapi.cn:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
注意
根据事件中心信息,更新 kafkacat.conf
中的 metadata.broker.list
和 sasl.password
属性。
在另一个终端中,启动一个使用者:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.chinacloudapi.cn:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
你应该会看到 JSON 有效负载,它们表示为响应你已添加到 todos
表中的行而在 PostgreSQL 中生成的变更数据事件。 下面是有效负载的片段:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 1,
"description": "setup postgresql on azure",
"todo_status": "complete"
},
"source": {
"version": "1.2.0.Final",
"connector": "postgresql",
"name": "fullfillment",
"ts_ms": 1593018069944,
"snapshot": "last",
"db": "postgres",
"schema": "public",
"table": "todos",
"txId": 602,
"lsn": 184579736,
"xmin": null
},
"op": "c",
"ts_ms": 1593018069947,
"transaction": null
}
该事件包含 payload
及其 schema
(为简洁起见,此处进行了省略)。 在 payload
节中,请注意 create 操作 ("op": "c"
) 是如何表示的 - "before": null
表示它是一个新 INSERT
的行,after
提供行中各个列的值,source
提供从中选取此事件的 PostgreSQL 实例元数据,等等。
你可以对 update 或 delete 操作进行同样的尝试,并对变更数据事件进行自检。 例如,若要更新 configure and install connector
的任务状态(假设其 id
为 3
),可以执行以下命令:
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
(可选)安装 FileStreamSink 连接器
现在,你已在事件中心主题中捕获所有 todos
表更改,你将通过 FileStreamSink 连接器(在 Kafka Connect 中默认提供)来使用这些事件。
为连接器创建配置文件 (file-sink-connector.json
) - 根据你的文件系统替换 file
属性。
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
若要创建连接器并检查其状态,请执行以下命令:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
插入/更新/删除数据库记录,并在配置的输出接收器文件中监视记录:
tail -f /Users/foo/todos-cdc.txt
清理
Kafka Connect 创建的事件中心主题可用于存储配置、偏移量和状态,即使在 Connect 群集关闭后,仍会保留这些内容。 除非需要此持久性,否则建议删除这些主题。 还可以删除在本演练中创建的 my-server.public.todos
事件中心。
后续步骤
若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: