使用 DLT 汇聚器将记录流传输到外部服务

重要

DLT sink API 目前为公共预览版

本文介绍 DLT sink API 以及如何将其与 DLT 流 配合使用,以将管道转换的记录写入到外部数据接收器,例如 Unity 目录托管表和外部表、Hive 元存储表以及 Apache Kafka 或 Azure 事件中心等事件流服务。

什么是 DLT 接收器?

使用 DLT 接收器可以将转换后的数据写入目标,例如 Apache Kafka 或 Azure 事件中心等事件流式处理服务,以及由 Unity Catalog 或 Hive 元存储管理的外部表。 以前,在 DLT 管道中创建的流式处理表和具体化视图只能保存到 Azure Databricks 托管 Delta 表。 有了接收器,你现在有更多的选项来保存 DLT 管道的输出。

何时应使用 DLT 接收器?

如果你有以下需求,Databricks 建议使用 DLT 接收器:

  • 构建作用例,例如欺诈检测、实时分析和客户建议。 作用例通常从消息总线(例如 Apache Kafka 主题)读取数据,然后以低延迟处理数据,并将处理过的记录写回到消息总线。 此方法使你能够通过不从云存储写入或读取来实现较低的延迟。
  • 将转换后的数据从 DLT 流写入外部 Delta 实例管理的表,包括 Unity Catalog 托管表和外部表,以及 Hive 元存储表。
  • 将提取-转换-加载 (ETL) 反向执行到 Databricks 外部的接收器,例如 Apache Kafka 主题。 使用此方法可以有效地支持需要读取或使用 Unity 目录表或其他 Databricks 托管存储之外的数据的用例。

如何使用 DLT 接收器?

注释

  • 仅支持使用 spark.readStreamdlt.read_stream 的流式处理查询。 不支持批处理查询。
  • 只可使用 append_flow 写入到接收器。 不支持其他流,例如 apply_changes
  • 运行完全刷新更新不会清理以前在接收器中计算的结果数据。 这意味着任何重新处理的数据都将追加到接收器,并且不会更改现有数据。

当事件数据从流式来源引入到 DLT 管道时,您可以使用 DLT 功能处理和优化这些数据,然后使用追加流处理将转换后的数据记录流式传输到 DLT 接收端。 使用 create_sink() 函数创建此接收器。 有关使用 create_sink 函数的更多详细信息,请参阅 接收器 API 参考

若要实现 DLT 接收器,请执行以下步骤:

  1. 设置 DLT 管道来处理流式处理事件数据,并准备用于写入 DLT 接收器的数据记录。

  2. 配置并创建 DLT 接收器以使用首选目标接收器格式。

  3. 使用追加流将准备好的记录写入接收器。

本主题的其余部分介绍了这些步骤。

设置 DLT 管道以准备数据写入汇聚点

第一步是设置 DLT 管道,将原始事件流数据转换为要写入接收器的已准备数据。

为了更好地了解此过程,可以遵循 DLT 管道的此示例,该管道处理 wikipedia-datasets Databricks 中示例数据中的点击流事件数据。 此管道分析原始数据集,以识别链接到 Apache Spark 文档页的维基百科页面,并逐步将这些数据提炼为仅包含 Apache_Spark. 的引用链接所在的表行。

在此示例中,DLT 管道是使用 奖牌体系结构构建的,该体系结构将数据组织到不同的层以提高质量和处理效率。

首先,使用 自动加载程序将数据集中的原始 JSON 记录加载到青铜层。 此 Python 代码演示如何创建一个名为 clickstream_raw的流式处理表,其中包含源中的原始未处理数据:

import dlt

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/"

@dlt.table(
 comment="The raw Wikipedia clickstream dataset, ingested from databricks-datasets.",
 table_properties={
   "quality": "bronze"
 }
)
def clickstream_raw():
 return (
   spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").option("inferSchema", "true").load(json_path)
 )

运行此代码后,数据现在位于奖牌体系结构的“铜牌”(或“原始数据”)级别,必须进行清理。 下一步将数据优化为“silver”级别,这涉及到清理数据类型和列名,并使用 DLT 期望来确保数据完整性。

以下代码演示如何通过将铜牌层数据清理和验证到 clickstream_clean 银牌表来执行此操作:

@dlt.table(
 comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
 table_properties={
   "quality": "silver"
 }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
 return (
   spark.readStream.table("clickstream_raw")
     .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
     .withColumn("click_count", expr("CAST(n AS INT)"))
     .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
     .withColumnRenamed("curr_title", "current_page_title")
     .withColumnRenamed("prev_title", "previous_page_title")
     .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
 )

若要开发管道结构的“黄金”层,可以筛选已清理的点击流数据,以隔离引用页所在的 Apache_Spark条目。 在此最后一个代码示例中,仅选择写入到目标接收器表所需的列。

以下代码演示如何创建一个名为 spark_referrers 表示黄金层的表:

@dlt.table(
 comment="A table of the most common pages that link to the Apache Spark page.",
 table_properties={
   "quality": "gold"
 }
)
def spark_referrers():
 return (
   spark.readStream.table("clickstream_clean")
     .filter(expr("current_page_title == 'Apache_Spark'"))
     .withColumnRenamed("previous_page_title", "referrer")
     .select("referrer", "current_page_id", "current_page_title", "click_count")
 )

完成此数据准备过程后,必须配置要将清理的记录写入到的目标接收器。

配置 DLT 接收器

Databricks 支持将处理自流数据的记录写入三种类型的目标接收器:

  • 增量表接收器
  • Apache Kafka 接收器
  • Azure 事件中心接收器

下面是 Delta、Kafka 和 Azure 事件中心接收器的配置示例:

德尔塔水槽

若要按文件路径创建 Delta 接收器,请执行以下操作:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = {"path": "/Volumes/catalog_name/schema_name/volume_name/path/to/data"}
)

若要使用完全限定的目录和架构路径按表名称创建 Delta 接收器:

dlt.create_sink(
  name = "delta_sink",
  format = "delta",
  options = { "tableName": "my_catalog.my_schema.my_table" }
)

Kafka 和 Azure 事件中心接收器

此代码适用于 Apache Kafka 和 Azure 事件中心接收器。

topic_name = "dlt-sink"
eh_namespace_name = "dlt-eventhub"
bootstrap_servers = f"{eh_namespace_name}.servicebus.chinacloudapi.cn:9093"
connection_string = dbutils.secrets.get(scope="secret-lab", key="kafka-connection-string")

eh_sasl = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule' \
  + f' required username="$ConnectionString" password="{connection_string}";'

dlt.create_sink(
name = "eh_sink",
format = "kafka",
options = {
    "kafka.bootstrap.servers": bootstrap_servers,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": eh_sasl,
    "topic": topic_name
  }
)

一旦配置好汇入点并准备好 DLT 管道,即可开始将处理过的记录流式传输到汇入点。

使用追加流写入 DLT 接收器

配置接收器后,下一步是通过将接收器指定为追加流输出的记录的目标,向其写入处理好的记录。 为此,请将接收器指定为 target 修饰器中的 append_flow 值。

  • 对于 Unity 目录托管表和外部表,请使用格式 delta 并在选项中指定路径或表名称。 DLT 管道必须配置为使用 Unity Catalog。
  • 对于 Apache Kafka 主题,请使用 kafka 格式,并在选项中指定主题名称、连接信息和身份验证信息。 这些选项与 Spark 结构化流式处理 Kafka 接收器支持的选项相同。 请参阅配置 Kafka 结构化流式处理编写器
  • 对于 Azure 事件中心,请使用 kafka 格式,并在选项中指定事件中心名称、连接信息和身份验证信息。 这些选项与使用 Kafka 接口的 Spark 结构化流式处理事件中心接收器中支持的选项相同。 请参阅使用 Microsoft Entra ID 和 Azure 事件中心进行服务主体身份验证
  • 对于 Hive 元存储表,请使用 delta 格式并在选项中指定路径或表名称。 DLT 管道必须配置为使用Hive 元存储。

下面的示例显示如何设置数据流,以将经过 DLT 管道处理的记录写入 Delta、Kafka 和 Azure 事件中心接收器。

Delta 接收器

@dlt.append_flow(name = "delta_sink_flow", target="delta_sink")
def delta_sink_flow():
return(
  spark.readStream.table("spark_referrers")
  .selectExpr("current_page_id", "referrer", "current_page_title", "click_count")
)

Kafka 和 Azure 事件中心接收器

@dlt.append_flow(name = "kafka_sink_flow", target = "eh_sink")
def kafka_sink_flow():
return (
  spark.readStream.table("spark_referrers")
  .selectExpr("cast(current_page_id as string) as key", "to_json(struct(referrer, current_page_title, click_count)) AS value")
)

对于 Azure 事件中心接收器,value 参数是必需的。 其他参数(如 keypartitionheaderstopic)是可选的。

有关 append_flow 修饰器的更多详细信息,请参阅使用追加流从多个源流写入流式处理表

局限性

  • 仅支持 Python API。 不支持 SQL。

  • 仅支持使用 spark.readStreamdlt.read_stream 的流式处理查询。 不支持批处理查询。

  • 只可使用 append_flow 写入到接收器。 不支持其他流(例如 apply_changes),并且无法在 DLT 数据集定义中使用接收器。 例如,不支持以下各项:

    @table("from_sink_table")
    def fromSink():
      return read_stream("my_sink")
    
  • 对于 Delta 接收器,表名称必须完全限定。 具体而言,对于 Unity Catalog 托管的外部表,表名称必须是格式 <catalog>.<schema>.<table>。 对于 Hive 元存储,它必须采用 <schema>.<table> 格式。

  • 运行 FullRefresh 不会清理以前计算的结果数据。 这意味着任何重新处理的数据都将追加到接收器,并且不会更改现有数据。

  • 不支持 DLT 预期。

资源