Azure Event Hubs

Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. As a distributed streaming platform, it offers low latency and configurable time retention, so you can ingest massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.

This article explains how to use Structured Streaming with Azure Event Hubs and Azure Databricks clusters.

Requirements

For current release support, see “Latest Releases” in the Azure Event Hubs Spark Connector project readme file.

  1. Create a library in your Azure Databricks workspace using the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.

    Note

    This connector is updated regularly, and a more recent version may be available. We recommend that you pull the latest connector from the Maven repository.

  2. Install the created library into your cluster.

Schema

The schema of the records is:

Column          Type
body            binary
partition       string
offset          string
sequenceNumber  long
enqueuedTime    timestamp
publisher       string
partitionKey    string
properties      map[string,json]

The body is always provided as a byte array. Use cast("string") to explicitly deserialize the body column.
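As a minimal sketch of the cast described above, the helper below takes a DataFrame that already has the Event Hubs schema and replaces the binary body column with its string form. The helper name withBodyAsString is hypothetical, and the sketch assumes that the payload is UTF-8 text and that Spark is on the classpath.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Hypothetical helper: replaces the binary `body` column with a string column.
// Assumes the event payload is UTF-8 encoded text.
def withBodyAsString(events: DataFrame): DataFrame =
  events.withColumn("body", col("body").cast("string"))
```

If the payload is not text (for example Avro or Protobuf), a plain cast will not produce readable output and a format-specific decoder is needed instead.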

Quick Start

Let's start with a quick example: WordCount. The following notebook is all that it takes to run WordCount using Structured Streaming with Azure Event Hubs.

Azure Event Hubs WordCount with Structured Streaming notebook

Get notebook

Configuration

This section discusses the configuration settings you need to work with Event Hubs.

For detailed guidance on configuring Structured Streaming with Azure Event Hubs, see the Structured Streaming and Azure Event Hubs Integration Guide developed by Microsoft.

For detailed guidance on using Structured Streaming, see Structured Streaming.

Connection string

An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure portal or by using the ConnectionStringBuilder in the library.

Azure portal

When you get the connection string from the Azure portal, it may or may not have the EntityPath key. Consider:

// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path (`with` is a reserved word in Scala, so the value needs a different name)
val withPath = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

To connect to your event hub, an EntityPath must be present. If your connection string doesn't have one, ConnectionStringBuilder can add it:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build
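To check whether a portal connection string already carries an EntityPath before deciding whether to set the event hub name, you can split it into key-value pairs. The following is a plain-Scala sketch that does not use the connector library. The helper names parseConnectionString and hasEntityPath are hypothetical, and the sketch assumes the semicolon-separated key=value layout shown above.

```scala
// Hypothetical helper: splits "Key=Value;Key=Value" into a map.
// Each pair is split on the first '=' only, because SAS keys may end with '=' padding.
def parseConnectionString(cs: String): Map[String, String] =
  cs.split(";")
    .map(_.split("=", 2))
    .collect { case Array(key, value) => key.trim -> value } // skip malformed segments
    .toMap

// True when the connection string already names an event hub.
def hasEntityPath(cs: String): Boolean =
  parseConnectionString(cs).contains("EntityPath")
```

A string for which hasEntityPath returns false is the case where setEventHubName on the builder is needed.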

ConnectionStringBuilder

Alternatively, you can use the ConnectionStringBuilder to make your connection string.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

All configuration relating to Event Hubs happens in your EventHubsConf. To create an EventHubsConf, you must pass a connection string:

import org.apache.spark.eventhubs.EventHubsConf

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

See Connection String for more information about obtaining a valid connection string.

For a complete list of configurations, see EventHubsConf. Here is a subset of configurations to get you started:

Option: consumerGroup
Value: String
Default: "$Default"
Query type: Streaming and batch
Description: A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More information is available in the Microsoft documentation.

Option: startingPosition
Value: EventPosition
Default: Start of stream
Query type: Streaming and batch
Description: The starting position for your Structured Streaming job. See startingPositions for information about the order in which options are read.

Option: maxEventsPerTrigger
Value: long
Default: partitionCount * 1000
Query type: Streaming query
Description: Rate limit on the maximum number of events processed per trigger interval. The specified total number of events will be proportionally split across partitions of different volume.
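The maxEventsPerTrigger default and its proportional split can be illustrated with plain arithmetic. The sketch below is not the connector's implementation: the function names are hypothetical, and the split rule (each partition's share equals its fraction of the total available events, rounded down) is an assumption used only to show what a proportional split could look like.

```scala
// Default rate limit as listed above: partitionCount * 1000.
def defaultMaxEventsPerTrigger(partitionCount: Int): Long =
  partitionCount * 1000L

// Illustrative split (assumption): each partition gets a share of the limit
// proportional to its number of available events, rounded down.
def proportionalSplit(maxEvents: Long, available: Seq[Long]): Seq[Long] = {
  val total = available.sum
  if (total <= maxEvents) available // everything fits within the limit
  else available.map(n => maxEvents * n / total)
}
```

For example, with 4 partitions the default limit is 4000. If two partitions hold 6000 and 2000 events and the other two are empty, this rule assigns 3000, 1000, 0, and 0 events to the next trigger.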

For each option, there exists a corresponding setting in EventHubsConf. For example:

import org.apache.spark.eventhubs._

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf allows users to specify starting (and ending) positions with the EventPosition class. EventPosition defines the position of an event in an Event Hub partition. The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream.

import java.time.Instant
import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Specifies any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

If you would like to start (or end) at a specific position, create the corresponding EventPosition and set it in your EventHubsConf:

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)
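Once the EventHubsConf is built, it can be passed to a streaming read. The sketch below assumes a Databricks notebook in which a SparkSession named spark already exists and the connector library is installed on the cluster. The connection string is a placeholder, and the value name events is illustrative.

```scala
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

// `spark` is the SparkSession that Databricks notebooks provide (assumption).
// The connector registers the "eventhubs" source; toMap exposes the settings as options.
val events = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .load()

// Prints the record schema (body, partition, offset, and so on).
events.printSchema()
```

Starting from the end of the stream means only events enqueued after the query starts are read, which is usually what you want for a first interactive test.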

Production Structured Streaming with Azure Event Hubs

When you run streaming queries in production, you probably want more robustness and uptime guarantees than you would have when you simply attach a notebook to a cluster and run your streaming queries interactively. Import and run the following notebook for a demonstration of how to configure and run Structured Streaming in production with Azure Event Hubs and Azure Databricks.

For more information, see Structured Streaming in production.

Production Structured Streaming with Azure Event Hubs notebook

Get notebook

End-to-end Event Hubs streaming tutorial

For an end-to-end example of streaming data into a cluster using Event Hubs, see Tutorial: Stream data into Azure Databricks using Event Hubs.