共用方式為

在 Lakeflow 声明性管道中使用参数

本文介绍如何使用 Lakeflow 声明性管道配置参数化管道代码。

引用参数

在更新期间,管道源代码可以使用语法访问管道参数,以获取 Spark 配置的值。

使用键来引用管道参数。 在源代码逻辑计算之前,该值将作为字符串注入源代码中。

以下示例语法使用具有键 source_catalog 和值 dev_catalog 的参数来指定具体化视图的数据源:

SQL

CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id

Python

import dlt
from pyspark.sql.functions import col, sum, count

@dlt.table
def transaction_summary():

  source_catalog = spark.conf.get("source_catalog")

  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )

设置参数

通过将任意键值对作为配置传递参数给管道。 使用工作区 UI 或 JSON 定义或编辑管道配置时,可以设置参数。 请参照 配置 Lakeflow 声明性管道

管道参数键只能包含 _ - . 或字母数字字符。 参数值设置为字符串。

管道参数不支持动态值。 必须更新与管道配置中的密钥关联的值。

重要

不要使用与保留管道或 Apache Spark 配置值冲突的关键字。

在 Python 或 SQL 中参数化数据集声明

定义数据集的 Python 和 SQL 代码可由管道的设置参数化。 参数化支持以下用例:

  • 将长路径和其他变量与代码分离。
  • 减少开发或过渡环境中处理的数据量,以加快测试速度。
  • 重用同一转换逻辑从多个数据源进行处理。

以下示例使用 startDate 配置值将开发管道限制为输入数据的子集:

CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}

使用参数控制数据源

可以使用管道参数在同一管道的不同配置中指定不同的数据源。

例如,可以使用变量 data_source_path 为管道指定开发、测试和生产配置中的不同路径,然后使用以下代码引用它:

SQL

CREATE STREAMING TABLE bronze AS
SELECT *, _metadata.file_path AS source_file_path
FROM STREAM read_files(
  '${data_source_path}',
  format => 'csv',
  header => true
)

Python

import dlt
from pyspark.sql.functions import col

data_source_path = spark.conf.get("data_source_path")

@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )

此模式有利于测试引入逻辑在初始引入期间如何处理架构或格式不正确的数据。 在切换数据集时,可以在所有环境中在整个管道中使用相同的代码。