在 Azure 事件中心集成 Apache Kafka Connect 支持
Apache Kafka Connect 是一个框架,可通过 Kafka 群集连接到任何外部系统(例如 MySQL、HDFS)和文件系统并与之进行数据的导入/导出操作。 本文介绍如何将 Kafka Connect 框架与事件中心配合使用。
本文介绍如何将 Kafka Connect 与事件中心集成,以及部署基本的 FileStreamSource
和 FileStreamSink
连接器。 虽然这些连接器不是用于生产的,但它们可以用于演示端到端 Kafka Connect 方案,让 Azure 事件中心充当 Kafka 中转站。
注意
GitHub 上提供了此示例。
先决条件
若要完成本演练,请确保具备以下先决条件:
- Azure 订阅。 如果没有,请创建一个试用版订阅。
- Git
- Linux/MacOS
- 可从 kafka.apache.org 获得最新的 Kafka 版本
- 通读用于 Apache Kafka 的事件中心简介文章
创建事件中心命名空间
要从事件中心服务进行发送和接收,需要使用事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 获取事件中心连接字符串和完全限定域名 (FQDN) 供以后使用。 有关说明,请参阅获取事件中心连接字符串。
克隆示例项目
克隆 Azure 事件中心存储库并导航到 tutorials/connect 子文件夹:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect
为事件中心配置 Kafka Connect
将 Kafka Connect 吞吐量从 Kafka 重定向到事件中心时,必须进行最低限定的重新配置。 以下 connect-distributed.properties
示例演示了如何配置 Connect,以便进行身份验证并与事件中心的 Kafka 终结点通信:
# e.g. namespace.servicebus.chinacloudapi.cn:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
重要
将 {YOUR.EVENTHUBS.CONNECTION.STRING}
替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
运行 Kafka Connect
此步骤在本地以分布式模式启动了一个 Kafka Connect 辅助角色,使用事件中心来保留群集状态。
- 在本地保存
connect-distributed.properties
文件。 请务必替换大括号中的所有值。 - 在本地计算机上导航到 Kafka 发行版的位置。
- 运行
./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties
。 看到'INFO Finished starting connectors and tasks'
时,Connect 辅助角色 REST API 就可以进行交互了。
注意
Kafka Connect 使用 Kafka AdminClient API 自动创建具有建议配置(包括压缩)的主题。 在 Azure 门户中快速查看命名空间就可以发现,Connect 辅助角色的内部主题已自动创建。
Kafka Connect 内部主题必须使用压缩。 如果未正确配置内部连接主题,事件中心团队不负责修复不正确的配置。
创建连接器
本部分介绍如何运转 FileStreamSource
和 FileStreamSink
连接器。
为输入和输出数据文件创建目录。
mkdir ~/connect-quickstart
创建两个文件:一个文件包含
FileStreamSource
连接器读取的种子数据,另一个文件供FileStreamSink
连接器写入数据。seq 1000 > ~/connect-quickstart/input.txt touch ~/connect-quickstart/output.txt
创建
FileStreamSource
连接器。 确保将大括号替换为主目录路径。curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
运行该命令后,你应会在事件中心实例上看到事件中心
connect-quickstart
。检查源连接器的状态。
curl -s http://localhost:8083/connectors/file-source/status
可以选择使用 Service Bus Explorer 来验证事件是否已进入
connect-quickstart
主题。创建 FileStreamSink 连接器。 再次请确保将大括号替换为主目录路径。
curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
检查接收器连接器的状态。
curl -s http://localhost:8083/connectors/file-sink/status
验证是否已在文件之间复制数据,以及两个文件之间的数据是否相同。
# read the file cat ~/connect-quickstart/output.txt # diff the input and output files diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
清理
Kafka Connect 创建的事件中心主题用于存储配置、偏移量和状态,这些存储的内容在 Connect 群集关闭后也会保留。 除非需要此持久性,否则建议删除这些主题。 你还可以删除在本演练中创建的 connect-quickstart
事件中心。
相关内容
若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: