监视增量实时表管道

本文介绍如何使用增量实时表中的内置功能来监视和观测管道,这些功能包括数据世系、更新历史和数据质量报告。

可以通过管道详细信息 UI 手动查看大多数监视数据。 通过查询事件日志元数据可以更轻松地完成某些任务。 请参阅什么是增量实时表事件日志?

UI 中提供了哪些管道详细信息?

成功启动管道更新后,将显示管道图。 箭头表示管道中数据集之间的依赖关系。 默认情况下,管道详细信息页显示表的最近更新,但你可以从下拉菜单中选择旧的更新。

显示的详细信息包括管道 ID、源库、计算成本、产品版本、为管道配置的通道。

要查看数据集的表格视图,请点击“列表”选项卡。列表视图允许查看管道中的所有数据集,这些数据集在表中表示为行,并且当管道 DAG 太大而无法在Graph视图中可视化时非常有用。 可以使用多个筛选器(例如,数据集名称、类型和状态)控制表中显示的数据集。 要切换回 DAG 可视化效果,请点击“Graph”。

“运行身份”用户是管道所有者,管道更新使用此用户的权限运行。 若要更改 run as 用户,请单击“权限”并更改管道所有者。

如何查看数据集详细信息?

点击管道图或数据集列表中的数据集会显示有关数据集的详细信息。 详细信息包括数据集架构、数据质量指标,以及指向定义数据集的源代码的链接。

看更新历史记录

若要查看管道更新的历史记录和状态,请单击顶部栏中的“更新历史记录”下拉菜单。

要查看某项更新的图、详细信息和事件,请在下拉菜单中选择该更新。 若要返回到最新更新,请单击“显示最新更新”。

获取管道事件的通知

若要接收管道事件的实时通知(例如管道更新成功完成或管道更新失败),请在创建或编辑管道时添加为管道事件添加电子邮件通知

什么是增量实时表事件日志?

增量实时表事件日志包含与管道相关的所有信息,包括审核日志、数据质量检查、管道进度和数据世系。 可以使用事件日志来跟踪、了解和监视数据管道的状态。

可以在 Delta Live Tables 用户界面、Delta Live Tables API 中查看事件日志条目,也可以通过直接查询事件日志来这样做。 本文重点介绍如何直接查询事件日志。

还可以定义在记录事件时要运行的自定义操作,例如,使用事件挂钩发送警报。

事件日志架构

下表描述了事件日志架构。 其中一些字段包含需要分析才能执行某些查询的 JSON 数据,例如 details 字段。 Azure Databricks 支持使用 : 运算符分析 JSON 字段。 请参阅:(冒号)运算符

字段 说明
id 事件日志记录的唯一标识符。
sequence 一个 JSON 文档,其中包含用于标识事件和对事件排序的元数据。
origin 一个 JSON 文档,其中包含云提供商、云提供商区域、user_idpipeline_idpipeline_type 等事件源的元数据,用于显示管道的创建位置(DBSQLWORKSPACE)。
timestamp 记录事件的时间。
message 描述事件的用户可读消息。
level 事件类型,例如 INFOWARNERRORMETRICS
error 如果出现错误,则为描述错误的详细信息。
details 一个 JSON 文档,其中包含事件的结构化详细信息。 这是用于分析事件的主要字段。
event_type 事件类型。
maturity_level 事件架构的稳定性。 可能的值包括:

* STABLE:架构稳定,不会改变。
* NULL:架构稳定,不会改变。 如果记录是在添加 maturity_level 字段之前(版本 2022.37)创建的,则该值可能是 NULL
* EVOLVING:架构不稳定,可能会发生变化。
* DEPRECATED:架构已弃用,Delta Live Tables 运行时可能随时停止生成此事件。

查询事件日志

事件日志的位置和用于查询事件日志的接口取决于管道是配置为使用 Hive 元存储还是 Unity Catalog。

Hive 元存储

如果管道将表发布到 Hive 元存储,则事件日志存储在 storage 位置下的 /system/events 中。 例如,如果已将管道 storage 设置配置为 /Users/username/data,则事件日志存储在 DBFS 的 /Users/username/data/system/events 路径中。

如果尚未配置 storage 设置,则默认的事件日志位置为 DBFS 中的 /pipelines/<pipeline-id>/system/events。 例如,如果管道的 ID 为 91de5e48-35ed-11ec-8d3d-0242ac130003,则存储位置为 /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events

可以创建视图以简化对事件日志的查询。 以下示例创建一个名为 event_log_raw 的临时视图。 此视图用于本文中包含的示例事件日志查询:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

请将 <event-log-path> 替换为事件日志位置。

管道运行的每个实例称为更新。 你通常需要提取有关最近更新的信息。 运行以下查询以查找最新更新的标识符,并将其保存在 latest_update_id 临时视图中。 此视图用于本文中包含的示例事件日志查询:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

可以在 Azure Databricks 笔记本或 SQL 编辑器中查询事件日志。 使用笔记本或 SQL 编辑器运行示例事件日志查询。

Unity Catalog

如果管道将表发布到 Unity Catalog,你必须使用 event_log 表值函数 (TVF) 来提取管道的事件日志。 通过将管道 ID 或表名称传递给 TVF 来检索管道的事件日志。 例如,若要检索 ID 为 04c78631-3dd7-4856-b2a6-7d84e9b2638b 的管道的事件日志记录,请执行以下命令:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

若要检索创建或拥有表 my_catalog.my_schema.table1 的管道的事件日志记录,请执行以下命令:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

若要调用 TVF,必须使用共享群集或 SQL 仓库。 例如,可以使用连接到共享群集的笔记本,也可以使用连接到 SQL 仓库的 SQL 编辑器

为了简化对管道事件的查询,管道所有者可以通过 event_log TVF 创建视图。 以下示例创建管道事件日志的视图。 此视图用于本文中包含的示例事件日志查询。

注意

event_log TVF 只能由管道所有者调用,而通过 event_log TVF 创建的视图只能由管道所有者查询。 无法与其他用户共享视图。

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

<pipeline-ID> 替换为 Delta Live Tables 管道的唯一标识符。 可以在 Delta Live Tables UI 的“管道详细信息”面板中找到该 ID。

管道运行的每个实例称为更新。 你通常需要提取有关最近更新的信息。 运行以下查询以查找最新更新的标识符,并将其保存在 latest_update_id 临时视图中。 此视图用于本文中包含的示例事件日志查询:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

从事件日志查询世系信息

包含有关世系的信息的事件具有事件类型 flow_definitiondetails:flow_definition 对象包含用于定义图中每种关系的 output_datasetinput_datasets

可以使用以下查询提取输入和输出数据集以查看世系信息:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
1 customers null
2 sales_orders_raw null
3 sales_orders_cleaned ["customers", "sales_orders_raw"]
4 sales_order_in_la ["sales_orders_cleaned"]

从事件日志查询数据质量

如果你针对管道中的数据集定义了期望,则数据质量指标将存储在 details:flow_progress.data_quality.expectations 对象中。 包含有关数据质量的信息的事件具有事件类型 flow_progress。 以下示例查询最后一个管道更新的数据质量指标:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
1 sales_orders_cleaned valid_order_number 4083 0

通过查询事件日志监视数据积压

增量实时表会跟踪 details:flow_progress.metrics.backlog_bytes 对象中积压的数据量。 包含积压指标的事件具有事件类型 flow_progress。 以下示例查询最后一个管道更新的积压工作 (backlog) 指标:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

注意

积压工作指标可能不可用,具体取决于管道的数据源类型和 Databricks Runtime 版本。

从事件日志监视增强型自动缩放事件

如果管道中启用了增强的自动缩放,事件日志会捕获群集重设大小。 包含有关增强的自动缩放的信息的事件具有事件类型 autoscale。 群集重设大小请求信息存储在 details:autoscale 对象中。 以下示例查询上次管道更新的增强型自动缩放群集重设大小请求:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

监视计算资源利用率

cluster_resources 事件提供有关群集中任务槽数、这些任务槽的使用量,以及等待计划的任务数的指标。

启用增强型自动缩放后,cluster_resources 事件还包含自动缩放算法的指标,包括 latest_requested_num_executorsoptimal_num_executors。 这些事件还会将算法的状态显示为不同的状态,例如 CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION。 可以结合自动缩放事件一起查看此信息,以全面了解增强型自动缩放的情况。

以下示例查询上次管道更新的任务队列大小历史记录:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

以下示例查询上次管道更新的使用历史记录:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

以下示例查询执行程序计数历史记录,以及仅适用于增强型自动缩放管道的指标,包括算法在最新请求中请求的执行程序数量、算法根据最近指标建议的最佳执行程序数量,以及自动缩放算法状态:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

审核增量实时表管道

可以使用增量实时表事件日志记录和其他 Azure Databricks 审核日志来全面了解数据在增量实时表中的更新方式。

增量实时表使用管道所有者的凭据来运行更新。 你可以更改通过更新管道所有者使用的凭据。 增量实时表记录用户对管道执行的操作,包括管道创建、配置编辑和触发更新。

有关 Unity Catalog 审核事件的参考,请参阅 Unity Catalog 事件

在事件日志中查询用户操作

可以使用事件日志来审核事件,例如用户操作。 包含有关用户操作的信息的事件具有事件类型 user_action

有关操作的信息存储在 details 字段的 user_action 对象中。 请使用以下查询来构造用户事件的审核日志。 若要创建此查询中使用的 event_log_raw 视图,请参阅查询事件日志

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
1 2021-05-20T19:36:03.517+0000 START user@company.com
2 2021-05-20T19:35:59.913+0000 CREATE user@company.com
3 2021-05-27T00:35:51.971+0000 START user@company.com

运行时信息

可以查看管道更新的运行时信息,例如更新的 Databricks Runtime 版本:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
1 11.0