使用 Azure 事件中心作为增量实时表数据源

本文介绍如何使用增量实时表处理来自 Azure 事件中心的消息。 你不能使用结构化流式处理事件中心连接器,因为此库不作为 Databricks Runtime 的一部分提供,并且增量实时表不允许使用第三方 JVM 库

增量实时表如何连接到 Azure 事件中心?

Azure 事件中心提供与 Apache Kafka 兼容的终结点,该终结点可与 Databricks Runtime 中提供的结构化流式处理 Kafka 连接器一起使用,以处理来自 Azure 事件中心的消息。 有关 Azure 事件中心和 Apache Kafka 兼容性的详细信息,请参阅使用 Apache Kafka 应用程序中的 Azure 事件中心

以下步骤介绍如何将增量实时表管道连接到现有事件中心实例,以及如何使用主题中的事件。 若要完成这些步骤,需要以下事件中心连接值:

  • 事件中心命名空间的名称。
  • 事件中心命名空间中的事件中心实例的名称。
  • 事件中心的共享访问策略名称和策略密钥。 默认情况下,系统会为每个事件中心命名空间创建 RootManageSharedAccessKey 策略。 此策略具有 managesendlisten 权限。 如果管道仅从事件中心读取数据,Databricks 建议创建一个仅具有侦听权限的新策略。

有关事件中心连接字符串的详细信息,请参阅获取事件中心连接字符串

注意

  • Azure 事件中心提供 OAuth 2.0 和共享访问签名 (SAS) 选项来授权访问安全资源。 这些说明使用基于 SAS 的身份验证。
  • 如果从 Azure 门户获取事件中心连接字符串,则它可能不包含 EntityPath 值。 仅当使用结构化流式处理事件中心连接器时,才需要 EntityPath 值。 使用结构化流式处理 Kafka 连接器只需要提供主题名称。

将策略密钥存储在 Azure Databricks 机密中

由于策略密钥是敏感信息,因此 Databricks 建议不要在管道代码中对值进行硬编码。 请改用 Azure Databricks 机密来存储和管理对密钥的访问权限。

以下示例使用 Databricks CLI 创建机密范围并将密钥存储在该机密范围内。 在管道代码中,将 dbutils.secrets.get() 函数与 scope-nameshared-policy-name 配合使用来检索键值。

databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

有关 Azure Databricks 机密的详细信息,请参阅机密管理

创建笔记本并添加管道代码以使用事件

以下示例从主题中读取 IoT 事件,但你可以根据应用程序的要求改编该示例。 最佳做法是,Databricks 建议使用增量实时表管道设置来配置应用程序变量。 然后,管道代码使用 spark.conf.get() 函数来检索值。 有关使用管道设置参数化管道的详细信息,请参阅参数化管道

import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE                    = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME                         = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME  = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE                    = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR                     = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.chinacloudapi.cn/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
  "kafka.bootstrap.servers"  : f"{EH_NAMESPACE}.servicebus.chinacloudapi.cn:9093",
  "subscribe"                : EH_NAME,
  "kafka.sasl.mechanism"     : "PLAIN",
  "kafka.security.protocol"  : "SASL_SSL",
  "kafka.sasl.jaas.config"   : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
  "kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
  "kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
  "maxOffsetsPerTrigger"     : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
  "failOnDataLoss"           : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
  "startingOffsets"          : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp  BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
  return (df
    .withColumn("records", col("value").cast("string"))
    .withColumn("parsed_records", from_json(col("records"), payload_schema))
    .withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
    .withColumn("eh_enqueued_timestamp", expr("timestamp"))
    .withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
    .withColumn("etl_processed_timestamp", col("current_timestamp"))
    .withColumn("etl_rec_uuid", expr("uuid()"))
    .drop("records", "value", "key")
  )

@dlt.create_table(
  comment="Raw IOT Events",
  table_properties={
    "quality": "bronze",
    "pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
  },
  partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
  return (
   spark.readStream
    .format("kafka")
    .options(**KAFKA_OPTIONS)
    .load()
    .transform(parse)
  )

创建管道

使用以下设置创建新管道,将占位符值替换为适合你的环境的值。

{
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "num_workers": 4
    }
  ],
  "development": true,
  "continuous": false,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "photon": false,
  "libraries": [
    {
      "notebook": {
        "path": "<path-to-notebook>"
      }
    }
  ],
  "name": "dlt_eventhub_ingestion_using_kafka",
  "storage": "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/iot/",
  "configuration": {
    "iot.ingestion.eh.namespace": "<eh-namespace>",
    "iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
    "iot.ingestion.eh.name": "<eventhub>",
    "io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
    "iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
    "iot.ingestion.spark.startingOffsets": "latest",
    "iot.ingestion.spark.failOnDataLoss": "false",
    "iot.ingestion.kafka.requestTimeout": "60000",
    "iot.ingestion.kafka.sessionTimeout": "30000"
  },
  "target": "<target-database-name>"
}

替换

  • <container-name> 替换为 Azure 存储帐户容器的名称。
  • <storage-account-name> 替换为 ADLS Gen2 存储帐户的名称。
  • <eh-namespace> 替换为事件中心命名空间的名称。
  • <eh-policy-name> 替换为事件中心策略密钥的机密范围密钥。
  • <eventhub> 替换为事件中心实例的名称。
  • <secret-scope-name> 替换为包含事件中心策略密钥的 Azure Databricks 机密范围的名称。

最佳做法是,此管道不使用默认 DBFS 存储路径,而是使用 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户。 有关为 ADLS Gen2 存储帐户配置身份验证的详细信息,请参阅使用管道中的机密安全访问存储凭据