使用 Azure 事件中心作为增量实时表数据源
本文介绍如何使用增量实时表处理来自 Azure 事件中心的消息。 你不能使用结构化流式处理事件中心连接器,因为此库不作为 Databricks Runtime 的一部分提供,并且增量实时表不允许使用第三方 JVM 库。
增量实时表如何连接到 Azure 事件中心?
Azure 事件中心提供与 Apache Kafka 兼容的终结点,该终结点可与 Databricks Runtime 中提供的结构化流式处理 Kafka 连接器一起使用,以处理来自 Azure 事件中心的消息。 有关 Azure 事件中心和 Apache Kafka 兼容性的详细信息,请参阅使用 Apache Kafka 应用程序中的 Azure 事件中心。
以下步骤介绍如何将增量实时表管道连接到现有事件中心实例,以及如何使用主题中的事件。 若要完成这些步骤,需要以下事件中心连接值:
- 事件中心命名空间的名称。
- 事件中心命名空间中的事件中心实例的名称。
- 事件中心的共享访问策略名称和策略密钥。 默认情况下,系统会为每个事件中心命名空间创建
RootManageSharedAccessKey
策略。 此策略具有manage
、send
和listen
权限。 如果管道仅从事件中心读取数据,Databricks 建议创建一个仅具有侦听权限的新策略。
有关事件中心连接字符串的详细信息,请参阅获取事件中心连接字符串。
注意
- Azure 事件中心提供 OAuth 2.0 和共享访问签名 (SAS) 选项来授权访问安全资源。 这些说明使用基于 SAS 的身份验证。
- 如果从 Azure 门户获取事件中心连接字符串,则它可能不包含
EntityPath
值。 仅当使用结构化流式处理事件中心连接器时,才需要EntityPath
值。 使用结构化流式处理 Kafka 连接器只需要提供主题名称。
将策略密钥存储在 Azure Databricks 机密中
由于策略密钥是敏感信息,因此 Databricks 建议不要在管道代码中对值进行硬编码。 请改用 Azure Databricks 机密来存储和管理对密钥的访问权限。
以下示例使用 Databricks CLI 创建机密范围并将密钥存储在该机密范围内。 在管道代码中,将 dbutils.secrets.get()
函数与 scope-name
和 shared-policy-name
配合使用来检索键值。
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
有关 Azure Databricks 机密的详细信息,请参阅机密管理。
创建笔记本并添加管道代码以使用事件
以下示例从主题中读取 IoT 事件,但你可以根据应用程序的要求改编该示例。 最佳做法是,Databricks 建议使用增量实时表管道设置来配置应用程序变量。 然后,管道代码使用 spark.conf.get()
函数来检索值。 有关使用管道设置来将管道参数化的详细信息,请参阅在 Python 或 SQL 中将数据集声明参数化。
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.chinacloudapi.cn/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.chinacloudapi.cn:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
创建管道
使用以下设置创建新管道,将占位符值替换为适合你的环境的值。
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.chinacloudapi.cn": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
替换
<container-name>
替换为 Azure 存储帐户容器的名称。<storage-account-name>
替换为 ADLS Gen2 存储帐户的名称。<eh-namespace>
替换为事件中心命名空间的名称。<eh-policy-name>
替换为事件中心策略密钥的机密范围密钥。<eventhub>
替换为事件中心实例的名称。<secret-scope-name>
替换为包含事件中心策略密钥的 Azure Databricks 机密范围的名称。
最佳做法是,此管道不使用默认 DBFS 存储路径,而是使用 Azure Data Lake Storage Gen2 (ADLS Gen2) 存储帐户。 有关为 ADLS Gen2 存储帐户配置身份验证的详细信息,请参阅使用管道中的机密安全访问存储凭据。