Azure Event Hubs
Azure Event Hubs is a hyper-scale telemetry ingestion service that collects, transforms, and stores millions of events. As a distributed streaming platform, it gives you low latency and configurable time retention, which enables you to ingress massive amounts of telemetry into the cloud and read the data from multiple applications using publish-subscribe semantics.
This article explains how to use Structured Streaming with Azure Event Hubs and Azure Databricks clusters.
Note
Azure Event Hubs provides an endpoint compatible with Apache Kafka that you can use with the Structured Streaming Kafka connector, available in Databricks Runtime, to process messages from Azure Event Hubs. Databricks recommends using the Structured Streaming Kafka connector to process messages from Azure Event Hubs.
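As a rough illustration of the recommended approach, the sketch below reads from the Kafka-compatible endpoint with the Structured Streaming Kafka connector. The placeholder values, the variable names, and the helper `readFromKafkaEndpoint` are assumptions made for this example. It also assumes the standard Event Hubs Kafka settings (port 9093, SASL_SSL with the PLAIN mechanism) and the shaded login module class name used in Databricks Runtime.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Placeholders (assumptions): replace with your own namespace, event hub, and connection string.
val namespace = "<namespace-name>"
val eventHubName = "<eventhub-name>"
val connectionString = "<event-hub-connection-string>"

// Event Hubs authenticates Kafka clients with SASL PLAIN: the user name is the
// literal string "$ConnectionString" and the password is the connection string.
// The "kafkashaded." prefix is the shaded class name assumed for Databricks Runtime.
val jaasConfig =
  "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"$ConnectionString\" password=\"" + connectionString + "\";"

// Hypothetical helper: builds a streaming DataFrame over the Kafka endpoint.
def readFromKafkaEndpoint(spark: SparkSession): DataFrame =
  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", s"$namespace.servicebus.windows.net:9093")
    .option("subscribe", eventHubName) // the event hub name acts as the Kafka topic
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaasConfig)
    .load()
```

In a Databricks notebook, the predefined `spark` session can be passed directly, for example `readFromKafkaEndpoint(spark)`.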
Requirements
For current release support, see "Latest Releases" in the Azure Event Hubs Spark Connector project readme file.
Create a library in your Azure Databricks workspace using the Maven coordinate com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17.
Note
This connector is updated regularly, and a more recent version may be available. We recommend that you pull the latest connector from the Maven repository.
Install the created library into your cluster.
Schema
The schema of the records is:
Column | Type |
---|---|
body | binary |
partition | string |
offset | string |
sequenceNumber | long |
enqueuedTime | timestamp |
publisher | string |
partitionKey | string |
properties | map[string,json] |
The body is always provided as a byte array. Use cast("string") to explicitly deserialize the body column.
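A minimal sketch of that cast, assuming `events` is a DataFrame already loaded from Event Hubs with the schema listed above. The helper name `withDecodedBody` is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Replaces the binary `body` column with its string form.
// Spark interprets the bytes as UTF-8 when casting binary to string.
def withDecodedBody(events: DataFrame): DataFrame =
  events.withColumn("body", col("body").cast("string"))
```

If the payload is not UTF-8 text (for example, Avro or Protobuf), the cast produces unreadable strings, and the bytes need a format-specific decoder instead.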
Configuration
This section discusses the configuration settings you need to work with Event Hubs.
For detailed guidance on configuring Structured Streaming with Azure Event Hubs, see the Structured Streaming and Azure Event Hubs Integration Guide developed by Microsoft.
For detailed guidance on using Structured Streaming, see Streaming on Azure Databricks.
Connection string
An Event Hubs connection string is required to connect to the Event Hubs service. You can get the connection string for your Event Hubs instance from the Azure portal or by using the ConnectionStringBuilder in the library.
Azure portal
When you get the connection string from the Azure portal, it may or may not have the EntityPath key. Consider:
// Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
// With an entity path (`with` is a reserved word in Scala, so use a different name)
val withEntityPath = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"
To connect to your event hub, the connection string must contain an EntityPath. If yours does not have one, you can add it with the ConnectionStringBuilder:
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder(without) // defined in the previous code block
.setEventHubName("<eventhub-name>")
.build
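To decide whether the step above is needed, you can inspect the connection string first. The following is a minimal sketch in plain Scala. `hasEntityPath` is a hypothetical helper written for this example, not part of the connector, and it assumes the usual `key=value` pairs separated by semicolons.

```scala
// Hypothetical helper (not part of the connector): reports whether a
// connection string already carries an EntityPath key.
def hasEntityPath(connectionString: String): Boolean =
  connectionString
    .split(";")                                      // key=value segments
    .map(_.trim)
    .exists(_.toLowerCase.startsWith("entitypath=")) // key match, case-insensitive
```

When the helper returns false, set the event hub name with setEventHubName as shown above before you use the connection string.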
ConnectionStringBuilder
Alternatively, you can use the ConnectionStringBuilder to make your connection string.
import org.apache.spark.eventhubs.ConnectionStringBuilder
val connectionString = ConnectionStringBuilder()
.setNamespaceName("<namespace-name>")
.setEventHubName("<eventhub-name>")
.setSasKeyName("<key-name>")
.setSasKey("<key>")
.build
EventHubsConf
All configuration relating to Event Hubs happens in your EventHubsConf. To create an EventHubsConf, you must pass a connection string:
import org.apache.spark.eventhubs.EventHubsConf
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
See Connection String for more information about obtaining a valid connection string.
For a complete list of configurations, see EventHubsConf. Here is a subset of configurations to get you started:
Option | Value | Default | Query type | Description |
---|---|---|---|---|
consumerGroup | String | "$Default" | Streaming and batch | A consumer group is a view of an entire event hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and with their own offsets. More information is available in the Microsoft documentation. |
startingPosition | EventPosition | Start of stream | Streaming and batch | The starting position for your Structured Streaming job. See startingPositions for information about the order in which options are read. |
maxEventsPerTrigger | long | partitionCount * 1000 | Streaming query | Rate limit on the maximum number of events processed per trigger interval. The specified total number of events is split proportionally across partitions of different volume. |
For each option, there exists a corresponding setting in EventHubsConf. For example:
import org.apache.spark.eventhubs._
val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
.setConsumerGroup("sample-cg")
.setMaxEventsPerTrigger(10000)
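Once an EventHubsConf is built, it is typically handed to the reader as a map of options. The sketch below assumes the connector's "eventhubs" data source format and the toMap method on EventHubsConf. The helper name `readEvents` is hypothetical.

```scala
import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: opens a streaming DataFrame over the event hub
// described by the given configuration.
def readEvents(spark: SparkSession, eventHubsConf: EventHubsConf): DataFrame =
  spark.readStream
    .format("eventhubs")          // data source registered by the connector library
    .options(eventHubsConf.toMap) // EventHubsConf settings passed as string options
    .load()
```

In a Databricks notebook, you could call this as `readEvents(spark, eventHubsConf)` with the configuration from the example above.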
EventPosition
EventHubsConf allows users to specify starting (and ending) positions with the EventPosition class. EventPosition defines the position of an event in an Event Hub partition. The position can be an enqueued time, offset, sequence number, the start of the stream, or the end of the stream.
import java.time.Instant
import org.apache.spark.eventhubs._
EventPosition.fromOffset("246812") // Specifies offset 246812
EventPosition.fromSequenceNumber(100L) // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Specifies events enqueued after the current time
EventPosition.fromStartOfStream // Specifies from start of stream
EventPosition.fromEndOfStream // Specifies from end of stream
To start (or end) at a specific position, create the corresponding EventPosition and set it in your EventHubsConf:
val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
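As a further illustration, the same pattern can start a query from a point in time instead of the end of the stream. This sketch assumes you want events enqueued within roughly the last hour. The one-hour window and the name `oneHourAgo` are arbitrary choices made for the example.

```scala
import java.time.{Duration, Instant}
import org.apache.spark.eventhubs.{EventHubsConf, EventPosition}

val connectionString = "<event-hub-connection-string>"

// Illustrative choice: begin with events enqueued after this instant.
val oneHourAgo = Instant.now.minus(Duration.ofHours(1))

val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEnqueuedTime(oneHourAgo))
```

A time-based position is convenient when replaying recent data, because it does not require knowing partition-specific offsets or sequence numbers.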