本文介绍如何使用增量实时表管道配置参数化管道代码。
引用参数
在更新期间,管道源代码可以使用语法访问管道参数,以获取 Spark 配置的值。
使用密钥引用管道参数。 在源代码逻辑求值之前,该值将作为字符串被注入源代码中。
以下示例语法使用具有键 source_catalog 和值 dev_catalog 的参数来指定具体化视图的数据源:
SQL
CREATE OR REFRESH MATERIALIZED VIEW transation_summary AS
SELECT account_id,
  COUNT(txn_id) txn_count,
  SUM(txn_amount) account_revenue
FROM ${source_catalog}.sales.transactions_table
GROUP BY account_id
Python
import dlt
from pyspark.sql.functions import col, sum, count
@dlt.table
def transaction_summary():
  source_catalog = spark.conf.get("source_catalog")
  return (spark.read
      .table(f"{source_catalog}.sales.transactions_table")
      .groupBy("account_id")
      .agg(
        count(col("txn_id").alias("txn_count")),
        sum(col("txn_amount").alias("account_revenue"))
      )
    )
设置参数
通过将任意键值对作为管道的配置传递给管道,从而将参数传递给管道。 使用工作区 UI 或 JSON 定义或编辑管道配置时,可以设置参数。 请参阅配置增量实时表管道。
作业参数键只能包含 _ - . 或字母数字字符。 参数值设置为字符串。
管道参数不支持动态值。 必须更新与管道配置中的键关联的值。
重要
不要使用与保留管道或 Apache Spark 配置值冲突的关键字。
在 Python 或 SQL 中参数化数据集声明
可以通过管道设置将定义数据集的 Python 和 SQL 代码参数化。 参数化支持以下用例:
- 从代码中分离长路径和其他变量。
 - 减少在开发或过渡环境中处理的数据量,以加快测试速度。
 - 重用同一转换逻辑来处理多个数据源。
 
以下示例使用 startDate 配置值将开发管道限制为输入数据的子集:
CREATE OR REFRESH MATERIALIZED VIEW customer_events
AS SELECT * FROM sourceTable WHERE date > '${mypipeline.startDate}';
@dlt.table
def customer_events():
  start_date = spark.conf.get("mypipeline.startDate")
  return read("sourceTable").where(col("date") > start_date)
{
  "name": "Data Ingest - DEV",
  "configuration": {
    "mypipeline.startDate": "2021-01-02"
  }
}
{
  "name": "Data Ingest - PROD",
  "configuration": {
    "mypipeline.startDate": "2010-01-02"
  }
}
使用参数控制数据源
可使用管道参数在同一管道的不同配置中指定不同的数据源。
例如,可以使用变量 data_source_path 在管道的开发、测试和生产配置中指定不同的路径,然后使用以下代码引用该变量:
SQL
CREATE STREAMING TABLE bronze
AS (
    SELECT
    *,
    _metadata.file_path AS source_file_path
    FROM read_files( '${data_source_path}', 'csv',
            map("header", "true"))
)
Python
import dlt
from pyspark.sql.functions import col
data_source_path = spark.conf.get("data_source_path")
@dlt.table
def bronze():
    return (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", True)
        .load(data_source_path )
        .select("*", col("_metadata.file_path").alias("source_file_name"))
    )
要测试引入逻辑如何在初始引入期间处理架构或格式错误的数据,此模式比较有用。 在切换数据集时,可以在所有环境的整个管道中使用相同的代码。