Using Akka Streams with Event Hubs for Apache Kafka

This tutorial shows you how to connect Akka Streams to an event hub without changing your protocol clients or running your own clusters. Azure Event Hubs for Kafka supports Apache Kafka version 1.0.

In this tutorial, you learn how to:

  • Create an Event Hubs namespace
  • Clone the example project
  • Run the Akka Streams producer
  • Run the Akka Streams consumer

Note

This sample is available on GitHub.

Prerequisites

To complete this tutorial, make sure you have the following prerequisites:

  • Read through the Event Hubs for Apache Kafka article.
  • An Azure subscription. If you do not have one, create a trial account before you begin.
  • Java Development Kit (JDK) 1.8+
    • On Ubuntu, run apt-get install default-jdk to install the JDK.
    • Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
  • Download and install a Maven binary archive
    • On Ubuntu, you can run apt-get install maven to install Maven.
  • Git
    • On Ubuntu, you can run sudo apt-get install git to install Git.

Create an Event Hubs namespace

An Event Hubs namespace is required to send to or receive from any Event Hubs service. See Create an event hub for detailed information. Make sure to copy the Event Hubs connection string for later use.
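
For reference, a namespace-level Event Hubs connection string has the following general shape. The host name in the Endpoint value is the namespace's fully qualified domain name, which is what you later substitute for {YOUR.EVENTHUBS.FQDN}, while the whole string replaces {YOUR.EVENTHUBS.CONNECTION.STRING}. The key name and key shown here are placeholders.

Endpoint=sb://{YOUR.EVENTHUBS.FQDN}/;SharedAccessKeyName={SAS.KEY.NAME};SharedAccessKey={SAS.KEY}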

Clone the example project

Now that you have an Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the akka subfolder:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java

Run the Akka Streams producer

Using the provided Akka Streams producer example, send messages to the Event Hubs service.

Provide an Event Hubs Kafka endpoint

Producer application.conf

Update the bootstrap.servers and sasl.jaas.config values in producer/src/main/resources/application.conf to direct the producer to the Event Hubs Kafka endpoint with the correct authentication.

akka.kafka.producer {
    #Akka Kafka producer properties can be defined here


    # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
    # can be defined in this configuration section.
    kafka-clients {
        bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
        sasl.mechanism=PLAIN
        security.protocol=SASL_SSL
        sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}
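
The repository's AkkaTestProducer contains the full implementation. As a rough orientation, a minimal Akka Streams producer built on this configuration could look like the sketch below; the class name, event count, and message contents are illustrative assumptions, not the sample's actual code.

import akka.actor.ActorSystem;
import akka.kafka.ProducerSettings;
import akka.kafka.javadsl.Producer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SketchProducer {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("akka-eventhubs-producer");
        Materializer materializer = ActorMaterializer.create(system);

        // Load the kafka-clients settings from the akka.kafka.producer section of application.conf
        Config config = system.settings().config().getConfig("akka.kafka.producer");
        ProducerSettings<String, String> producerSettings =
                ProducerSettings.create(config, new StringSerializer(), new StringSerializer());

        // Emit 100 numbered records to the "test" topic (an event hub named "test")
        Source.range(1, 100)
                .map(n -> new ProducerRecord<String, String>("test", "event " + n))
                .runWith(Producer.plainSink(producerSettings), materializer);
    }
}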

Run the producer from the command line

To run the producer from the command line, generate the JAR and then run from within Maven (or generate the JAR using Maven, then run it in Java by adding the necessary Kafka JARs to the classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"

The producer begins sending events to the event hub on the topic test, and prints the events to stdout.

Run the Akka Streams consumer

Using the provided consumer example, receive messages from the event hub.

Provide an Event Hubs Kafka endpoint

Consumer application.conf

Update the bootstrap.servers and sasl.jaas.config values in consumer/src/main/resources/application.conf to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication.

akka.kafka.consumer {
    #Akka Kafka consumer properties defined here
    wakeup-timeout=60s

    # Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
    # can be defined in this configuration section.
    kafka-clients {
       request.timeout.ms=60000
       group.id=akka-example-consumer

       bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
       sasl.mechanism=PLAIN
       security.protocol=SASL_SSL
       sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
    }
}
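
The repository's AkkaTestConsumer contains the full implementation. As a rough orientation, a minimal Akka Streams consumer built on this configuration could look like the sketch below; the class name and the printing logic are illustrative assumptions, not the sample's actual code.

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import com.typesafe.config.Config;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SketchConsumer {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("akka-eventhubs-consumer");
        Materializer materializer = ActorMaterializer.create(system);

        // Load the kafka-clients settings from the akka.kafka.consumer section of application.conf
        Config config = system.settings().config().getConfig("akka.kafka.consumer");
        ConsumerSettings<String, String> consumerSettings =
                ConsumerSettings.create(config, new StringDeserializer(), new StringDeserializer());

        // Subscribe to the "test" topic and print each record's value to stdout
        Consumer.plainSource(consumerSettings, Subscriptions.topics("test"))
                .runWith(Sink.foreach(record -> System.out.println(record.value())), materializer);
    }
}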

Run the consumer from the command line

To run the consumer from the command line, generate the JAR and then run from within Maven (or generate the JAR using Maven, then run it in Java by adding the necessary Kafka JARs to the classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"

If the event hub has events (for instance, if your producer is also running), the consumer begins receiving events from the topic test.

Check out the Akka Streams Kafka Guide for more detailed information about Akka Streams.

Next steps

To learn more about Event Hubs for Kafka, see the following articles: