快速入门:使用 Kafka 协议通过事件中心进行数据流式传输

此快速入门介绍如何在不更改协议客户端或运行自己的群集的情况下将数据流式传输到事件中心。 你将了解如何只需更改应用程序配置,即可使用生产者和使用者与事件中心通信。

备注

GitHub 上提供了此示例

先决条件

若要完成本快速入门,请确保符合以下先决条件:

创建事件中心命名空间

当你创建事件中心命名空间时,系统会自动为该命名空间启用 Kafka 终结点。 可以从使用 Kafka 协议的应用程序,将事件流式传输到事件中心。 按照使用 Azure 门户创建事件中心中的分步说明创建事件中心命名空间。 如果使用专用群集,请参阅在专用群集中创建命名空间和事件中心

备注

基本层不支持适用于 Kafka 的事件中心。

在事件中心内使用 Kafka 发送和接收消息

  1. 克隆用于 Kafka 的 Azure 事件中心存储库

  2. 导航到 azure-event-hubs-for-kafka/quickstart/java/producer

  3. src/main/resources/producer.config 中更新生产者的配置详细信息,如下所示:

    TLS/SSL:

    bootstrap.servers=NAMESPACENAME.servicebus.chinacloudapi.cn:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    

    重要

    {YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

    OAuth:

    bootstrap.servers=NAMESPACENAME.servicebus.chinacloudapi.cn:9093
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;
    

    可以在此处的 GitHub 上找到示例处理程序类 CustomAuthenticateCallbackHandler 的源代码。

  4. 运行生产者代码并将事件流式传输到事件中心:

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  5. 导航到 azure-event-hubs-for-kafka/quickstart/java/consumer

  6. src/main/resources/consumer.config 中更新使用者的配置详细信息,如下所示:

    TLS/SSL:

    bootstrap.servers=NAMESPACENAME.servicebus.chinacloudapi.cn:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
    

    重要

    {YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

    OAuth:

    bootstrap.servers=NAMESPACENAME.servicebus.chinacloudapi.cn:9093
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
    sasl.login.callback.handler.class=CustomAuthenticateCallbackHandler;
    

    可以在此处的 GitHub 上找到示例处理程序类 CustomAuthenticateCallbackHandler 的源代码。

    可在此处找到 Kafka 的事件中心的所有 OAuth 示例。

  7. 使用 Kafka 客户端运行使用者代码并处理来自事件中心的事件:

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestConsumer"                                    
    

如果事件中心 Kafka 群集有事件,则现在开始从使用者接收这些事件。

后续步骤

本文介绍了如何在不更改协议客户端或运行自己的群集的情况下,将事件流式传输到事件中心。 若要了解详细信息,请参阅针对 Azure 事件中心的 Apache Kafka 开发人员指南