使用查找表来处理任务中的For each大型参数数组

For each 任务迭代传递参数数组给嵌套任务,为每个嵌套任务提供其运行所需的信息。 如果使用任务值引用传递参数数组,则参数数组限制为 10,000 个字符,或 48 KB。 如果要传递到嵌套任务的数据量较大,则不能直接使用输入、任务值或作业参数来传递该数据。

传递完整数据的一种替代方法是将任务数据存储为 JSON 文件,并在任务输入中传入一个查找键,而不是传递完整的数据。 嵌套任务可以使用密钥来检索每次迭代所需的特定数据。

以下示例演示了一个示例 JSON 配置文件,以及如何将参数传递给在 JSON 配置中查找值的嵌套任务。

示例 JSON 配置

此示例配置是一个步骤列表,每个迭代都有相应的参数(args)(此示例仅展示三个步骤)。 假定此 JSON 文件保存为 /Workspace/Users/<user>/copy-filtered-table-config.json。 我们在嵌套任务中引用它。

{
  "steps": [
    {
      "key": "table_1",
      "args": {
        "catalog": "my-catalog",
        "schema": "my-schema",
        "source_table": "raw_data_table_1",
        "destination_table": "filtered_table_1",
        "filter_column": "col_a",
        "filter_value": "value_1"
      }
    }
    {
      "key": "table_2",
      "args": {
        "catalog": "my-catalog",
        "schema": "my-schema",
        "source_table": "raw_data_table_2",
        "destination_table": "filtered_table_2",
        "filter_column": "col_b",
        "filter_value": "value_2"
      }
    },
    {
      "key": "table_3",
      "args": {
        "catalog": "my-catalog",
        "schema": "my-schema",
        "source_table": "raw_data_table_3",
        "destination_table": "filtered_table_3",
        "filter_column": "col_c",
        "filter_value": "value_3"
      }
    },
  ]
}

示例 For each 任务

作业中的 For each 任务包括每次迭代的键输入。 此示例展示了一个名叫copy-filtered-tables的任务,其中输入已设置为["table_1","table_2","table_3"]。 此列表限制为 10,000 个字符,但由于你只是传递密钥,所以它比完整数据要小得多。

在此示例中,步骤不依赖于其他步骤或任务,因此我们可以设置大于 1 的并发,使任务运行速度更快。

对于每个任务,显示输入和并发

示例嵌套任务

嵌套任务接收来自父For each任务的输入。 在此示例中,我们将输入设置为用作配置文件的 Key。 下图显示了嵌套任务,包括设置名为 key、值为 {{input}} 的参数

对于每个嵌套任务,显示如何使用输入

此任务是包含代码的笔记本。 在笔记本中,可以使用以下 Python 代码读取输入,并将其用作配置 JSON 文件中的密钥。 来自 JSON 文件的数据用于读取、筛选和写入表中的数据。

# copy-filtered-table (iteratable task code to read a table, filter by a value, and write as a new table)

from pyspark.sql.functions import expr
from types import SimpleNamespace

import json

# If the notebook is run outside of a job with a key parameter, this provides
# a default. This allows testing outside of a For each task
dbutils.widgets.text("key", "table_1", "key")

# load configuration (note that the path must be set to valid configuration file)
config_path = "/Workspace/Users/<user>/copy-filtered-table-config.json"
with open(config_path, "r") as file:
    config = json.loads(file.read())

# look up step and arguments
key = dbutils.widgets.get("key")
current_step = next((step for step in config['steps'] if step['key'] == key), None)
if current_step is None:
    raise ValueError(f"Could not find step '{key}' in the configuration")

args = SimpleNamespace(**current_step["args"])

# read the source table defined for the step, and filter it
df = spark.read.table(f"{args.catalog}.{args.schema}.{args.source_table}") \
          .filter(expr(f"{args.filter_column} like '%{args.filter_value}%'"))

# write the filtered table to the destination
df.write.mode("overwrite").saveAsTable(f"{args.catalog}.{args.schema}.{args.destination_table}")