将 Apache Flink 与适用于 Apache Kafka 的 Azure 事件中心配合使用

本教程演示如何在不更改你的协议客户端或运行你自己的群集的情况下,将 Apache Flink 连接到事件中心。 如需详细了解事件中心对 Apache Kafka 使用者协议的支持,请参阅 Apache Kafka 的事件中心

在本教程中,你将了解如何执行以下操作:

  • 创建事件中心命名空间
  • 克隆示例项目
  • 运行 Flink 制造者
  • 运行 Flink 使用者

注意

GitHub 上提供了此示例

先决条件

若要完成本教程,请确保具备以下先决条件:

  • 通读用于 Apache Kafka 的事件中心一文。
  • Azure 订阅。 如果没有 Azure 订阅,请在开始之前创建一个试用版订阅
  • Java 开发工具包 (JDK) 1.7+
    • 在 Ubuntu 上运行 apt-get install default-jdk,以便安装 JDK。
    • 请确保设置 JAVA_HOME 环境变量,使之指向在其中安装了 JDK 的文件夹。
  • 下载安装 Maven 二进制存档
    • 在 Ubuntu 上,可以通过运行 apt-get install maven 来安装 Maven。
  • Git
    • 在 Ubuntu 上,可以通过运行 sudo apt-get install git 来安装 Git。

创建事件中心命名空间

要从任何事件中心服务进行发送或接收,需要事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 请确保复制事件中心连接字符串,以供将来使用。

克隆示例项目

获得事件中心连接字符串后,克隆适用于 Kafka 的 Azure 事件中心存储库并导航到 flink 子文件夹:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/flink

使用提供的 Flink 制造者事例,将消息发送到事件中心服务。

提供事件中心 Kafka 终结点

producer.config

更新 producer/src/main/resources/producer.config 中的 bootstrap.serverssasl.jaas.config 值,以使用正确的身份验证将生产者定向到事件中心 Kafka 终结点。

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
client.id=FlinkExampleProducer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

通过命令行运行生产者

若要通过命令行运行生产者,生成 JAR,然后从 Maven 中运行(或使用 Maven 生成 JAR,然后通过向 classpath 添加必要的 Kafka JAR 在 Java 中运行):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestProducer"

生产者现在将开始向主题 test 中的事件中心发送事件,并将事件输出到 stdout。

使用所提供的使用者示例,接收来自事件中心的消息。

提供事件中心 Kafka 终结点

consumer.config

更新 consumer/src/main/resources/consumer.config 中的 bootstrap.serverssasl.jaas.config 值,以使用正确的身份验证将使用者定向到事件中心 Kafka 终结点。

bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093
group.id=FlinkExampleConsumer
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="$ConnectionString" \
   password="{YOUR.EVENTHUBS.CONNECTION.STRING}";

重要

{YOUR.EVENTHUBS.CONNECTION.STRING} 替换为事件中心命名空间的连接字符串。 有关获取连接字符串的说明,请参阅获取事件中心连接字符串。 下面是一个配置示例:sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.chinacloudapi.cn/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";

通过命令行运行使用者

若要通过命令行运行使用者,生成 JAR,然后从 Maven 中运行(或使用 Maven 生成 JAR,然后通过向 classpath 添加必要的 Kafka JAR 在 Java 中运行):

mvn clean package
mvn exec:java -Dexec.mainClass="FlinkTestConsumer"

如果事件中心具有事件(例如,如果生产者也在运行),则使用者现在将开始接收来自主题 test 的事件。

查看 Flink 的 Kafka 连接器指南,了解有关将 Flink 连接到 Kafka 的更为详细的信息。

后续步骤

若要详细了解适用于 Kafka 的事件中心,请参阅以下文章: