Use Azure Event Hubs as a Delta Live Tables data source

This article explains how to use Delta Live Tables to process messages from Azure Event Hubs. You cannot use the Structured Streaming Event Hubs connector because this library is not available as part of Databricks Runtime, and Delta Live Tables does not allow you to use third-party JVM libraries.

How can Delta Live Tables connect to Azure Event Hubs?

Azure Event Hubs provides an endpoint compatible with Apache Kafka that you can use with the Structured Streaming Kafka connector, available in Databricks Runtime, to process messages from Azure Event Hubs. For more information about Azure Event Hubs and Apache Kafka compatibility, see Use Azure Event Hubs from Apache Kafka applications.
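
To make the idea concrete, the following minimal sketch shows a Structured Streaming Kafka reader pointed at the Event Hubs Kafka-compatible endpoint. The placeholder values are illustrative only; the complete, parameterized configuration used by the pipeline appears later in this article.

# Minimal sketch: read an event hub through the Kafka-compatible endpoint.
# Placeholder values shown; the full example later in this article retrieves
# them from pipeline settings and a secret scope instead of hardcoding them.
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<eh-namespace>.servicebus.chinacloudapi.cn:9093")
  .option("subscribe", "<eventhub>")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.sasl.jaas.config",
          'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required '
          'username="$ConnectionString" password="<sas-connection-string>";')
  .load())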

The following steps describe connecting a Delta Live Tables pipeline to an existing Event Hubs instance and consuming events from a topic. To complete these steps, you need the following Event Hubs connection values:

  • The name of the Event Hubs namespace.
  • The name of the Event Hub instance in the Event Hubs namespace.
  • A shared access policy name and policy key for Event Hubs. By default, a RootManageSharedAccessKey policy is created for each Event Hubs namespace. This policy has manage, send, and listen permissions. If your pipeline only reads from Event Hubs, Databricks recommends creating a new policy with the listen permission only.

For more information about the Event Hubs connection string, see Get an Event Hubs connection string.

Note

  • Azure Event Hubs provides both OAuth 2.0 and shared access signature (SAS) options to authorize access to your secure resources. These instructions use SAS-based authentication.
  • If you get the Event Hubs connection string from the Azure portal, it may not contain the EntityPath value. The EntityPath value is required only when using the Structured Streaming Event Hubs connector. The Structured Streaming Kafka connector requires providing only the topic name. An example connection string follows this note.
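
For reference, a SAS-based Event Hubs connection string has the following general shape. The values are placeholders, and an EntityPath=<eventhub> segment may or may not appear at the end, depending on where you copied the string from:

Endpoint=sb://<eh-namespace>.servicebus.chinacloudapi.cn/;SharedAccessKeyName=<eh-policy-name>;SharedAccessKey=<eh-policy-key>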

Store the policy key in an Azure Databricks secret

Because the policy key is sensitive information, Databricks recommends not hardcoding the value in your pipeline code. Instead, use Azure Databricks secrets to store and manage access to the key.

The following example uses the Databricks CLI to create a secret scope and store the key in that scope. In your pipeline code, use the dbutils.secrets.get() function with <scope-name> and <shared-policy-name> to retrieve the key value, as shown in the sketch after the commands.

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
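
In the pipeline notebook, the stored key can then be read back with dbutils.secrets.get(). A minimal sketch, assuming the same <scope-name> and <shared-policy-name> values used in the commands above:

# Retrieve the shared access policy key stored in the secret scope.
# <scope-name> and <shared-policy-name> must match the values passed to the CLI.
policy_key = dbutils.secrets.get(scope="<scope-name>", key="<shared-policy-name>")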

For more information on Azure Databricks secrets, see Secret management.

Create a notebook and add the pipeline code to consume events

The following example reads IoT events from a topic, but you can adapt the example for the requirements of your application. As a best practice, Databricks recommends using the Delta Live Tables pipeline settings to configure application variables. Your pipeline code then uses the spark.conf.get() function to retrieve values. For more information on using pipeline settings to parameterize your pipeline, see Use parameters with Delta Live Tables pipelines.

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("iot.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.chinacloudapi.cn/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.chinacloudapi.cn:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IoT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the Delta table if you do a full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid_records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )
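
The bronze table above can feed additional tables in the same pipeline. The following sketch shows one possible downstream table that flattens the parsed payload; the table name, selected columns, and expectation are illustrative assumptions rather than part of the original example:

# Illustrative downstream (silver) table built on iot_raw.
# The table name, selected columns, and expectation are assumptions for
# demonstration only; adapt them to your schema and data quality rules.
@dlt.create_table(
  comment="Parsed IoT events",
  table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_device_id", "device_id IS NOT NULL")
def iot_parsed():
  return (
    dlt.read_stream("iot_raw")
      .select("parsed_records.*", "iot_event_timestamp", "etl_rec_uuid")
  )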

Create the pipeline

Create a new pipeline with the following settings, replacing the placeholder values with appropriate values for your environment.

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

Replace

  • <container-name> with the name of an Azure storage account container.
  • <storage-account-name> with the name of an ADLS Gen2 storage account.
  • <eh-namespace> with the name of your Event Hubs namespace.
  • <eh-policy-name> with the name of the Event Hubs shared access policy. This name is also used as the key when the policy key is stored in the secret scope.
  • <eventhub> with the name of your Event Hubs instance.
  • <secret-scope-name> with the name of the Azure Databricks secret scope that contains the Event Hubs policy key.
  • <path-to-notebook> with the workspace path to the notebook that contains the pipeline code.
  • <target-database-name> with the name of the target database for the tables created by the pipeline.
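
If you prefer to create the pipeline from the command line rather than the UI, the same settings can usually be saved to a JSON file and passed to the Databricks CLI. The file name pipeline-settings.json is a placeholder, and the exact command and flags depend on your CLI version, so treat the following as a sketch:

databricks --profile <profile-name> pipelines create --json @pipeline-settings.json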

As a best practice, this pipeline doesn't use the default DBFS storage path but instead uses an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account. For more information on configuring authentication for an ADLS Gen2 storage account, see Securely access storage credentials with secrets in a pipeline.