将 Apache Spark 应用程序与 Azure 事件中心连接
本教程详细介绍如何将 Spark 应用程序连接到事件中心进行实时流式处理。 通过此集成,可以进行流式处理,而无需更改协议客户端,也无需运行你自己的 Kafka 或 Zookeeper 群集。 本教程需要 Apache Spark v2.4 及更高版本和 Apache Kafka v2.0 及更高版本。
注意
GitHub 上提供了此示例
本教程介绍如何执行下列操作:
- 创建事件中心命名空间
- 克隆示例项目
- 运行 Spark
- 从用于 Kafka 的事件中心读取
- 写入到用于 Kafka 的事件中心
先决条件
开始本教程前,请确保具备:
- Azure 订阅。 如果没有,请创建一个试用版订阅。
- Apache Spark v2.4
- Apache Kafka v2.0
- Git
注意
Spark-Kafka 适配器已更新,可以支持自 Spark v2.4 以来发布的 Kafka v2.0。 在以前版本的 Spark 中,此适配器支持 Kafka v0.10 及更高版本,但特别依赖于 Kafka v0.10 API。 由于适用于 Kafka 的事件中心不支持 Kafka v0.10,因此 v2.4 之前的 Spark 版本提供的 Spark-Kafka 适配器不受适用于 Kafka 生态系统的事件中心的支持。
创建事件中心命名空间
要从事件中心服务进行发送和接收,需要使用事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 获取事件中心连接字符串和完全限定域名 (FQDN) 供以后使用。 有关说明,请参阅获取事件中心连接字符串。
克隆示例项目
克隆 Azure 事件中心存储库并导航到 tutorials/spark
子文件夹:
git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark
从用于 Kafka 的事件中心读取
进行一些配置更改以后,即可从适用于 Kafka 的事件中心读取数据。 根据命名空间提供的详细信息更新 BOOTSTRAP_SERVERS 和 EH_SASL 以后,即可使用事件中心进行流式处理,就像使用 Kafka 一样。 如需完整的示例代码,请查看 GitHub 上的 sparkConsumer.scala 文件。
//Read from your Event Hub!
val df = spark.readStream
.format("kafka")
.option("subscribe", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", GROUP_ID)
.option("failOnDataLoss", "true")
.load()
//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
.outputMode("append")
.format("console")
.start()
如果收到类似于以下错误的错误,请将将 .option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
添加到 spark.readStream
调用,然后重试。
IllegalArgumentException: requirement failed: Got wrong record for <spark job name> even after seeking to offset 4216 got offset 4217 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
写入到用于 Kafka 的事件中心
也可向事件中心写入数据,所用方式与向 Kafka 写入数据一样。 请勿忘记更新配置,也就是使用事件中心命名空间的信息更改 BOOTSTRAP_SERVERS 和 EH_SASL。 如需完整的示例代码,请查看 GitHub 上的 sparkProducer.scala 文件。
df = /**Dataframe**/
//Write to your Event Hub!
df.writeStream
.format("kafka")
.option("topic", TOPIC)
.option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("checkpointLocation", "./checkpoint")
.start()
后续步骤
若要详细了解事件中心和适用于 Kafka 的事件中心,请参阅以下文章: