常见方案是按常规计划运行的查询,其中作业会对其进行协调。 例如,在每天结束时,将运行一个查询,以基于当天对源数据集的更改更新系统。 本教程将指导您创建一个包含参数的查询,这些参数用于标识导入数据的时间范围,然后创建一个作业以每天运行该查询。
本教程中创建的查询和参数与最佳做法相匹配,并设置为允许稍后根据需要运行 回填作业 。
先决条件
若要完成本教程,必须有权访问 Azure Databricks 中的系统表。
步骤 1:创建查询
在本教程中,你将创建一个查询,该查询使用参数描述要拉取的数据。 例如,本教程使用系统表中的计费数据来计算每日 Azure Databricks 成本。
查询使用两个参数:
| 参数 | Use |
|---|---|
data_interval_end |
作业运行的日期(用于定期计划),即应该处理的时间范围的结束日期。 或者,对于回填作业,回填数据的截止日期。 |
lookback_days |
要查询的数据天数。 查询从 data_interval_end运行查询的时间或日期(通常是运行查询的时间或日期)进行回溯,因此需要回溯而不是向前查看。 |
按照以下步骤创建查询:
在工作区中,单击
“新建”,然后选择
用于创建新笔记本的笔记本。
名称默认为
Untitled Notebook <date-time>. 单击笔记本顶部的名称,并为其指定描述性名称,例如Query billing with parameters tutorial。在笔记本编辑器顶部,从语言选择器中选择 SQL 。
在第一个单元格中,添加以下代码。 将
<catalog>和<schema>替换为你有权访问且要使用的目录和架构。USE CATALOG <catalog>; USE SCHEMA <schema>; CREATE TABLE IF NOT EXISTS tutorial_databricks_product_spend (billing_origin_product STRING, usage_date DATE, total_dollar_cost DECIMAL(12, 2)); -- Process the last N days specified by :lookback_days ending on :data_interval_end INSERT INTO TABLE tutorial_databricks_product_spend REPLACE WHERE usage_date >= date_add(:data_interval_end, - CAST(:lookback_days AS INT)) AND usage_date < :data_interval_end SELECT usage.billing_origin_product, usage.usage_date, SUM(usage.usage_quantity * list_prices.pricing.effective_list.default) AS total_dollar_cost FROM system.billing.usage AS usage JOIN system.billing.list_prices AS list_prices ON usage.sku_name = list_prices.sku_name AND usage.usage_end_time >= list_prices.price_start_time AND ( list_prices.price_end_time IS NULL OR usage.usage_end_time < list_prices.price_end_time ) WHERE usage.usage_date >= date_add(:data_interval_end, -CAST(:lookback_days AS INT)) AND usage.usage_date < :data_interval_end GROUP BY usage.billing_origin_product, usage.usage_date通过单击编辑然后选择添加参数以添加两个参数。 参数应具有以下名称和默认值:
Name 默认值 lookback_days1data_interval_end< none>。 此参数始终是必需的。 若要了解参数以及如何在不同的任务类型或 Python 中访问它们,请参阅 任务的 Access 参数值。
查询现已准备就绪。 查询从系统表中读取整整一天的数据,然后使用 REPLACE WHERE 替换目标表中的现有数据。 通过替换而不是插入数据,即使同一天的查询被再次运行,也不会有任何损害。 事实上,如果处理过程中出现错误或数据延迟到达,这实际上使您可以重新执行当天的处理任务。
可以按照以下步骤测试查询:
- 请在笔记本单元格上方提供一个值,格式为
data_interval_end,例如yyyy-mm-dd。 - (可选)单击
连接 并选择要使用的计算资源。
- 单击
全部运行。
- 运行完成后,可以通过打开
查看创建的表。从左侧菜单中选择目录,然后选择在查询中设置的目录和架构。
接下来,为查询创建定期计划。
步骤 2:创建一个作业来安排查询计划
在工作区中,单击
,然后在边栏中选择作业和管道。
单击创建,然后选择作业。 “ 任务 ”选项卡显示空任务窗格。
注释
如果 Lakeflow 作业 UI 为 ON,请单击 Notebook 磁贴以配置第一个任务。 如果 笔记本 磁贴不可用,请单击“ 添加其他任务类型 ”并搜索 笔记本。
(可选)将作业的名称(默认为
New Job <date-time>)替换为作业名称。在 “任务名称 ”字段中,输入任务的名称;例如,
tutorial-databricks-spend.如有必要,请从“类型”下拉菜单中选择“笔记本”。
在 “源 ”下拉菜单中,选择 “工作区”,以便使用之前保存的笔记本。
对于 Path,请使用文件浏览器查找创建的第一个笔记本,单击笔记本名称,然后单击“ 确认”。
单击
在“参数”部分中添加。 添加参数
lookback_days,其值为1。单击
在“参数”部分中添加。 添加
data_interval_end参数。 单击值旁边的 { } 以查看参数化值的列表。 在列表中选择{{job.trigger.time.iso_date}}来插入值。这会传递作业运行被触发的日期作为参数。
注释
如果你的查询回溯时间较小,则可以用于
{{job.trigger.time.iso_datetime}}传递时间,例如一小时而不是一天。 在这种情况下,任一选项在查询中都有效,但iso_date显示参数的意图。单击“创建任务”。
在任务右侧的详细信息面板上,在 “计划和触发器”下,单击“ 添加触发器”。
在 触发器类型 下,选择 定时。
保留每天运行一次的活动触发器的默认值。
单击“ 保存”。
你的任务现在每天运行你的查询。 默认情况下,此作在创建触发器的同一天时间运行。 可以编辑触发器,并选择 高级 触发器类型以设置特定时间。
注释
如果不想收取每天运行本教程作业的费用,可以单击“ 在 刚刚创建的计划下暂停。 这会保留计划,但在解除暂停状态之前,它不会运行。 可以随时手动运行它。
接下来,运行回填以将旧数据加载到表中。
步骤 3:运行旧数据的补全
可以运行回填操作来填充旧数据。 例如,如果需要在表中填充上周的数据。 这些说明将创建 7 个回填作业以处理过去 7 天的数据。
单击页面顶部
Chevron down icon.“立即运行”旁边的向下箭头()。
从显示的下拉列表中选择 “运行回填 ”。 这将打开 “运行回填 ”对话框。
将日期范围更改为要回填的范围。 选择 开始 时间为 7 天前,凌晨 0:00,结束 时间为今天,同样是凌晨 0:00。 例如,可以选择
09/14/2025, 12:00 AM作为开始时间和09/21/2025, 12:00 AM结束时间。对于每个回填的时间间隔,请选择 “每
1Day一个”。在 “作业参数”下,现有参数以键和值显示。 确认
data_interval_end参数已设置为{{backfill.iso_datetime}},且lookback_days是1。单击“ 运行 ”以启动回填运行。 这会触发 7 次回填运行,每天一次。
回填可以并行或按顺序运行,具体取决于作业设置。 有关回填的详细信息,请参阅 回填作业。