使用 Lakeflow Jobs 设置支持回填的定期查询

常见方案是按常规计划运行的查询,其中作业会对其进行协调。 例如,在每天结束时,将运行一个查询,以基于当天对源数据集的更改更新系统。 本教程将指导您创建一个包含参数的查询,这些参数用于标识导入数据的时间范围,然后创建一个作业以每天运行该查询。

本教程中创建的查询和参数与最佳做法相匹配,并设置为允许稍后根据需要运行 回填作业

先决条件

若要完成本教程,必须有权访问 Azure Databricks 中的系统表。

步骤 1:创建查询

在本教程中,你将创建一个查询,该查询使用参数描述要拉取的数据。 例如,本教程使用系统表中的计费数据来计算每日 Azure Databricks 成本。

查询使用两个参数:

参数 Use
data_interval_end 作业运行的日期(用于定期计划),即应该处理的时间范围的结束日期。 或者,对于回填作业,回填数据的截止日期。
lookback_days 要查询的数据天数。 查询从 data_interval_end运行查询的时间或日期(通常是运行查询的时间或日期)进行回溯,因此需要回溯而不是向前查看。

按照以下步骤创建查询:

  1. 在工作区中,单击“加号”图标。“新建”,然后选择“笔记本”图标。用于创建新笔记本的笔记本

  2. 名称默认为 Untitled Notebook <date-time>. 单击笔记本顶部的名称,并为其指定描述性名称,例如 Query billing with parameters tutorial

  3. 在笔记本编辑器顶部,从语言选择器中选择 SQL

  4. 在第一个单元格中,添加以下代码。 将<catalog><schema>替换为你有权访问且要使用的目录和架构。

    USE CATALOG <catalog>;
    USE SCHEMA <schema>;
    
    CREATE TABLE IF NOT EXISTS tutorial_databricks_product_spend (billing_origin_product STRING, usage_date DATE, total_dollar_cost DECIMAL(12, 2));
    
    -- Process the last N days specified by :lookback_days ending on :data_interval_end
    INSERT INTO TABLE tutorial_databricks_product_spend
      REPLACE WHERE
        usage_date >= date_add(:data_interval_end, - CAST(:lookback_days AS INT)) AND usage_date < :data_interval_end
      SELECT
        usage.billing_origin_product,
        usage.usage_date,
        SUM(usage.usage_quantity * list_prices.pricing.effective_list.default) AS total_dollar_cost
      FROM
        system.billing.usage AS usage
          JOIN system.billing.list_prices AS list_prices
            ON usage.sku_name = list_prices.sku_name
            AND usage.usage_end_time >= list_prices.price_start_time
            AND (
              list_prices.price_end_time IS NULL
              OR usage.usage_end_time < list_prices.price_end_time
            )
      WHERE
        usage.usage_date >=
          date_add(:data_interval_end, -CAST(:lookback_days AS INT))
        AND usage.usage_date <
          :data_interval_end
      GROUP BY
        usage.billing_origin_product,
        usage.usage_date
    
  5. 通过单击编辑然后选择添加参数以添加两个参数。 参数应具有以下名称和默认值:

    Name 默认值
    lookback_days 1
    data_interval_end < none>。 此参数始终是必需的。

    若要了解参数以及如何在不同的任务类型或 Python 中访问它们,请参阅 任务的 Access 参数值

查询现已准备就绪。 查询从系统表中读取整整一天的数据,然后使用 REPLACE WHERE 替换目标表中的现有数据。 通过替换而不是插入数据,即使同一天的查询被再次运行,也不会有任何损害。 事实上,如果处理过程中出现错误或数据延迟到达,这实际上使您可以重新执行当天的处理任务。

可以按照以下步骤测试查询:

  1. 请在笔记本单元格上方提供一个值,格式为data_interval_end,例如yyyy-mm-dd
  2. (可选)单击 “圆圈”图标。连接 并选择要使用的计算资源。
  3. 单击播放图标。全部运行
  4. 运行完成后,可以通过打开目录图标查看创建的表。从左侧菜单中选择目录,然后选择在查询中设置的目录和架构。

接下来,为查询创建定期计划。

步骤 2:创建一个作业来安排查询计划

  1. 在工作区中,单击工作流图标,然后在边栏中选择作业和管道

  2. 单击创建,然后选择作业。 “ 任务 ”选项卡显示空任务窗格。

    注释

    如果 Lakeflow 作业 UION,请单击 Notebook 磁贴以配置第一个任务。 如果 笔记本 磁贴不可用,请单击“ 添加其他任务类型 ”并搜索 笔记本

  3. (可选)将作业的名称(默认为 New Job <date-time>)替换为作业名称。

  4. “任务名称 ”字段中,输入任务的名称;例如, tutorial-databricks-spend.

  5. 如有必要,请从“类型”下拉菜单中选择“笔记本”。

  6. “源 ”下拉菜单中,选择 “工作区”,以便使用之前保存的笔记本。

  7. 对于 Path,请使用文件浏览器查找创建的第一个笔记本,单击笔记本名称,然后单击“ 确认”。

  8. 单击“加号”图标。“参数”部分中添加。 添加参数lookback_days,其值为1

  9. 单击“加号”图标。“参数”部分中添加。 添加data_interval_end参数。 单击旁边的 { } 以查看参数化值的列表。 在列表中选择{{job.trigger.time.iso_date}}来插入值。

    这会传递作业运行被触发的日期作为参数。

    注释

    如果你的查询回溯时间较小,则可以用于 {{job.trigger.time.iso_datetime}} 传递时间,例如一小时而不是一天。 在这种情况下,任一选项在查询中都有效,但 iso_date 显示参数的意图。

  10. 单击“创建任务”。

  11. 在任务右侧的详细信息面板上,在 “计划和触发器”下,单击“ 添加触发器”。

  12. 触发器类型 下,选择 定时

  13. 保留每天运行一次的活动触发器的默认值。

  14. 单击“ 保存”。

你的任务现在每天运行你的查询。 默认情况下,此作在创建触发器的同一天时间运行。 可以编辑触发器,并选择 高级 触发器类型以设置特定时间。

注释

如果不想收取每天运行本教程作业的费用,可以单击“ 暂停”图标。 刚刚创建的计划下暂停。 这会保留计划,但在解除暂停状态之前,它不会运行。 可以随时手动运行它。

接下来,运行回填以将旧数据加载到表中。

步骤 3:运行旧数据的补全

可以运行回填操作来填充旧数据。 例如,如果需要在表中填充上周的数据。 这些说明将创建 7 个回填作业以处理过去 7 天的数据。

  1. 单击页面顶部Chevron down icon.Chevron down icon.“立即运行”旁边的向下箭头()。

  2. 从显示的下拉列表中选择 “运行回填 ”。 这将打开 “运行回填 ”对话框。

  3. 将日期范围更改为要回填的范围。 选择 开始 时间为 7 天前,凌晨 0:00,结束 时间为今天,同样是凌晨 0:00。 例如,可以选择 09/14/2025, 12:00 AM 作为开始时间和 09/21/2025, 12:00 AM 结束时间。

  4. 对于每个回填的时间间隔,请选择 “每1Day 一个”。

  5. “作业参数”下,现有参数以键和值显示。 确认data_interval_end参数已设置为{{backfill.iso_datetime}},且lookback_days1

  6. 单击“ 运行 ”以启动回填运行。 这会触发 7 次回填运行,每天一次。

回填可以并行或按顺序运行,具体取决于作业设置。 有关回填的详细信息,请参阅 回填作业