将 Akka Streams 与适用于 Apache Kafka 的事件中心配合使用
本教程介绍如何在不更改协议客户端或运行你自己的群集的情况下通过 Apache Kafka 的事件中心支持来连接 Akka 流。
在本教程中,你将了解如何执行以下操作:
- 创建事件中心命名空间
- 克隆示例项目
- 运行 Akka Streams 生产者
- 运行 Akka Streams 使用者
注意
GitHub 上提供了此示例
先决条件
若要完成本教程,请确保具备以下先决条件:
- 通读用于 Apache Kafka 的事件中心一文。
- Azure 订阅。 如果没有 Azure 订阅,请在开始之前创建一个试用版订阅。
- Java 开发工具包 (JDK) 1.8+
- 在 Ubuntu 上运行
apt-get install default-jdk
,以便安装 JDK。 - 请确保设置 JAVA_HOME 环境变量,使之指向在其中安装了 JDK 的文件夹。
- 在 Ubuntu 上运行
- 下载和安装 Maven 二进制存档
- 在 Ubuntu 上,可以通过运行
apt-get install maven
来安装 Maven。
- 在 Ubuntu 上,可以通过运行
- Git
- 在 Ubuntu 上,可以通过运行
sudo apt-get install git
来安装 Git。
- 在 Ubuntu 上,可以通过运行
创建事件中心命名空间
要从任何事件中心服务进行发送或接收,需要事件中心命名空间。 有关详细信息,请参阅创建事件中心。 请确保复制事件中心连接字符串,以供将来使用。
克隆示例项目
获得事件中心连接字符串后,即可克隆适用于 Kafka 的 Azure 事件中心存储库并导航到 akka
子文件夹:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/akka/java
运行 Akka Streams 生产者
使用所提供的 Akka Streams 生产者事例,将消息发送到事件中心服务。
提供事件中心 Kafka 终结点
生产者 application.conf
更新 producer/src/main/resources/application.conf
中的 bootstrap.servers
和 sasl.jaas.config
值,以使用正确的身份验证将生产者定向到事件中心 Kafka 终结点。
akka.kafka.producer {
#Akka Kafka producer properties can be defined here
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
重要
将 {YOUR.EVENTHUBS.CONNECTION.STRING}
替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
通过命令行运行生产者
若要通过命令行运行生产者,生成 JAR,然后从 Maven 中运行(或使用 Maven 生成 JAR,然后通过向 classpath 添加必要的 Kafka JAR 在 Java 中运行):
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestProducer"
生产者将开始向主题 test
中的事件中心发送事件,并将事件输出到 stdout。
运行 Akka Streams 使用者
使用所提供的使用者示例,接收来自事件中心的消息。
提供事件中心 Kafka 终结点
使用者 application.conf
更新 consumer/src/main/resources/application.conf
中的 bootstrap.servers
和 sasl.jaas.config
值,以使用正确的身份验证将使用者定向到事件中心 Kafka 终结点。
akka.kafka.consumer {
#Akka Kafka consumer properties defined here
wakeup-timeout=60s
# Properties defined by org.apache.kafka.clients.consumer.ConsumerConfig
# defined in this configuration section.
kafka-clients {
request.timeout.ms=60000
group.id=akka-example-consumer
bootstrap.servers="{YOUR.EVENTHUBS.FQDN}:9093"
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{YOUR.EVENTHUBS.CONNECTION.STRING}\";"
}
}
重要
将 {YOUR.EVENTHUBS.CONNECTION.STRING}
替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
通过命令行运行使用者
若要通过命令行运行使用者,生成 JAR,然后从 Maven 中运行(或使用 Maven 生成 JAR,然后通过向 classpath 添加必要的 Kafka JAR 在 Java 中运行):
mvn clean package
mvn exec:java -Dexec.mainClass="AkkaTestConsumer"
如果事件中心具有事件(例如,如果生产者也在运行),则使用者将开始接收来自主题 test
的事件。
请查看 Akka Streams Kafka 指南,了解有关 Akka Streams 的更多详细信息。
后续步骤
若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: