Azure 事件中心
Azure 事件中心是超大规模的遥测引入服务,可收集、传输和存储数百万的事件。 作为分布式流式处理平台,它为用户提供低延迟和可配置的时间保留,使用户可以将大量遥测数据引入到云中,并使用发布-订阅语义从多个应用程序中读取数据。
本文介绍了如何通过 Azure 事件中心和 Azure Databricks 群集使用结构化流式处理。
注意
Azure 事件中心提供与 Apache Kafka 兼容的终结点,该终结点可与 Databricks Runtime 中提供的结构化流式处理 Kafka 连接器一起使用,以处理来自 Azure 事件中心的消息。 Databricks 建议使用结构化流式处理 Kafka 连接器处理来自 Azure 事件中心的消息。
要求
有关当前的版本支持,请参阅 Azure 事件中心 Spark 连接器项目自述文件中的“最新版本”。
使用 Maven 坐标
com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17
在 Azure Databricks 工作区中创建库。注意
此连接器会定期更新,并且可能会有最新版本:建议你从 Maven 存储库拉取最新的连接器
将创建的库安装到群集中。
架构
记录的架构为:
列 | 类型 |
---|---|
body |
binary |
partition |
string |
offset |
string |
sequenceNumber |
long |
enqueuedTime |
timestamp |
publisher |
string |
partitionKey |
string |
properties |
map[string,json] |
body
始终以字节数组的形式提供。 请使用 cast("string")
显式反序列化 body
列。
配置
本部分介绍了使用事件中心时所需的配置设置。
有关使用 Azure 事件中心配置结构化流式处理的详细指南,请参阅 Microsoft 制定的结构化流式处理和 Azure 事件中心集成指南。
有关使用结构化流式处理的详细指导,请参阅 Azure Databricks 上的流式处理。
连接字符串
需要使用事件中心连接字符串来连接到事件中心服务。 可以使用 Azure 门户或使用库中的 ConnectionStringBuilder
获取事件中心实例的连接字符串。
Azure 门户
从 Azure 门户获取连接字符串时,它可能有也可能没有 EntityPath
键。 请注意以下几点:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
若要连接到事件中心,必须提供 EntityPath
。 如果你的连接字符串中没有该路径,请不要担心。
下面的对象将处理此问题:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
ConnectionStringBuilder
另外,也可以使用 ConnectionStringBuilder
来创建连接字符串。
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
与事件中心相关的所有配置都在 EventHubsConf
中进行。 若要创建 EventHubsConf
,必须传递连接字符串:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
若要详细了解如何获取有效的连接字符串,请参阅连接字符串。
有关配置的完整列表,请参阅 EventHubsConf。 若要开始,请参阅下面的部分配置。
选项 | 值 | 默认 | 查询类型 | 说明 |
---|---|---|---|---|
consumerGroup |
字符串 | “$Default” | 流式处理和批处理 | 使用者组是整个事件中心的视图。 使用者组使多个消费应用程序都有各自独立的事件流视图,并按自身步调和偏移量独立读取流。 Microsoft 文档中提供了详细信息。 |
startingPosition |
EventPosition | 流的开头 | 流式处理和批处理 | 结构化流式处理作业的起始位置。 有关选项读取顺序的信息,请参阅 startingPositions。 |
maxEventsPerTrigger |
long | partitionCount * 1000 |
流式处理查询 | 每个触发器间隔处理的最大事件数的速率限制。 指定的事件总数将按比例分配到不同卷的分区中。 |
对于每个选项,EventHubsConf
中都存在一个相应的设置。 例如: 。
import org.apache.spark.eventhubs.
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
EventPosition
EventHubsConf
允许用户通过 EventPosition
类指定起始(和结束)位置。 EventPosition
定义事件在事件中心分区中的位置。 位置可以是排队的时间、偏移量、序列号、流的开头或流的末尾。
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
如果要在特定位置开始(或结束),只需创建正确的 EventPosition
并在 EventHubsConf
中对它进行设置即可:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)