Azure Event Hubs data connection

Azure Event Hubs is a big data streaming platform and event ingestion service. Azure Data Explorer offers continuous ingestion from customer-managed Event Hubs.

The Event Hubs ingestion pipeline transfers events to Azure Data Explorer in several steps. You first create an event hub in the Azure portal. You then create a target table in Azure Data Explorer, into which data in a particular format is ingested using the given ingestion properties. The Event Hubs connection needs to know how to route events. Selected event system properties can be embedded in the data. Finally, you create a data connection to the event hub and send events. You can manage this process through the Azure portal, programmatically with C# or Python, or with an Azure Resource Manager template.

For general information about data ingestion in Azure Data Explorer, see Azure Data Explorer data ingestion overview.

Azure Data Explorer data connection authentication mechanisms

Caution

If the managed identity permissions are removed from the data source, the data connection is disabled and can't fetch data from the data source.

  • Key-based data connection: If a managed identity is not specified in the data connection, the connection automatically defaults to key-based authentication. Key-based connections fetch data using a resource connection string, such as the Azure Event Hubs connection string. Azure Data Explorer generates the resource connection string for the specified resource and securely saves it in the data connection. The connection string is then used to fetch data from the data source.

Caution

If the key is rotated, the data connection is disabled and can't fetch data from the data source. To fix the issue, update or recreate the data connection.

Data format

  • Data is read from the event hub in the form of EventData objects.

  • See supported formats.

    Note

    • Ingestion from Event Hub doesn't support RAW format.
    • Azure Event Hub Schema Registry and schema-less Avro are not supported.
  • Data can be compressed using the GZip compression algorithm. You can specify Compression dynamically using ingestion properties, or in the static Data Connection settings.

    Note

    Data compression isn't supported for compressed formats (Avro, Parquet, ORC, ApacheAvro and W3CLOGFILE). Custom encoding and embedded system properties aren't supported on compressed data.
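As a minimal sketch of the sender side, the following Python snippet compresses a JSON payload with GZip and pairs it with a matching Compression ingestion property. It uses only the standard library. The helper name build_compressed_event and its return shape are illustrative assumptions, and the actual send call through an Event Hubs client library is omitted.

```python
import gzip
import json


def build_compressed_event(record: dict) -> tuple[bytes, dict]:
    """Return (body, properties) for one GZip-compressed JSON event.

    Hypothetical helper: the body and properties would be attached to an
    event by whichever Event Hubs client library you use.
    """
    raw = json.dumps(record).encode("utf-8")
    body = gzip.compress(raw)
    # Property names and values follow the ingestion properties
    # described in this article.
    properties = {"Format": "json", "Compression": "GZip"}
    return body, properties


body, properties = build_compressed_event({"MetricName": "Temperature", "Value": 32})
# Round trip to confirm the payload survives compression intact.
assert json.loads(gzip.decompress(body)) == {"MetricName": "Temperature", "Value": 32}
```

Setting Compression per event is only needed when it differs from the static data connection setting.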

Event Hubs properties

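Azure Data Explorer supports two sets of Event Hubs properties: ingestion properties, which route and process each event, and event system properties, which can be embedded in the ingested data based on a mapping. Both are described in the sections that follow.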

Note

Ingesting Event Hubs custom properties, used to associate metadata with events, isn't supported. If you need to ingest custom properties, send them in the body of the event data. For more information, see Ingest custom properties.

Ingestion properties

Ingestion properties tell the ingestion process where to route the data and how to process it. You can specify ingestion properties for an event by using EventData.Properties. You can set the following properties:

Note

Property names are case sensitive.

Property | Description
Database | The case-sensitive name of the target database. By default, data is ingested into the target database associated with the data connection. Use this property to override the default database and send data to a different database. To do so, you must first set up the connection as a multi-database connection.
Table | The case-sensitive name of the existing target table. Overrides the Table set on the Data Connection pane.
Format | Data format. Overrides the Data format set on the Data Connection pane.
IngestionMappingReference | Name of the existing ingestion mapping to be used. Overrides the Column mapping set on the Data Connection pane.
Compression | Data compression: None (default) or GZip.
Encoding | Data encoding. The default is UTF8. Can be any of the .NET supported encodings.
Tags | A list of tags to associate with the ingested data, formatted as a JSON array string. There are performance implications when using tags.
RawHeaders | Indicates that the event source is Kafka and that Azure Data Explorer must use byte array deserialization to read other routing properties. The value is ignored.

Note

Only events enqueued after you create the data connection are ingested.
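Because property names are case sensitive, a misspelled or wrongly cased name is easy to miss. The following sketch builds a properties dictionary and checks each name against the list above before the event is sent. The helper make_ingestion_properties is hypothetical and not part of any SDK; the local check is a convenience added for this example, not documented service behavior.

```python
import json

# Property names from the table above; names are case sensitive.
KNOWN_PROPERTIES = {
    "Database", "Table", "Format", "IngestionMappingReference",
    "Compression", "Encoding", "Tags", "RawHeaders",
}


def make_ingestion_properties(tags=None, **props) -> dict:
    """Build a properties dict, rejecting names that aren't in the table.

    Hypothetical helper for illustration only.
    """
    unknown = set(props) - KNOWN_PROPERTIES
    if unknown:
        raise ValueError(f"Unknown ingestion properties: {sorted(unknown)}")
    result = dict(props)
    if tags is not None:
        # Tags are passed as a JSON array string.
        result["Tags"] = json.dumps(list(tags))
    return result


props = make_ingestion_properties(Table="WeatherMetrics", Format="json", tags=["myDataTag"])
print(props)
```

A call such as make_ingestion_properties(table="WeatherMetrics") raises ValueError, because the lowercase name doesn't match Table.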

Events routing

When you create a data connection to your cluster, you can specify the routing for where to send ingested data. The default routing is to the target table specified in the connection string that is associated with the target database. The default routing for your data is also referred to as static routing. You can specify an alternative routing for your data by setting the event data properties mentioned above.

Route event data to an alternate database

Routing data to an alternate database is off by default. To send the data to a different database, you must first set the connection as a multi-database connection. You can do this in the Azure portal, with C# or Python, or with an ARM template. The user, group, service principal, or managed identity used to allow database routing must have at least the contributor role and write permissions on the cluster.

To specify an alternate database, set the Database ingestion property.

Warning

Specifying an alternate database without setting the connection as a multi-database data connection will cause the ingestion to fail.

Route event data to an alternate table

To specify an alternate table for each event, set the Table, Format, Compression, and mapping ingestion properties. The connection dynamically routes the ingested data as specified in the EventData.Properties, overriding the static properties for this event.

The following example shows how to set the event hub details and send weather metric data to an alternate database (MetricsDB) and table (WeatherMetrics). The data is in JSON format, and mapping1 is predefined on the WeatherMetrics table.

// This sample uses the Azure.Messaging.EventHubs .NET library and Newtonsoft.Json for serialization.
using System;
using System.Text;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using Newtonsoft.Json;

// The connection string must include the event hub name (EntityPath).
await using var producerClient = new EventHubProducerClient("<eventHubConnectionString>");
// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(
    new { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }
)));
eventData.Properties.Add("Database", "MetricsDB");
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['myDataTag']");
var events = new[] { eventData };
// Send events
await producerClient.SendAsync(events);
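
The routing rules in this section can be summarized in a small Python model: event properties override the static settings of the data connection, and an alternate database is accepted only on a multi-database connection. This is a sketch for reasoning about routing, not the service's implementation. ConnectionSettings, resolve_route, and the names DefaultDB and RawEvents are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectionSettings:
    """Static routing configured on the data connection (illustrative model)."""
    database: str
    table: str
    data_format: str
    multi_database: bool = False


def resolve_route(conn: ConnectionSettings, event_props: dict) -> tuple[str, str, str]:
    """Return (database, table, format) for one event.

    Event properties override the static settings. Assumption for this
    sketch: naming the default database explicitly is always allowed.
    """
    database = event_props.get("Database", conn.database)
    if database != conn.database and not conn.multi_database:
        raise ValueError("Alternate database requires a multi-database connection")
    table = event_props.get("Table", conn.table)
    data_format = event_props.get("Format", conn.data_format)
    return database, table, data_format


conn = ConnectionSettings("DefaultDB", "RawEvents", "csv", multi_database=True)
print(resolve_route(conn, {}))  # static routing
print(resolve_route(conn, {"Database": "MetricsDB", "Table": "WeatherMetrics", "Format": "json"}))
```

With multi_database left at False, the second call raises ValueError, which mirrors the ingestion failure described in the warning above.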

Event system properties mapping

System properties store properties that are set by the Event Hubs service, at the time the event is enqueued. The data connection to the event hub can embed a selected set of system properties into the data ingested into a table based on a given mapping.

Note

  • Embedding system properties is supported for json and tabular formats (i.e. JSON, MultiJSON, CSV, TSV, PSV, SCsv, SOHsv, TSVE).
  • When using a non-supported format (i.e. TXT or compressed formats like Parquet, Avro etc.) the data will still be ingested, but the properties will be ignored.
  • Embedding system properties isn't supported when compression of event hub messages is set. In such scenarios, an error is emitted and the data isn't ingested.
  • For tabular data, system properties are supported only for single-record event messages.
  • For json data, system properties are also supported for multiple-record event messages. In such cases, the system properties are added only to the first record of the event message.
  • For CSV mapping, properties are added at the beginning of the record in the order listed in the creation of the data connection. Don't rely on the order of these properties, as it may change in the future.
  • For JSON mapping, properties are added according to property names in the System properties table.
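
To make the embedding rules above concrete, the following Python sketch models how system properties end up in a record: for multi-record JSON messages they are added to the first record only, and for CSV they are prepended to the single record. This is an illustrative model, not the service's code. The function names and sample values are hypothetical, and the CSV helper skips quoting for brevity.

```python
import json


def embed_in_json(records: list[dict], system_props: dict) -> list[dict]:
    """Add system properties to the first record only (multi-record JSON)."""
    if not records:
        return []
    first = {**records[0], **system_props}
    return [first, *records[1:]]


def embed_in_csv(line: str, system_props: dict, order: list[str]) -> str:
    """Prepend system property values to a single CSV record.

    No quoting or escaping is applied; values are assumed to contain no commas.
    """
    prefix = ",".join(str(system_props[name]) for name in order)
    return f"{prefix},{line}"


props = {"x-opt-enqueued-time": "2024-01-01T00:00:00Z", "x-opt-offset": "4096"}
print(embed_in_csv("2024-01-01T00:00:00Z,Temperature,32", props,
                   ["x-opt-enqueued-time", "x-opt-offset"]))
print(json.dumps(embed_in_json([{"metric": "Temperature"}, {"metric": "Humidity"}], props)))
```

In the JSON output, only the Temperature record carries the two system properties.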

Event Hubs service exposes the following system properties:

Property | Data Type | Description
x-opt-enqueued-time | datetime | UTC time when the event was enqueued
x-opt-sequence-number | long | The logical sequence number of the event within the partition stream of the event hub
x-opt-offset | string | The offset of the event from the event hub partition stream. The offset identifier is unique within a partition of the event hub stream
x-opt-publisher | string | The publisher name, if the message was sent to a publisher endpoint
x-opt-partition-key | string | The partition key of the corresponding partition that stored the event

When you work with IoT Central event hubs, you can also embed IoT Hub system properties in the payload. For the complete list, see IoT Hub system properties.

If you selected Event system properties in the Data Source section of the table, you must include the properties in the table schema and mapping.

Schema mapping examples

Table schema mapping example

If your data includes three columns (TimeStamp, Metric, and Value) and the properties you include are x-opt-enqueued-time and x-opt-offset, create or alter the table schema by using this command:

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

CSV mapping example

Run the following command to map the system properties, which are added at the beginning of the record. Note the ordinal values.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "TimeStamp", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

JSON mapping example

System properties are mapped by their property names. Run this command:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "TimeStamp", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Schema mapping for Event Hub Capture Avro files

One way to consume Event Hub data is to capture events through Azure Event Hubs in Azure Blob Storage or Azure Data Lake Storage. You can then ingest the capture files as they are written using an Event Grid Data Connection in Azure Data Explorer.

The schema of the capture files is different from the schema of the original event sent to Event Hub. You should design the destination table schema with this difference in mind. Specifically, the event payload is represented in the capture file as a byte array, and this array isn't automatically decoded by the Event Grid Azure Data Explorer data connection. For more specific information on the file schema for Event Hub Avro capture data, see Exploring captured Avro files in Azure Event Hubs.

To correctly decode the event payload:

  1. Map the Body field of the captured event to a column of type dynamic in the destination table.
  2. Apply an update policy that converts the byte array into a readable string using the unicode_codepoints_to_string() function.
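
To see what step 2 does to the data, the following Python sketch mimics the conversion on a sample Body value. It treats each array element as a Unicode code point, as the function name suggests; that reading is an assumption, and under it only single-byte (ASCII) payloads round-trip exactly. The helper name and the sample payload are hypothetical.

```python
import json


def codepoints_to_string(values: list[int]) -> str:
    """Join integer code points into a string, one character per element."""
    return "".join(chr(v) for v in values)


# A captured Body arrives as an array of byte values; this one holds an ASCII JSON payload.
body = list(b'{"value": 42}')
decoded = codepoints_to_string(body)
print(json.loads(decoded)["value"])
```

If your payloads contain multi-byte UTF-8 characters, verify the decoded output before relying on this approach.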

Ingest custom properties

When ingesting events from Event Hubs, data is taken from the body section of the event data object. However, Event Hubs custom properties are defined in the properties section of the object and aren't ingested. To ingest custom properties, you must embed them into the data in the body section of the object.

The following example compares the event data object containing the custom property customProperty as defined by Event Hubs (first) with the embedded property required for ingestion (second).

As defined by Event Hubs:

    {
        "body": {
            "value": 42
        },
        "properties": {
            "customProperty": "123456789"
        }
    }

As required for ingestion:

    {
        "body": {
            "value": 42,
            "customProperty": "123456789"
        }
    }

You can use one of the following methods to embed custom properties into the data in the body section of the event data object:

  • In Event Hubs, when creating the event data object, embed the custom properties as part of the data in the body section of the object.
  • Use Azure Stream Analytics to process events from the event hub and embed the custom properties in the event data. From Azure Stream Analytics you can ingest the data natively using the Azure Data Explorer output connector, or route the data into another event hub and from there into your cluster.
  • Use Azure Functions to add the custom properties and then ingest the data.
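
The first method can be sketched in Python as follows: merge the custom properties into the body before serializing the event. The helper embed_custom_properties is hypothetical, and the rule that body fields win on a name clash is a choice made for this sketch rather than documented behavior.

```python
import json


def embed_custom_properties(body: dict, properties: dict) -> bytes:
    """Merge custom properties into the event body and serialize it as JSON.

    Existing body fields are kept when a property has the same name.
    """
    merged = dict(body)
    for name, value in properties.items():
        merged.setdefault(name, value)
    return json.dumps(merged).encode("utf-8")


payload = embed_custom_properties({"value": 42}, {"customProperty": "123456789"})
print(payload.decode("utf-8"))
```

The resulting bytes would be used as the event body, so that customProperty is ingested along with value.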

Cross-region Event Hub data connection

For best performance, create all the following resources in the same region as the cluster. If that isn't possible, consider using the Premium or Dedicated Event Hubs tiers. For details, see the Event Hubs tiers comparison.

Create an event hub

If you don't already have one, create an event hub. Connecting to the event hub can be managed through the Azure portal, programmatically with C# or Python, or with an Azure Resource Manager template.

Note

  • The ability to dynamically add partitions after creating an event hub is only available with Event Hubs Premium and Dedicated tiers. Consider the long-term scale when setting partition count.
  • The consumer group must be unique per consumer. Create a consumer group dedicated to the Azure Data Explorer connection.

Send events

See the sample app that generates data and sends it to an event hub.

For an example of how to generate sample data, see Ingest data from event hub into Azure Data Explorer.

Set up Geo-disaster recovery solution

Event Hubs offers a Geo-disaster recovery solution. Azure Data Explorer doesn't support alias event hub namespaces. To implement Geo-disaster recovery in your solution, create two event hub data connections: one for the primary namespace and one for the secondary namespace. Azure Data Explorer listens to both event hub connections.

Note

It's the user's responsibility to implement a failover from the primary namespace to the secondary namespace.