使用 Python 开发管道代码

DLT 引入了几个新的 Python 代码构造,用于在管道中定义具体化视图和流式处理表。 Python 对开发管道的支持基于 PySpark DataFrame 和结构化流式处理 API 的基础知识。

对于不熟悉 Python 和数据帧的用户,Databricks 建议使用 SQL 接口。 请参阅使用 SQL 开发管道代码

有关 DLT Python 语法的完整参考,请参阅 DLT Python 语言参考

用于管道开发的 Python 基础知识

创建 DLT 数据集的 Python 代码必须返回 DataFrames。

所有 DLT Python API 都在模块中 dlt 实现。 使用 Python 实现的 DLT 管道代码必须在 Python 笔记本和文件的顶部显式导入 dlt 模块。

读取和写入管道配置期间指定的目录和架构的默认值。 请参阅 设置目标目录和数据库架构

特定于 DLT 的 Python 代码与其他类型的 Python 代码不同,这一关键方式是:Python 管道代码不直接调用执行数据引入和转换以创建 DLT 数据集的函数。 相反,DLT 会从管道中配置的所有源代码文件中解释 dlt 模块里的修饰器函数,并生成数据流图。

重要

要避免管道运行时出现意外行为,请不要在定义数据集的函数中包含可能具有副作用的代码。 要了解详细信息,请参阅Python 参考

使用 Python 创建具体化视图或流式表

@dlt.table 修饰器指示 DLT 基于函数返回的结果创建物化视图或流表。 批处理读取的结果创建物化视图,而流式读取的结果则创建流表。

默认,物化视图和流式表的名称由函数名称推断而来。 下面的代码示例演示用于创建具体化视图和流式处理表的基本语法:

备注

这两个函数引用 samples 目录中的同一个表,并使用同一修饰器函数。 这些示例突出显示了具体化视图和流式处理表在基本语法上的唯一区别是使用spark.readspark.readStream

并非所有数据源都支持流式读取。 某些数据源应始终使用流处理语义进行处理。

import dlt

@dlt.table()
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table()
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

(可选)可以使用修饰器中的name参数指定表名@dlt.table。 以下示例演示具体化视图和流式处理表的此模式:

import dlt

@dlt.table(name = "trips_mv")
def basic_mv():
  return spark.read.table("samples.nyctaxi.trips")

@dlt.table(name = "trips_st")
def basic_st():
  return spark.readStream.table("samples.nyctaxi.trips")

从对象存储加载数据

DLT 支持从 Azure Databricks 支持的所有格式加载数据。 请参阅数据格式选项

备注

这些示例使用自动装载到工作区的 /databricks-datasets 下的可用数据。 Databricks 建议使用卷路径或云 URI 来引用存储在云对象存储中的数据。 请参阅什么是 Unity Catalog 卷?

Databricks 建议在针对存储在云对象存储中的数据配置增量引入工作负荷时使用自动加载器和流式处理表。 请参阅什么是自动加载程序?

以下示例使用自动加载程序从 JSON 文件创建流式处理表:

import dlt

@dlt.table()
def ingestion_st():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

以下示例使用批处理语义读取 JSON 目录并创建具体化视图:

import dlt

@dlt.table()
def batch_mv():
  return spark.read.format("json").load("/databricks-datasets/retail-org/sales_orders")

用预期来验证数据

可以使用预期来设置和强制实施数据质量约束。 请参阅通过管道预期管理数据质量

以下代码使用 @dlt.expect_or_drop 定义一个名为 valid_data 的预期,该预期在数据引入期间删除为 null 的记录。

import dlt

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders_valid():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

查询管道中定义的具体化视图和流式处理表

以下示例定义四个数据集:

  • 一个名为 orders 的流式处理表,用于加载 JSON 数据。
  • 一个名为 customers 的物化视图,用于加载 CSV 数据。
  • 一个名为customer_orders的具体化视图,该视图用于联接orderscustomers数据集中的记录,将订单时间戳转换为日期,并选择customer_idorder_numberstateorder_date字段。
  • 名为 daily_orders_by_state 的物化视图,用于汇总每个州的每日订单数量。

备注

在管道中查询视图或表时,可以直接指定目录和架构,也可以使用管道中配置的默认值。 在此示例中,orderscustomerscustomer_orders 表是从为管道配置的默认目录和架构中写入和读取的。

传统发布模式使用 LIVE 架构来查询管道中定义的其他物化视图和流式表。 在新管道中,LIVE 架构语法会被悄无声息地忽略。 请参阅 LIVE 架构(旧版)。

import dlt
from pyspark.sql.functions import col

@dlt.table()
@dlt.expect_or_drop("valid_date", "order_datetime IS NOT NULL AND length(order_datetime) > 0")
def orders():
  return (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .load("/databricks-datasets/retail-org/sales_orders")
  )

@dlt.table()
def customers():
    return spark.read.format("csv").option("header", True).load("/databricks-datasets/retail-org/customers")

@dlt.table()
def customer_orders():
  return (spark.read.table("orders")
    .join(spark.read.table("customers"), "customer_id")
      .select("customer_id",
        "order_number",
        "state",
        col("order_datetime").cast("int").cast("timestamp").cast("date").alias("order_date"),
      )
  )

@dlt.table()
def daily_orders_by_state():
    return (spark.read.table("customer_orders")
      .groupBy("state", "order_date")
      .count().withColumnRenamed("count", "order_count")
    )

for 循环中创建表

可以使用 Python for 循环以编程方式创建多个表。 这在您有多个数据源或目标数据集时非常有用,这些数据源或数据集仅因几个参数的不同而异,从而减少需要维护的代码总量和代码冗余。

for 循环按串行顺序评估逻辑,但在为数据集完成规划后,管道并行运行逻辑。

重要

使用此模式定义数据集时,请确保传递给 for 循环的值列表始终是累加的。 如果以前在管道中定义的数据集从将来的管道运行中省略,该数据集将自动从目标架构中删除。

以下示例创建五个表,用于按区域筛选客户订单。 在这里,区域名称用于设置目标具体化视图的名称并筛选源数据。 临时视图用于定义来自源表的联接,这些表用于构造最终的具体化视图。

import dlt
from pyspark.sql.functions import collect_list, col

@dlt.view()
def customer_orders():
  orders = spark.read.table("samples.tpch.orders")
  customer = spark.read.table("samples.tpch.customer")

  return (orders.join(customer, orders.o_custkey == customer.c_custkey)
    .select(
      col("c_custkey").alias("custkey"),
      col("c_name").alias("name"),
      col("c_nationkey").alias("nationkey"),
      col("c_phone").alias("phone"),
      col("o_orderkey").alias("orderkey"),
      col("o_orderstatus").alias("orderstatus"),
      col("o_totalprice").alias("totalprice"),
      col("o_orderdate").alias("orderdate"))
  )

@dlt.view()
def nation_region():
  nation = spark.read.table("samples.tpch.nation")
  region = spark.read.table("samples.tpch.region")

  return (nation.join(region, nation.n_regionkey == region.r_regionkey)
    .select(
      col("n_name").alias("nation"),
      col("r_name").alias("region"),
      col("n_nationkey").alias("nationkey")
    )
  )

# Extract region names from region table

region_list = spark.read.table("samples.tpch.region").select(collect_list("r_name")).collect()[0][0]

# Iterate through region names to create new region-specific materialized views

for region in region_list:

  @dlt.table(name=f"{region.lower().replace(' ', '_')}_customer_orders")
  def regional_customer_orders(region_filter=region):

    customer_orders = spark.read.table("customer_orders")
    nation_region = spark.read.table("nation_region")

    return (customer_orders.join(nation_region, customer_orders.nationkey == nation_region.nationkey)
      .select(
        col("custkey"),
        col("name"),
        col("phone"),
        col("nation"),
        col("region"),
        col("orderkey"),
        col("orderstatus"),
        col("totalprice"),
        col("orderdate")
      ).filter(f"region = '{region_filter}'")
    )

下面是此管道的数据流图示例:

两个视图的数据流图指向五个区域表。

故障排除: for 循环创建多个具有相同值的表

由管道用来评估 Python 代码的惰性执行模型要求你的逻辑在调用由 @dlt.table() 修饰的函数时直接引用各个值。

以下示例演示了使用循环定义表 for 的两种正确方法。 在这两个示例中,列表中的每个表名tables都在由@dlt.table()修饰的函数中被明确引用。

import dlt

# Create a parent function to set local variables

def create_table(table_name):
  @dlt.table(name=table_name)
  def t():
    return spark.read.table(table_name)

tables = ["t1", "t2", "t3"]
for t_name in tables:
  create_table(t_name)

# Call `@dlt.table()` within a for loop and pass values as variables

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table(table_name=t_name):
    return spark.read.table(table_name)

以下示例未正确引用值。 此示例创建具有不同名称的表,但所有表都从循环中的 for 最后一个值加载数据:

import dlt

# Don't do this!

tables = ["t1", "t2", "t3"]
for t_name in tables:

  @dlt.table(name=t_name)
  def create_table():
    return spark.read.table(t_name)