使用查找表来处理任务中的
For each
任务迭代传递参数数组给嵌套任务,为每个嵌套任务提供其运行所需的信息。 如果使用任务值引用传递参数数组,则参数数组限制为 10,000 个字符,或 48 KB。 如果要传递到嵌套任务的数据量较大,则不能直接使用输入、任务值或作业参数来传递该数据。
传递完整数据的一种替代方法是将任务数据存储为 JSON 文件,并在任务输入中传入一个查找键,而不是传递完整的数据。 嵌套任务可以使用密钥来检索每次迭代所需的特定数据。
以下示例演示了一个示例 JSON 配置文件,以及如何将参数传递给在 JSON 配置中查找值的嵌套任务。
此示例配置是一个步骤列表,每个迭代都有相应的参数(args
)(此示例仅展示三个步骤)。 假定此 JSON 文件保存为 /Workspace/Users/<user>/copy-filtered-table-config.json
。 我们在嵌套任务中引用它。
{
"steps": [
{
"key": "table_1",
"args": {
"catalog": "my-catalog",
"schema": "my-schema",
"source_table": "raw_data_table_1",
"destination_table": "filtered_table_1",
"filter_column": "col_a",
"filter_value": "value_1"
}
}
{
"key": "table_2",
"args": {
"catalog": "my-catalog",
"schema": "my-schema",
"source_table": "raw_data_table_2",
"destination_table": "filtered_table_2",
"filter_column": "col_b",
"filter_value": "value_2"
}
},
{
"key": "table_3",
"args": {
"catalog": "my-catalog",
"schema": "my-schema",
"source_table": "raw_data_table_3",
"destination_table": "filtered_table_3",
"filter_column": "col_c",
"filter_value": "value_3"
}
},
]
}
作业中的 For each
任务包括每次迭代的键输入。 此示例展示了一个名叫copy-filtered-tables
的任务,其中输入已设置为["table_1","table_2","table_3"]
。 此列表限制为 10,000 个字符,但由于你只是传递密钥,所以它比完整数据要小得多。
在此示例中,步骤不依赖于其他步骤或任务,因此我们可以设置大于 1 的并发,使任务运行速度更快。
嵌套任务接收来自父For each
任务的输入。 在此示例中,我们将输入设置为用作配置文件的 Key
。 下图显示了嵌套任务,包括设置名为 key
、值为 {{input}}
的参数。
此任务是包含代码的笔记本。 在笔记本中,可以使用以下 Python 代码读取输入,并将其用作配置 JSON 文件中的密钥。 来自 JSON 文件的数据用于读取、筛选和写入表中的数据。
# copy-filtered-table (iteratable task code to read a table, filter by a value, and write as a new table)
from pyspark.sql.functions import expr
from types import SimpleNamespace
import json
# If the notebook is run outside of a job with a key parameter, this provides
# a default. This allows testing outside of a For each task
dbutils.widgets.text("key", "table_1", "key")
# load configuration (note that the path must be set to valid configuration file)
config_path = "/Workspace/Users/<user>/copy-filtered-table-config.json"
with open(config_path, "r") as file:
config = json.loads(file.read())
# look up step and arguments
key = dbutils.widgets.get("key")
current_step = next((step for step in config['steps'] if step['key'] == key), None)
if current_step is None:
raise ValueError(f"Could not find step '{key}' in the configuration")
args = SimpleNamespace(**current_step["args"])
# read the source table defined for the step, and filter it
df = spark.read.table(f"{args.catalog}.{args.schema}.{args.source_table}") \
.filter(expr(f"{args.filter_column} like '%{args.filter_value}%'"))
# write the filtered table to the destination
df.write.mode("overwrite").saveAsTable(f"{args.catalog}.{args.schema}.{args.destination_table}")