Connect your Apache Spark application with Azure Event Hubs

This tutorial walks you through connecting your Spark application to Event Hubs for real-time streaming. This integration enables streaming without having to change your protocol clients or run your own Kafka or Zookeeper clusters. This tutorial requires Apache Spark v2.4+ and Apache Kafka v2.0+.

Note

This sample is available on GitHub.

In this tutorial, you learn how to:

  • Create an Event Hubs namespace
  • Clone the example project
  • Run Spark
  • Read from Event Hubs for Kafka
  • Write to Event Hubs for Kafka

Prerequisites

Before you start this tutorial, make sure that you have:

Note

The Spark-Kafka adapter was updated to support Kafka v2.0 as of Spark v2.4. In earlier releases of Spark, the adapter supported Kafka v0.10 and later but relied specifically on Kafka v0.10 APIs. Because Event Hubs for Kafka does not support Kafka v0.10, the Spark-Kafka adapters shipped with Spark versions earlier than v2.4 are not supported by Event Hubs for Kafka Ecosystems.
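Because support hinges on the Spark version, a job can fail fast when it runs on an older cluster. The sketch below is a hypothetical helper (`SparkVersionCheck` is not part of Spark or the sample); it assumes only that the version string starts with `major.minor`, as Spark's do.

```scala
// Hypothetical helper, not part of Spark or the sample: returns true when a
// Spark version string such as "2.4.8" or "3.5.1" is at least 2.4, the first
// release whose Spark-Kafka adapter targets the Kafka 2.0 client APIs.
object SparkVersionCheck {
  def supportsEventHubsKafka(sparkVersion: String): Boolean = {
    // Keep the leading digits of the first two components,
    // e.g. "3.0.0-preview" -> Seq("3", "0")
    val parts = sparkVersion.split("[.-]").toSeq.take(2).map(_.takeWhile(_.isDigit))
    parts match {
      case Seq(major, minor) if major.nonEmpty && minor.nonEmpty =>
        val (ma, mi) = (major.toInt, minor.toInt)
        ma > 2 || (ma == 2 && mi >= 4)
      case _ => false // unparseable version strings are treated as unsupported
    }
  }
}
```

In an application you might call `require(SparkVersionCheck.supportsEventHubsKafka(spark.version))` right after creating the `SparkSession`.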

Create an Event Hubs namespace

An Event Hubs namespace is required to send to and receive from any Event Hubs service. See Creating an event hub for instructions to create a namespace and an event hub. Get the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.
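The connection string already contains the FQDN, so both values the sample later calls `BOOTSTRAP_SERVERS` and `EH_SASL` can be derived from it. The sketch below assumes the usual `Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...` format, the Event Hubs Kafka endpoint on port 9093, and SASL/PLAIN authentication with the literal user name `$ConnectionString`. The `EventHubsKafkaConfig` object and its methods are hypothetical names for illustration.

```scala
// Hypothetical helper: derives the sample's BOOTSTRAP_SERVERS and EH_SASL
// values from an Event Hubs connection string of the form
//   Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=...
object EventHubsKafkaConfig {
  // Event Hubs exposes its Kafka-compatible endpoint on port 9093
  val KafkaPort = 9093

  /** Extracts the namespace FQDN from the "Endpoint=sb://.../" entry. */
  def fqdn(connectionString: String): String = {
    val endpoint = connectionString
      .split(";")
      .map(_.trim)
      .find(_.startsWith("Endpoint="))
      .getOrElse(throw new IllegalArgumentException("No Endpoint= entry in connection string"))
      .stripPrefix("Endpoint=")
    endpoint.stripPrefix("sb://").stripSuffix("/")
  }

  /** Value for BOOTSTRAP_SERVERS, e.g. "mynamespace.servicebus.windows.net:9093". */
  def bootstrapServers(connectionString: String): String =
    s"${fqdn(connectionString)}:$KafkaPort"

  /** Value for EH_SASL: the user name is the literal text $ConnectionString. */
  def jaasConfig(connectionString: String): String =
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
      "username=\"$ConnectionString\" " + // plain string: $ is not interpolated
      s"""password="$connectionString";"""
}
```

Keep the connection string out of source control; reading it from an environment variable or a secret store and passing it to these functions is one option.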

Clone the example project

Clone the Azure Event Hubs repository and navigate to the tutorials/spark subfolder:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

Read from Event Hubs for Kafka

With a few configuration changes, you can start reading from Event Hubs for Kafka. Update BOOTSTRAP_SERVERS and EH_SASL with details from your namespace, and you can start streaming with Event Hubs as you would with Kafka. For the full sample code, see the sparkConsumer.scala file on GitHub.

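// Read from your event hub. spark is your SparkSession; TOPIC, BOOTSTRAP_SERVERS,
// EH_SASL, and GROUP_ID must be defined with values from your Event Hubs namespace.
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    // kafka.group.id is honored only on Spark 3.0+; Spark 2.4 rejects it, so remove this line there
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "false")
    .load()

// Use the DataFrame as usual (in this example, write it to the console)
val dfWrite = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

// Block until the streaming query stops
dfWrite.awaitTermination()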
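The reader above and the writer in the next section set the same four security options. One way to keep them consistent is to build them once as a map; `EventHubsKafkaOptions` is a hypothetical name, and the option keys and values are exactly those the sample uses.

```scala
// Hypothetical helper: the security options shared by the Kafka source and
// sink when talking to Event Hubs, collected in one map.
object EventHubsKafkaOptions {
  def security(bootstrapServers: String, jaasConfig: String): Map[String, String] = Map(
    "kafka.bootstrap.servers" -> bootstrapServers,
    "kafka.sasl.mechanism"    -> "PLAIN",
    "kafka.security.protocol" -> "SASL_SSL",
    "kafka.sasl.jaas.config"  -> jaasConfig
  )
}
```

Both `DataStreamReader` and `DataStreamWriter` accept such a map through `.options(...)`, for example `spark.readStream.format("kafka").options(EventHubsKafkaOptions.security(BOOTSTRAP_SERVERS, EH_SASL)).option("subscribe", TOPIC).load()`.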

Write to Event Hubs for Kafka

You can also write to Event Hubs the same way you write to Kafka. Don't forget to update your configuration: set BOOTSTRAP_SERVERS and EH_SASL with information from your Event Hubs namespace. For the full sample code, see the sparkProducer.scala file on GitHub.

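import org.apache.spark.sql.DataFrame

// Placeholder: replace ??? with your streaming DataFrame. The Kafka sink
// requires a "value" column (string or binary); a "key" column is optional.
val df: DataFrame = ???

// Write to your event hub. TOPIC, BOOTSTRAP_SERVERS, and EH_SASL must be
// defined with values from your Event Hubs namespace.
val query = df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint") // where Spark stores query progress for recovery
    .start()

// Block until the streaming query stops
query.awaitTermination()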
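The write example above only works if the DataFrame has the columns the Kafka sink expects: a `value` column is always required, and a `topic` column is required only when the `topic` option is not set. The hypothetical pre-flight check below mirrors those two rules (it compares names case-sensitively for simplicity, which is stricter than Spark's default resolution).

```scala
// Hypothetical pre-flight check for the Kafka sink's column requirements:
// "value" is always required; "topic" is required only when the "topic"
// writer option is not set.
object KafkaSinkCheck {
  def missingColumns(columns: Seq[String], topicOptionSet: Boolean): Seq[String] = {
    val required = if (topicOptionSet) Seq("value") else Seq("value", "topic")
    required.filterNot(columns.contains)
  }
}
```

Calling `KafkaSinkCheck.missingColumns(df.columns.toSeq, topicOptionSet = true)` before `writeStream` returns an empty sequence when the DataFrame is ready to write; otherwise, shaping it first, for example with `df.selectExpr("CAST(value AS STRING) AS value")`, is one option.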

Next steps

To learn more about Event Hubs and Event Hubs for Kafka, see the following articles: