Use Apache Flink with Azure Event Hubs for Apache Kafka

This tutorial shows you how to connect Apache Flink to an event hub without changing your protocol clients or running your own clusters. Azure Event Hubs supports Apache Kafka version 1.0.

One of the key benefits of using Apache Kafka is the ecosystem of frameworks it can connect to. Event Hubs combines the flexibility of Kafka with the scalability, consistency, and support of the Azure ecosystem.

In this tutorial, you learn how to:

  • Create an Event Hubs namespace
  • Clone the example project
  • Run a Flink producer
  • Run a Flink consumer

Note

This sample is available on GitHub.

Prerequisites

To complete this tutorial, make sure you have the following prerequisites:

  • Read through the Event Hubs for Apache Kafka article.
  • An Azure subscription. If you don't have one, create a trial account before you begin.
  • Java Development Kit (JDK) 1.7+
    • On Ubuntu, run apt-get install default-jdk to install the JDK.
    • Be sure to set the JAVA_HOME environment variable to point to the folder where the JDK is installed.
  • Download and install a Maven binary archive
    • On Ubuntu, you can run apt-get install maven to install Maven.
  • Git
    • On Ubuntu, you can run sudo apt-get install git to install Git.

Create an Event Hubs namespace

An Event Hubs namespace is required to send to or receive from any Event Hubs service. See Creating an event hub for instructions on creating a namespace and an event hub. Make sure to copy the Event Hubs connection string for later use.
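The connection string you copy from the portal has the following general shape (the namespace name and key shown here are placeholders; yours will differ):

```
Endpoint=sb://{YOUR.EVENTHUBS.NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR.KEY}
```

The host name in the Endpoint value is the FQDN you use for the Kafka bootstrap.servers setting in the steps below.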

Clone the example project

Now that you have the Event Hubs connection string, clone the Azure Event Hubs for Kafka repository and navigate to the flink subfolder:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

Run Flink producer

Using the provided Flink producer example, send messages to the Event Hubs service.

Provide an Event Hubs Kafka endpoint

producer.config

Update the bootstrap.servers and sasl.jaas.config values in producer/src/main/resources/producer.config to direct the producer to the Event Hubs Kafka endpoint with the correct authentication.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
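The {YOUR.EVENTHUBS.FQDN} placeholder is simply the namespace host name from the connection string's Endpoint value, paired with Kafka's TLS port 9093. As a rough sketch (the class and helper names here are illustrative, not part of the sample project), the mapping can be derived like this:

```java
public class BootstrapFromConnectionString {
    // Derives the Kafka bootstrap.servers value from an Event Hubs connection
    // string: the namespace FQDN from the Endpoint part plus Kafka's TLS port 9093.
    static String bootstrapServers(String connectionString) {
        for (String part : connectionString.split(";")) {
            if (part.startsWith("Endpoint=sb://")) {
                String fqdn = part.substring("Endpoint=sb://".length())
                                  .replaceAll("/+$", ""); // drop trailing slash
                return fqdn + ":9093";
            }
        }
        throw new IllegalArgumentException("No Endpoint in connection string");
    }

    public static void main(String[] args) {
        // Placeholder connection string; a real one carries your actual key.
        String conn = "Endpoint=sb://mynamespace.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey={YOUR.KEY}";
        System.out.println(bootstrapServers(conn));
        // prints mynamespace.servicebus.windows.net:9093
    }
}
```

Note that the full connection string (key included) goes only into the sasl.jaas.config password field; bootstrap.servers takes just the host and port.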

Run the producer from the command line

To run the producer from the command line, generate the JAR and then run from within Maven (or generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

The producer now begins sending events to the event hub on topic test and printing the events to stdout.

Run Flink consumer

Using the provided consumer example, receive messages from the event hub.

Provide an Event Hubs Kafka endpoint

consumer.config

Update the bootstrap.servers and sasl.jaas.config values in consumer/src/main/resources/consumer.config to direct the consumer to the Event Hubs Kafka endpoint with the correct authentication.

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
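Both producer.config and consumer.config are standard Java properties files, so you can sanity-check your edits the same way a Kafka client would read them, using java.util.Properties. This sketch loads an inline copy of the relevant entries for illustration (the sample project reads the real file from consumer/src/main/resources/consumer.config):

```java
import java.io.StringReader;
import java.util.Properties;

public class CheckConsumerConfig {
    public static void main(String[] args) throws Exception {
        // Inline copy of the relevant consumer.config entries for illustration;
        // in the sample project these come from the file on the classpath.
        String config =
            "bootstrap.servers=mynamespace.servicebus.windows.net:9093\n" +
            "group.id=FlinkExampleConsumer\n" +
            "sasl.mechanism=PLAIN\n" +
            "security.protocol=SASL_SSL\n";

        Properties props = new Properties();
        props.load(new StringReader(config));

        // Quick sanity check that the Kafka client will see the values it needs.
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("security.protocol"));
    }
}
```

If bootstrap.servers or security.protocol comes back null here, the Kafka client will fail to connect with an authentication or bootstrap error at runtime.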

Run the consumer from the command line

To run the consumer from the command line, generate the JAR and then run from within Maven (or generate the JAR using Maven, then run in Java by adding the necessary Kafka JAR(s) to the classpath):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

If the event hub has events (for example, if your producer is also running), the consumer now begins receiving events from topic test.

Check out Flink's Kafka Connector Guide for more detailed information about connecting Flink to Kafka.

Next steps

To learn more about Event Hubs for Kafka, see the following articles: