Integrate Apache Kafka Connect support on Azure Event Hubs (Preview)

As ingestion for business needs increases, so does the requirement to move data to and from a variety of external sources and sinks. Apache Kafka Connect provides a framework for connecting to external systems such as MySQL, HDFS, and file systems, and importing and exporting data through a Kafka cluster. This tutorial walks you through using the Kafka Connect framework with Event Hubs.

This tutorial walks you through integrating Kafka Connect with an event hub and deploying basic FileStreamSource and FileStreamSink connectors. This feature is currently in preview. While these connectors are not meant for production use, they demonstrate an end-to-end Kafka Connect scenario where Azure Event Hubs acts as a Kafka broker.

Note

This sample is available on GitHub.

In this tutorial, you take the following steps:

  • Create an Event Hubs namespace
  • Clone the example project
  • Configure Kafka Connect for Event Hubs
  • Run Kafka Connect
  • Create connectors

Prerequisites

To complete this walkthrough, make sure you have the following prerequisites:

Create an Event Hubs namespace

An Event Hubs namespace is required to send to and receive from any Event Hubs service. See Creating an event hub for instructions on creating a namespace and an event hub. Get the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.
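If you prefer the command line, the following Azure CLI sketch shows one possible way to create the namespace and retrieve the connection string; the resource group and namespace names (connect-quickstart-rg, mynamespace) and the location are placeholders chosen here for illustration.

    # For the Azure China cloud, select it first: az cloud set --name AzureChinaCloud
    az group create --name connect-quickstart-rg --location chinanorth2
    az eventhubs namespace create --resource-group connect-quickstart-rg \
        --name mynamespace --location chinanorth2 --sku Standard

    # The FQDN is the host in the Endpoint value of the connection string,
    # e.g. mynamespace.servicebus.chinacloudapi.cn.
    az eventhubs namespace authorization-rule keys list \
        --resource-group connect-quickstart-rg --namespace-name mynamespace \
        --name RootManageSharedAccessKey --query primaryConnectionString -o tsv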

Clone the example project

Clone the Azure Event Hubs repository and navigate to the tutorials/connect subfolder:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

Configure Kafka Connect for Event Hubs

Minimal reconfiguration is necessary when redirecting Kafka Connect throughput from Kafka to Event Hubs. The following connect-distributed.properties sample illustrates how to configure Connect to authenticate and communicate with the Kafka endpoint on Event Hubs:

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.chinacloudapi.cn:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
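As a convenience, you can fill in the placeholders with a few shell commands; this is a minimal sketch (GNU sed) assuming the connection string is exported in an illustrative variable named EH_CONNECTION_STRING:

    EH_CONNECTION_STRING='Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=...'

    # Derive the FQDN from the Endpoint host in the connection string.
    EH_FQDN=$(echo "$EH_CONNECTION_STRING" | sed -E 's|.*sb://([^/;]+).*|\1|')

    # Substitute both placeholders into the properties file.
    sed -i "s|{YOUR.EVENTHUBS.FQDN}|$EH_FQDN|; s|{YOUR.EVENTHUBS.CONNECTION.STRING}|$EH_CONNECTION_STRING|" \
        connect-distributed.properties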

Run Kafka Connect

In this step, a Kafka Connect worker is started locally in distributed mode, using Event Hubs to maintain cluster state.

  1. Save the above connect-distributed.properties file locally. Be sure to replace all values in braces.
  2. Navigate to the location of the Kafka release on your machine.
  3. Run ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties. The Connect worker REST API is ready for interaction when you see 'INFO Finished starting connectors and tasks'.
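Once the worker is up, you can confirm the REST API is responding and that the file connector plugins are available; both endpoints below are part of the standard Kafka Connect REST API:

    # Returns the Connect worker's version information.
    curl -s http://localhost:8083/

    # Lists installed connector plugins; FileStreamSource and FileStreamSink
    # should appear if plugin.path is set correctly.
    curl -s http://localhost:8083/connector-plugins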

Note

Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. A quick check of the namespace in the Azure portal reveals that the Connect worker's internal topics have been created automatically.

Kafka Connect internal topics must use compaction. The Event Hubs team is not responsible for fixing improper configurations if internal Connect topics are incorrectly configured.
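If you want to inspect an internal topic's configuration yourself, the kafka-configs tool that ships with Kafka can describe it; this is a minimal sketch assuming a client.properties file that repeats the SASL settings from connect-distributed.properties, and that the endpoint supports the describe-configs operation:

    # client.properties repeats security.protocol, sasl.mechanism, and
    # sasl.jaas.config from connect-distributed.properties.
    ./bin/kafka-configs.sh --bootstrap-server {YOUR.EVENTHUBS.FQDN}:9093 \
        --command-config client.properties \
        --entity-type topics --entity-name connect-cluster-configs --describe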

Create connectors

This section walks you through spinning up FileStreamSource and FileStreamSink connectors.

  1. Create a directory for input and output data files.

    mkdir ~/connect-quickstart
    
  2. Create two files: one file with seed data from which the FileStreamSource connector reads, and another to which the FileStreamSink connector writes.

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. Create a FileStreamSource connector. Be sure to replace the curly braces with your home directory path.

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    After running the above command, you should see the event hub connect-quickstart on your Event Hubs instance.

  4. Check the status of the source connector.

    curl -s http://localhost:8083/connectors/file-source/status
    

    Optionally, you can use Service Bus Explorer to verify that events have arrived in the connect-quickstart topic. The Kafka console consumer sketch after this list shows another way to check.

  5. Create a FileStreamSink connector. Again, make sure you replace the curly braces with your home directory path.

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. Check the status of the sink connector.

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. Verify that data has been replicated between the files and that it is identical in both.

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    
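As noted in step 4, the console consumer that ships with Kafka offers another way to inspect the connect-quickstart topic; this sketch assumes a client.properties file containing the same SASL settings as connect-distributed.properties:

    # Print the first few records from the connect-quickstart topic.
    ./bin/kafka-console-consumer.sh --bootstrap-server {YOUR.EVENTHUBS.FQDN}:9093 \
        --consumer.config client.properties \
        --topic connect-quickstart --from-beginning --max-messages 5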

Cleanup

Kafka Connect creates Event Hubs topics to store configurations, offsets, and status; these topics persist even after the Connect cluster has been taken down. Unless this persistence is desired, it is recommended that you delete these topics. You may also want to delete the connect-quickstart event hub that was created during this walkthrough.
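A possible cleanup sequence is sketched below: the connector deletions use the standard Connect REST API, and the event hub deletion uses the Azure CLI with the placeholder resource group and namespace names from earlier.

    # Remove both connectors from the running Connect worker.
    curl -X DELETE http://localhost:8083/connectors/file-source
    curl -X DELETE http://localhost:8083/connectors/file-sink

    # Delete the quickstart event hub; repeat for the connect-cluster-* internal
    # topics if you no longer need the stored Connect state.
    az eventhubs eventhub delete --resource-group connect-quickstart-rg \
        --namespace-name mynamespace --name connect-quickstart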

Next steps

To learn more about Event Hubs for Kafka, see the following articles: