Compartir a través de

在 Azure 事件中心集成 Apache Kafka Connect 支持

Apache Kafka Connect 是一个框架,可通过 Kafka 群集连接到任何外部系统(例如 MySQL、HDFS)和文件系统并与之进行数据的导入/导出操作。 本文介绍如何将 Kafka Connect 框架与事件中心配合使用。

本文介绍如何将 Kafka Connect 与事件中心集成,以及部署基本的 FileStreamSourceFileStreamSink 连接器。 虽然这些连接器不是用于生产的,但它们可以用于演示端到端 Kafka Connect 方案,让 Azure 事件中心充当 Kafka 中转站。

注意

GitHub 上提供了此示例。

先决条件

若要完成本演练,请确保具备以下先决条件:

创建事件中心命名空间

要从事件中心服务进行发送和接收,需要使用事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 获取事件中心连接字符串和完全限定域名 (FQDN) 供以后使用。 有关说明,请参阅获取事件中心连接字符串

克隆示例项目

克隆 Azure 事件中心存储库并导航到 tutorials/connect 子文件夹:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/connect

为事件中心配置 Kafka Connect

将 Kafka Connect 吞吐量从 Kafka 重定向到事件中心时,必须进行最低限定的重新配置。 以下 connect-distributed.properties 示例演示了如何配置 Connect,以便进行身份验证并与事件中心的 Kafka 终结点通信:

# e.g. namespace.servicebus.chinacloudapi.cn:9093
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=connect-cluster-group

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=connect
offset.flush.interval.ms=10000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

运行 Kafka Connect

此步骤在本地以分布式模式启动了一个 Kafka Connect 辅助角色,使用事件中心来保留群集状态。

  1. 在本地保存 connect-distributed.properties 文件。 请务必替换大括号中的所有值。
  2. 在本地计算机上导航到 Kafka 发行版的位置。
  3. 运行 ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties。 看到 'INFO Finished starting connectors and tasks' 时,Connect 辅助角色 REST API 就可以进行交互了。

注意

Kafka Connect 使用 Kafka AdminClient API 自动创建具有建议配置(包括压缩)的主题。 在 Azure 门户中快速查看命名空间就可以发现,Connect 辅助角色的内部主题已自动创建。

Kafka Connect 内部主题必须使用压缩。 如果未正确配置内部连接主题,事件中心团队不负责修复不正确的配置。

创建连接器

本部分介绍如何运转 FileStreamSourceFileStreamSink 连接器。

  1. 为输入和输出数据文件创建目录。

    mkdir ~/connect-quickstart
    
  2. 创建两个文件:一个文件包含 FileStreamSource 连接器读取的种子数据,另一个文件供 FileStreamSink 连接器写入数据。

    seq 1000 > ~/connect-quickstart/input.txt
    touch ~/connect-quickstart/output.txt
    
  3. 创建 FileStreamSource 连接器。 确保将大括号替换为主目录路径。

    curl -s -X POST -H "Content-Type: application/json" --data '{"name": "file-source","config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector","tasks.max":"1","topic":"connect-quickstart","file": "{YOUR/HOME/PATH}/connect-quickstart/input.txt"}}' http://localhost:8083/connectors
    

    运行该命令后,你应会在事件中心实例上看到事件中心 connect-quickstart

  4. 检查源连接器的状态。

    curl -s http://localhost:8083/connectors/file-source/status
    

    可以选择使用 Service Bus Explorer 来验证事件是否已进入 connect-quickstart 主题。

  5. 创建 FileStreamSink 连接器。 再次请确保将大括号替换为主目录路径。

    curl -X POST -H "Content-Type: application/json" --data '{"name": "file-sink", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "tasks.max":"1", "topics":"connect-quickstart", "file": "{YOUR/HOME/PATH}/connect-quickstart/output.txt"}}' http://localhost:8083/connectors
    
  6. 检查接收器连接器的状态。

    curl -s http://localhost:8083/connectors/file-sink/status
    
  7. 验证是否已在文件之间复制数据,以及两个文件之间的数据是否相同。

    # read the file
    cat ~/connect-quickstart/output.txt
    # diff the input and output files
    diff ~/connect-quickstart/input.txt ~/connect-quickstart/output.txt
    

清理

Kafka Connect 创建的事件中心主题用于存储配置、偏移量和状态,这些存储的内容在 Connect 群集关闭后也会保留。 除非需要此持久性,否则建议删除这些主题。 你还可以删除在本演练中创建的 connect-quickstart 事件中心。

若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: