重要
DLT sink
API 目前为公共预览版。
本文介绍 DLT sink
API 以及如何将其与 DLT 流 配合使用,以将管道转换的记录写入到外部数据接收器,例如 Unity 目录托管表和外部表、Hive 元存储表以及 Apache Kafka 或 Azure 事件中心等事件流服务。
什么是 DLT 接收器?
使用 DLT 接收器可以将转换后的数据写入目标,例如 Apache Kafka 或 Azure 事件中心等事件流式处理服务,以及由 Unity Catalog 或 Hive 元存储管理的外部表。 以前,在 DLT 管道中创建的流式处理表和具体化视图只能保存到 Azure Databricks 托管 Delta 表。 有了接收器,你现在有更多的选项来保存 DLT 管道的输出。
何时应使用 DLT 接收器?
如果你有以下需求,Databricks 建议使用 DLT 接收器:
- 构建作用例,例如欺诈检测、实时分析和客户建议。 作用例通常从消息总线(例如 Apache Kafka 主题)读取数据,然后以低延迟处理数据,并将处理过的记录写回到消息总线。 此方法使你能够通过不从云存储写入或读取来实现较低的延迟。
- 将转换后的数据从 DLT 流写入外部 Delta 实例管理的表,包括 Unity Catalog 托管表和外部表,以及 Hive 元存储表。
- 将提取-转换-加载 (ETL) 反向执行到 Databricks 外部的接收器,例如 Apache Kafka 主题。 使用此方法可以有效地支持需要读取或使用 Unity 目录表或其他 Databricks 托管存储之外的数据的用例。
如何使用 DLT 接收器?
注释
- 仅支持使用
spark.readStream
和dlt.read_stream
的流式处理查询。 不支持批处理查询。 - 只可使用
append_flow
写入到接收器。 不支持其他流,例如apply_changes
。 - 运行完全刷新更新不会清理以前在接收器中计算的结果数据。 这意味着任何重新处理的数据都将追加到接收器,并且不会更改现有数据。
当事件数据从流式来源引入到 DLT 管道时,您可以使用 DLT 功能处理和优化这些数据,然后使用追加流处理将转换后的数据记录流式传输到 DLT 接收端。 使用 create_sink()
函数创建此接收器。 有关使用 create_sink
函数的更多详细信息,请参阅 接收器 API 参考。
若要实现 DLT 接收器,请执行以下步骤:
设置 DLT 管道来处理流式处理事件数据,并准备用于写入 DLT 接收器的数据记录。
配置并创建 DLT 接收器以使用首选目标接收器格式。
使用追加流将准备好的记录写入接收器。
本主题的其余部分介绍了这些步骤。
设置 DLT 管道以准备数据写入汇聚点
第一步是设置 DLT 管道,将原始事件流数据转换为要写入接收器的已准备数据。
为了更好地了解此过程,可以遵循 DLT 管道的此示例,该管道处理 wikipedia-datasets
Databricks 中示例数据中的点击流事件数据。 此管道分析原始数据集,以识别链接到 Apache Spark 文档页的维基百科页面,并逐步将这些数据提炼为仅包含 Apache_Spark.
的引用链接所在的表行。
在此示例中,DLT 管道是使用 奖牌体系结构构建的,该体系结构将数据组织到不同的层以提高质量和处理效率。
首先,使用 自动加载程序将数据集中的原始 JSON 记录加载到青铜层。 此 Python 代码演示如何创建一个名为 clickstream_raw
的流式处理表,其中包含源中的原始未处理数据:
import dlt
json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"
@dlt.table(
comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
table_properties={
"quality": "bronze"
}
)
def clickstream_raw():
return (
spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
)
运行此代码后,数据现在位于奖牌体系结构的“铜牌”(或“原始数据”)级别,必须进行清理。 下一步将数据优化为“silver”级别,这涉及到清理数据类型和列名,并使用 DLT 期望来确保数据完整性。
以下代码演示如何通过将铜牌层数据清理和验证到 clickstream_clean
银牌表来执行此操作:
@dlt.table(
comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
table_properties={
"quality": "silver"
}
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
return (
spark.readStream.table("clickstream_raw")
.withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
.withColumn("click_count", expr("CAST(n AS INT)"))
.withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
.withColumnRenamed("curr_title", "current_page_title")
.withColumnRenamed("prev_title", "previous_page_title")
.select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
)
若要开发管道结构的“黄金”层,可以筛选已清理的点击流数据,以隔离引用页所在的 Apache_Spark
条目。 在此最后一个代码示例中,仅选择写入到目标接收器表所需的列。
以下代码演示如何创建一个名为 spark_referrers
表示黄金层的表:
@dlt.table(
comment="A table of the most common pages that link to the Apache Spark page.",
table_properties={
"quality": "gold"
}
)
def spark_referrers():
return (
spark.readStream.table("clickstream_clean")
.filter(expr("current_page_title == 'Apache_Spark'"))
.withColumnRenamed("previous_page_title", "referrer")
.select("referrer", "current_page_id", "current_page_title", "click_count")
)
完成此数据准备过程后,必须配置要将清理的记录写入到的目标接收器。
配置 DLT 接收器
Databricks 支持将处理自流数据的记录写入三种类型的目标接收器:
- 增量表接收器
- Apache Kafka 接收器
- Azure 事件中心接收器
下面是 Delta、Kafka 和 Azure 事件中心接收器的配置示例:
德尔塔水槽
若要按文件路径创建 Delta 接收器,请执行以下操作:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)
若要使用完全限定的目录和架构路径按表名称创建 Delta 接收器:
dlt.create_sink(
name = "delta_sink",
format = "delta",
options = { "tableName": "my_catalog.my_schema.my_table" }
)
Kafka 和 Azure 事件中心接收器
此代码适用于 Apache Kafka 和 Azure 事件中心接收器。
topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.chinacloudapi.cn:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")
eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
+ f' required username="$ConnectionString" password="{connection_string}";'
dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
"kafka.bootstrap.servers": bootstrap_servers,
"kafka.sasl.mechanism": "PLAIN",
"kafka.security.protocol": "SASL_SSL",
"kafka.sasl.jaas.config": eh_sasl,
"topic": topic_name
}
)
一旦配置好汇入点并准备好 DLT 管道,即可开始将处理过的记录流式传输到汇入点。
使用追加流写入 DLT 接收器
配置接收器后,下一步是通过将接收器指定为追加流输出的记录的目标,向其写入处理好的记录。 为此,请将接收器指定为 target
修饰器中的 append_flow
值。
- 对于 Unity 目录托管表和外部表,请使用格式
delta
并在选项中指定路径或表名称。 DLT 管道必须配置为使用 Unity Catalog。 - 对于 Apache Kafka 主题,请使用
kafka
格式,并在选项中指定主题名称、连接信息和身份验证信息。 这些选项与 Spark 结构化流式处理 Kafka 接收器支持的选项相同。 请参阅配置 Kafka 结构化流式处理编写器。 - 对于 Azure 事件中心,请使用
kafka
格式,并在选项中指定事件中心名称、连接信息和身份验证信息。 这些选项与使用 Kafka 接口的 Spark 结构化流式处理事件中心接收器中支持的选项相同。 请参阅使用 Microsoft Entra ID 和 Azure 事件中心进行服务主体身份验证。 - 对于 Hive 元存储表,请使用
delta
格式并在选项中指定路径或表名称。 DLT 管道必须配置为使用Hive 元存储。
下面的示例显示如何设置数据流,以将经过 DLT 管道处理的记录写入 Delta、Kafka 和 Azure 事件中心接收器。
Delta 接收器
@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
spark.readStream.table("spark_referrers")
.selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)
Kafka 和 Azure 事件中心接收器
@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
spark.readStream.table("spark_referrers")
.selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)
对于 Azure 事件中心接收器,value
参数是必需的。 其他参数(如 key
、partition
、headers
和 topic
)是可选的。
有关 append_flow
修饰器的更多详细信息,请参阅使用追加流从多个源流写入流式处理表。
局限性
仅支持 Python API。 不支持 SQL。
仅支持使用
spark.readStream
和dlt.read_stream
的流式处理查询。 不支持批处理查询。只可使用
append_flow
写入到接收器。 不支持其他流(例如apply_changes
),并且无法在 DLT 数据集定义中使用接收器。 例如,不支持以下各项:@table("from_sink_table") def fromSink(): return read_stream("my_sink")
对于 Delta 接收器,表名称必须完全限定。 具体而言,对于 Unity Catalog 托管的外部表,表名称必须是格式
<catalog>.<schema>.<table>
。 对于 Hive 元存储,它必须采用<schema>.<table>
格式。运行
FullRefresh
不会清理以前计算的结果数据。 这意味着任何重新处理的数据都将追加到接收器,并且不会更改现有数据。不支持 DLT 预期。