监视增量实时表管道

本文介绍如何对增量实时表管道使用内置监视和可观测性功能。 这些功能支持以下任务:

为管道事件添加电子邮件通知

可在出现以下情况时配置一个或多个电子邮件地址来接收通知:

  • 管道更新成功完成。
  • 管道更新失败,出现可重试或不可重试错误。 选择此选项将接收有关所有管道故障的通知。
  • 管道更新失败并出现不可重试(致命)错误。 选择此选项将仅在发生不可重试错误时接收通知。
  • 单个数据流失败。

若要在创建或编辑管道时配置电子邮件通知,请执行以下操作:

  1. 单击“添加通知”。
  2. 输入单个或多个电子邮件地址以接收通知。
  3. 单击要发送到配置的电子邮件地址的每种通知类型的复选框。
  4. 单击“添加通知”。

UI 中提供了哪些管道详细信息?

成功启动管道更新后,将显示管道图。 箭头表示管道中数据集之间的依赖关系。 默认情况下,管道详细信息页显示表的最近更新,但你可以从下拉菜单中选择旧的更新。

详细信息包括管道 ID、源代码、计算成本、产品版本、为管道配置的通道。

要查看数据集的表格视图,请点击“列表”选项卡。列表视图允许查看管道中的所有数据集,这些数据集在表中表示为行,并且当管道 DAG 太大而无法在Graph视图中可视化时非常有用。 可以使用多个筛选器(例如,数据集名称、类型和状态)控制表中显示的数据集。 要切换回 DAG 可视化效果,请点击“Graph”。

“运行身份”用户是管道所有者,管道更新使用此用户的权限运行。 若要更改 run as 用户,请单击“权限”并更改管道所有者。

如何查看数据集详细信息?

点击管道图或数据集列表中的数据集会显示有关数据集的详细信息。 详细信息包括数据集架构、数据质量指标,以及定义数据集的源代码的链接。

看更新历史记录

若要查看管道更新的历史记录和状态,请单击顶部栏中的“更新历史记录”下拉菜单。

在下拉菜单中选择更新,以查看更新的图、详细信息和事件。 若要返回到最新更新,请单击“显示最新更新”。

什么是增量实时表事件日志?

增量实时表事件日志包含与管道相关的所有信息,包括审核日志、数据质量检查、管道进度和数据世系。 可以使用事件日志来跟踪、了解和监视数据管道的状态。

可以在 Delta Live Tables 用户界面、Delta Live Tables API 中查看事件日志条目,也可以通过直接查询事件日志来这样做。 此部分重点介绍如何直接查询事件日志。

还可以定义在记录事件时要运行的自定义操作,例如,使用事件挂钩发送警报。

事件日志架构

下表描述了事件日志架构。 其中一些字段包含需要分析才能执行某些查询的 JSON 数据,例如 details 字段。 Azure Databricks 支持使用 : 运算符分析 JSON 字段。 请参阅:(冒号)运算符

字段 说明
id 事件日志记录的唯一标识符。
sequence 一个 JSON 文档,其中包含用于标识事件和对事件排序的元数据。
origin 一个 JSON 文档,其中包含云提供商、云提供商区域、user_idpipeline_idpipeline_type 等事件源的元数据,用于显示管道的创建位置(DBSQLWORKSPACE)。
timestamp 记录事件的时间。
message 描述事件的用户可读消息。
level 事件类型,例如 INFOWARNERRORMETRICS
error 如果出现错误,则为描述错误的详细信息。
details 一个 JSON 文档,其中包含事件的结构化详细信息。 这是用于分析事件的主要字段。
event_type 事件类型。
maturity_level 事件架构的稳定性。 可能的值包括:

- STABLE:架构稳定,不会改变。
- NULL:架构稳定,不会改变。 如果记录是在添加 maturity_level 字段之前(版本 2022.37)创建的,则该值可能是 NULL
- EVOLVING:架构不稳定,可能会发生变化。
- DEPRECATED:架构已弃用,Delta Live Tables 运行时可能随时停止生成此事件。

查询事件日志

事件日志的位置和用于查询事件日志的接口取决于管道是配置为使用 Hive 元存储还是 Unity Catalog。

Hive 元存储

如果管道将表发布到 Hive 元存储,则事件日志存储在 storage 位置下的 /system/events 中。 例如,如果已将管道 storage 设置配置为 /Users/username/data,则事件日志存储在 DBFS 的 /Users/username/data/system/events 路径中。

如果尚未配置 storage 设置,则默认的事件日志位置为 DBFS 中的 /pipelines/<pipeline-id>/system/events。 例如,如果管道的 ID 为 91de5e48-35ed-11ec-8d3d-0242ac130003,则存储位置为 /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events

可以创建视图以简化对事件日志的查询。 以下示例创建一个名为 event_log_raw 的临时视图。 此视图用于本文中包含的示例事件日志查询:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

请将 <event-log-path> 替换为事件日志位置。

管道运行的每个实例称为更新。 你通常需要提取有关最近更新的信息。 运行以下查询以查找最新更新的标识符,并将其保存在 latest_update_id 临时视图中。 此视图用于本文中包含的示例事件日志查询:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

可以在 Azure Databricks 笔记本或 SQL 编辑器中查询事件日志。 使用笔记本或 SQL 编辑器运行示例事件日志查询。

Unity Catalog

如果管道将表发布到 Unity Catalog,你必须使用 event_log 表值函数 (TVF) 来提取管道的事件日志。 通过将管道 ID 或表名称传递给 TVF 来检索管道的事件日志。 例如,若要检索 ID 为 04c78631-3dd7-4856-b2a6-7d84e9b2638b 的管道的事件日志记录,请执行以下命令:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

若要检索创建或拥有表 my_catalog.my_schema.table1 的管道的事件日志记录,请执行以下命令:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

若要调用 TVF,必须使用共享群集或 SQL 仓库。 例如,可以使用连接到共享群集的笔记本,也可以使用连接到 SQL 仓库的 SQL 编辑器

为了简化对管道事件的查询,管道所有者可以通过 event_log TVF 创建视图。 以下示例创建管道事件日志的视图。 此视图用于本文中包含的示例事件日志查询。

注意

event_log TVF 只能由管道所有者调用,而通过 event_log TVF 创建的视图只能由管道所有者查询。 无法与其他用户共享视图。

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

<pipeline-ID> 替换为 Delta Live Tables 管道的唯一标识符。 可以在 Delta Live Tables UI 的“管道详细信息”面板中找到该 ID。

管道运行的每个实例称为更新。 你通常需要提取有关最近更新的信息。 运行以下查询以查找最新更新的标识符,并将其保存在 latest_update_id 临时视图中。 此视图用于本文中包含的示例事件日志查询:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

从事件日志查询世系信息

包含有关世系的信息的事件具有事件类型 flow_definitiondetails:flow_definition 对象包含用于定义图中每种关系的 output_datasetinput_datasets

可以使用以下查询提取输入和输出数据集以查看世系信息:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
customers null
sales_orders_raw null
sales_orders_cleaned ["customers", "sales_orders_raw"]
sales_order_in_la ["sales_orders_cleaned"]

从事件日志查询数据质量

如果你针对管道中的数据集定义了期望,则数据质量指标将存储在 details:flow_progress.data_quality.expectations 对象中。 包含有关数据质量的信息的事件具有事件类型 flow_progress。 以下示例查询最后一个管道更新的数据质量指标:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
sales_orders_cleaned valid_order_number 4083 0

通过查询事件日志监视数据积压

增量实时表会跟踪 details:flow_progress.metrics.backlog_bytes 对象中积压的数据量。 包含积压指标的事件具有事件类型 flow_progress。 以下示例查询最后一个管道更新的积压工作 (backlog) 指标:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

注意

积压工作指标可能不可用,具体取决于管道的数据源类型和 Databricks Runtime 版本。

监视未启用无服务器功能的管道事件日志中的增强型自动缩放事件

对于未使用无服务器计算的 DLT 管道,事件日志会在管道中启用增强型自动缩放时捕获群集重设大小。 包含有关增强型自动缩放的信息的事件具有事件类型 autoscale。 群集重设大小请求信息存储在 details:autoscale 对象中。 以下示例查询上次管道更新的增强型自动缩放群集重设大小请求:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

监视计算资源利用率

cluster_resources 事件提供有关群集中任务槽数、这些任务槽的使用量,以及等待计划的任务数的指标。

启用增强型自动缩放后,cluster_resources 事件还包含自动缩放算法的指标,包括 latest_requested_num_executorsoptimal_num_executors。 这些事件还会将算法的状态显示为不同的状态,例如 CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION。 可以结合自动缩放事件一起查看此信息,以全面了解增强型自动缩放的情况。

以下示例查询上次管道更新的任务队列大小历史记录:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

以下示例查询上次管道更新的使用历史记录:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

以下示例查询执行程序计数历史记录,以及仅适用于增强型自动缩放管道的指标,包括算法在最新请求中请求的执行程序数量、算法根据最近指标建议的最佳执行程序数量,以及自动缩放算法状态:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

审核增量实时表管道

可以使用增量实时表事件日志记录和其他 Azure Databricks 审核日志来全面了解数据在增量实时表中的更新方式。

增量实时表使用管道所有者的凭据来运行更新。 你可以更改通过更新管道所有者使用的凭据。 增量实时表记录用户对管道执行的操作,包括管道创建、配置编辑和触发更新。

有关 Unity Catalog 审核事件的参考,请参阅 Unity Catalog 事件

在事件日志中查询用户操作

可以使用事件日志来审核事件,例如用户操作。 包含有关用户操作的信息的事件具有事件类型 user_action

有关操作的信息存储在 details 字段的 user_action 对象中。 请使用以下查询来构造用户事件的审核日志。 若要创建此查询中使用的 event_log_raw 视图,请参阅查询事件日志

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

运行时信息

可以查看管道更新的运行时信息,例如更新的 Databricks Runtime 版本:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0