Use Apache Flink with Azure Event Hubs for Apache Kafka
This tutorial shows you how to connect Apache Flink to an event hub without changing your protocol clients or running your own clusters. For more information on Event Hubs' support for the Apache Kafka consumer protocol, see Event Hubs for Apache Kafka.
In this tutorial, you learn how to:
- Create an Event Hubs namespace
- Clone the example project
- Run a Flink producer
- Run a Flink consumer
Prerequisites
To complete this tutorial, make sure you have the following prerequisites:
- Read through the Event Hubs for Apache Kafka article.
- An Azure subscription. If you don't have one, create a Trial Subscription before you begin.
- Java Development Kit (JDK) 1.7+
  - On Ubuntu, run apt-get install default-jdk to install the JDK.
  - Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
- Download and install a Maven binary archive
  - On Ubuntu, you can run apt-get install maven to install Maven.
- Git
  - On Ubuntu, you can run sudo apt-get install git to install Git.
Create an Event Hubs namespace
An Event Hubs namespace is required to send to or receive from any Event Hubs service. See Creating an event hub for instructions on creating a namespace and an event hub. Make sure to copy the Event Hubs connection string for later use.
Clone the example project
Now that you have the Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the flink subfolder:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink
Run Flink producer
Using the provided Flink producer example, send messages to the Event Hubs service.
Provide an Event Hubs Kafka endpoint
Update the bootstrap.servers and sasl.jaas.config values in producer/src/main/resources/producer.config to direct the producer to the Event Hubs Kafka endpoint with the correct authentication.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Important
Replace {YOUR.EVENTHUBS.CONNECTION.STRING} with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
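Both placeholders come from the same namespace connection string: the host in its Endpoint segment (plus port 9093) fills bootstrap.servers, and the whole string becomes the JAAS password. The mapping can be sketched in plain Java; the class and method names below are hypothetical and not part of the sample project:

```java
import java.net.URI;

// Hypothetical helper (not part of the sample repository): derives the two
// producer.config values from an Event Hubs namespace connection string.
public class EventHubsKafkaConfig {

    // "Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;..."
    //   -> "mynamespace.servicebus.chinacloudapi.cn:9093"
    public static String bootstrapServers(String connectionString) {
        for (String part : connectionString.split(";")) {
            if (part.startsWith("Endpoint=")) {
                URI endpoint = URI.create(part.substring("Endpoint=".length()));
                // The Event Hubs Kafka endpoint listens on port 9093.
                return endpoint.getHost() + ":9093";
            }
        }
        throw new IllegalArgumentException("No Endpoint= segment in connection string");
    }

    // The JAAS line always uses the literal username "$ConnectionString";
    // the full connection string is the password.
    public static String saslJaasConfig(String connectionString) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"$ConnectionString\" "
                + "password=\"" + connectionString + "\";";
    }

    public static void main(String[] args) {
        String conn = "Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXX";
        System.out.println("bootstrap.servers=" + bootstrapServers(conn));
        System.out.println("sasl.jaas.config=" + saslJaasConfig(conn));
    }
}
```

This is only a convenience for generating the file; editing producer.config by hand, as the tutorial describes, works just as well.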
Run producer from the command line
To run the producer from the command line, generate the JAR and then run from within Maven (or generate the JAR using Maven, then run in Java by adding the necessary Kafka JARs to the classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"
The producer now begins sending events to the event hub at topic test and printing the events to stdout.
Run Flink consumer
Using the provided consumer example, receive messages from the event hub.
Provide an Event Hubs Kafka endpoint
Update the bootstrap.servers and sasl.jaas.config values in consumer/src/main/resources/consumer.config to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication.
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="$ConnectionString" \
password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
Important
Replace {YOUR.EVENTHUBS.CONNECTION.STRING} with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
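A missing key in consumer.config typically surfaces only as an authentication or connection error at runtime, so it can be handy to confirm the file parses and contains everything the Kafka client needs before launching the job. A minimal sketch using java.util.Properties; the class name and key list are assumptions based on the snippet above, not part of the sample project:

```java
import java.io.StringReader;
import java.util.Properties;

// Hypothetical pre-flight check (not part of the sample repository): load a
// consumer.config-style snippet and verify the keys needed for Event Hubs.
public class ConsumerConfigCheck {

    public static final String[] REQUIRED = {
            "bootstrap.servers", "group.id", "sasl.mechanism",
            "security.protocol", "sasl.jaas.config"
    };

    public static boolean hasRequiredKeys(Properties props) {
        for (String key : REQUIRED) {
            if (props.getProperty(key) == null) {
                return false;  // report the first missing key as a failure
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        // In practice this text would be read from consumer/src/main/resources/consumer.config.
        String config = "bootstrap.servers=mynamespace.servicebus.chinacloudapi.cn:9093\n"
                + "group.id=FlinkExampleConsumer\n"
                + "sasl.mechanism=PLAIN\n"
                + "security.protocol=SASL_SSL\n"
                + "sasl.jaas.config=placeholder";
        Properties props = new Properties();
        props.load(new StringReader(config));
        System.out.println("all required keys present: " + hasRequiredKeys(props));
        // prints: all required keys present: true
    }
}
```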
Run consumer from the command line
To run the consumer from the command line, generate the JAR and then run from within Maven (or generate the JAR using Maven, then run in Java by adding the necessary Kafka JARs to the classpath):
mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"
If the event hub has events (for example, if your producer is also running), the consumer now begins receiving events from the topic test.
Check out Flink's Kafka Connector Guide for more detailed information about connecting Flink to Kafka.
Next steps
To learn more about Event Hubs for Kafka, see the following articles:
- Mirror a Kafka broker in an event hub
- Connect Apache Spark to an event hub
- Integrate Kafka Connect with an event hub
- Explore samples on our GitHub
- Connect Akka Streams to an event hub
- Apache Kafka developer guide for Azure Event Hubs