本文介绍如何使用 DLT 管道的内置监视和可观测性功能。 这些功能支持以下任务:
- 观察管道更新的进度和状态。 请参阅 UI 中提供了哪些管道详细信息?。
- 对管道事件发出警报,例如管道更新的成功或失败。 请参阅 为管道事件添加电子邮件通知。
- 查看 Apache Kafka 和自动加载程序(公共预览版)等流式处理源的指标。 请参阅查看流式处理指标。
- 提取有关管道更新的详细信息,例如数据世系、数据质量指标和资源使用状况。 请参阅什么是 DLT 事件日志?
- 定义在发生特定事件时要执行的自定义操作。 请参阅 使用事件挂钩定义 DLT 管道的自定义监视。
若要检查和诊断查询性能,请参阅 DLT 管道的访问查询历史记录。 该功能处于公开预览阶段。
为管道事件添加电子邮件通知
可在出现以下情况时配置一个或多个电子邮件地址来接收通知:
- 管道更新成功完成。
- 管道更新失败,出现可重试或不可重试错误。 选择此选项将接收有关所有管道故障的通知。
- 管道更新失败并出现不可重试(致命)错误。 选择此选项将仅在发生不可重试错误时接收通知。
- 单个数据流失败。
若要在创建或编辑管道时配置电子邮件通知,请执行以下操作:
单击“添加通知”。
输入单个或多个电子邮件地址以接收通知。
单击要发送到配置的电子邮件地址的每种通知类型的复选框。
单击“添加通知”。
在 UI 中查看管道
可以从工作区边栏中的 “管道 ”或 “工作流 ”选项中找到 DLT 管道。 这会打开“ 工作流 ”页到“ 作业和管道 ”选项卡,可在其中查看你有权访问的每个作业和管道的相关信息。 单击管道的名称以打开管道详细信息页。
如何使用作业和管道列表
若要查看有权访问的管道列表,请单击边栏中的 工作流 或
管道 。 工作流 UI 中的 “作业和管道 ”选项卡列出了有关所有可用作业和管道的信息,例如工作流的创建者、工作流的触发器(如果有)以及最后五次运行的结果。
若要更改列表中显示的列,请单击“ 并选择或取消选择列。
重要
统一的“作业和管道”列表现以公共预览版提供。 可以通过禁用作业和管道:统一管理、搜索和筛选来禁用该功能并返回默认体验。 有关详细信息,请参阅 “管理 Azure Databricks 预览版 ”。
可以在 “作业和管道 ”列表中筛选作业,如以下屏幕截图所示。
文本搜索: 名称和ID 字段支持关键字搜索。 若要搜索使用键和值创建的标记,可以按键、值或键和值进行搜索。 例如,对于具有键
department
和值finance
的标记,可以搜索department
或finance
查找匹配的作业。 若要按键和值进行搜索,请输入用冒号分隔的键和值(例如,department:finance
)。类型:按 作业、 管道或 全部筛选。 如果选择 “管道” ,还可以按 管道类型进行筛选,其中包括 ETL 和引入管道。
所有者:仅显示你拥有的作业。
收藏夹:显示已标记为收藏的作业。
标签:使用 标签。 若要按标记进行搜索,可以使用标记下拉菜单同时筛选最多五个标记,或者直接使用关键字搜索。
运行方式:最多按两
run as
个值进行筛选。
若要启动作业或管道,请单击 播放按钮。 若要停止作业或管道,请单击
按钮。 若要访问其他操作,请单击烤肉串菜单
。 例如,可以从该菜单中删除作业或管道,或访问管道的设置。
UI 中提供了哪些管道详细信息?
管道图在管道更新成功启动后立即显示。 箭头表示管道中数据集之间的依赖关系。 默认情况下,管道详细信息页显示表的最近更新,但你可以从下拉菜单中选择旧的更新。
详细信息包括管道 ID、源代码、计算成本、产品版本、为管道配置的通道。
要查看数据集的表格视图,请点击“列表”选项卡。列表视图允许查看管道中的所有数据集,这些数据集在表中表示为行,并且当管道 DAG 太大而无法在Graph视图中可视化时非常有用。 可以使用多个筛选器(例如,数据集名称、类型和状态)控制表中显示的数据集。 要切换回 DAG 可视化效果,请点击“Graph”。
“运行身份”用户是管道所有者,管道更新使用此用户的权限运行。 若要更改 run as
用户,请单击“权限”并更改管道所有者。
如何查看数据集详细信息?
单击管道图或数据集列表中的数据集会显示有关数据集的详细信息。 详细信息包括数据集架构、数据质量指标,以及定义数据集的源代码的链接。
看更新历史记录
若要查看管道更新的历史记录和状态,请单击顶部栏中的“更新历史记录”下拉菜单。
在下拉菜单中选择更新,以查看更新的图、详细信息和事件。 若要返回到最新更新,请单击“显示最新更新”。
查看流式处理指标
重要
DLT 的流式处理可观测性为公共预览版。
可以从 Spark 结构化流支持的数据源(如 Apache Kafka、Amazon Kinesis、自动加载器和 Delta 表)查看流式处理指标,这些指标适用于 DLT 管道中的每个流式处理流。 指标显示为 DLT UI 右窗格中的图表,包括积压工作秒、积压工作字节、积压工作记录和积压工作文件。 图表显示按分钟聚合的最大值,当将鼠标悬停在图表上时,工具提示会显示相应的最大值。 数据限制为从当前时间开始的最后 48 小时。
在 UI DLT Chart Icon图形视图中查看管道 DAG 时,管道中具有流处理指标的表显示 。 若要查看流式处理指标,请单击
,在右窗格中的 “流 ”选项卡中显示流式处理指标图表。 还可以应用筛选器来仅查看具有流式处理指标的表,方法是单击 列表,然后单击 “具有流式处理指标”。
每个流媒体源仅支持特定指标。 流式处理源不支持的指标在 UI 中无法查看。 下表显示了可用于所支持的流媒体源的度量标准:
来源 | 积压字节 | 积压记录 | 积压工作秒数 | 积压工作文件 |
---|---|---|---|---|
卡 夫 卡 | ✓ | ✓ | ||
动动力 | ✓ | ✓ | ||
三角洲 | ✓ | ✓ | ||
自动加载器 | ✓ | ✓ |
什么是 DLT 事件日志?
DLT 事件日志包含与管道相关的所有信息,包括审核日志、数据质量检查、管道进度和数据世系。 可以使用事件日志来跟踪、了解和监视数据管道的状态。
可以在 DLT 用户界面、DLT API 中或通过直接查询事件日志来查看事件日志条目。 此部分重点介绍如何直接查询事件日志。
还可以定义在记录事件时要运行的自定义操作,例如,使用事件挂钩发送警报。
重要
请勿删除事件日志或发布事件日志的父目录或架构。 删除事件日志可能会导致管道在将来的运行过程中无法更新。
事件日志架构
下表描述了事件日志架构。 其中一些字段包含需要分析才能执行某些查询的 JSON 数据,例如 details
字段。 Azure Databricks 支持使用 :
运算符分析 JSON 字段。 请参阅 :
(冒号)运算符。
领域 | DESCRIPTION |
---|---|
id |
事件日志记录的唯一标识符。 |
sequence |
一个 JSON 文档,其中包含用于标识事件和对事件排序的元数据。 |
origin |
一个 JSON 文档,其中包含云提供商、云提供商区域、user_id 、pipeline_id 或 pipeline_type 等事件源的元数据,用于显示管道的创建位置(DBSQL 或 WORKSPACE )。 |
timestamp |
记录事件的时间。 |
message |
描述事件的用户可读消息。 |
level |
事件类型,例如 INFO 、WARN 、ERROR 或 METRICS 。 |
maturity_level |
事件架构的稳定性。 可能的值为: |
error |
如果出现错误,则为描述错误的详细信息。 |
details |
一个 JSON 文档,其中包含事件的结构化详细信息。 这是用于分析事件的主要字段。 |
event_type |
事件类型。 |
查询事件日志
注释
本部分介绍使用 Unity 目录和默认发布模式配置的管道使用事件日志的默认行为和语法。
- 有关使用旧版发布模式的 Unity 目录管道的行为,请参阅 适用于 Unity 目录旧版发布模式管道的事件日志。
- 有关 Hive 元存储管道的行为和语法,请参阅使用 Hive 元存储管道的事件日志。
默认情况下,DLT 会将事件日志写入为管道配置的默认目录和架构中的隐藏 Delta 表。 隐藏时,该表仍可由所有足够特权的用户查询。 默认情况下,只有管道的所有者才能查询事件日志表。
默认情况下,隐藏事件日志的名称格式为event_log_{pipeline_id}
,其中管道 ID 是系统分配的 UUID,并将短划线替换为下划线。
可以与 JSON 配置交互以发布事件日志。 发布事件日志时,可以指定事件日志的名称,并且可以选择性地指定目录和架构,如以下示例所示:
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
事件日志位置还充当管道中任何自动加载程序查询的架构位置。 Databricks 建议在修改特权之前对事件日志表创建视图,因为一些计算设置可能允许用户直接共享事件日志表时访问架构元数据。 以下示例语法在事件日志表上创建视图,并在本文包含的示例事件日志查询中使用。
CREATE VIEW event_log_raw
AS SELECT * FROM catalog_name.schema_name.event_log_table_name;
管道运行的每个实例称为更新。 你通常需要提取有关最近更新的信息。 运行以下查询以查找最新更新的标识符,并将其保存在 latest_update
临时视图中。 此视图用于本文中包含的示例事件日志查询:
CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;
在 Unity Catalog 中,视图支持流式查询。 以下示例使用结构化流式处理查询在事件日志表顶部定义的视图:
df = spark.readStream.table("event_log_raw")
管道的所有者可以通过切换 Publish event log to metastore
管道配置 “高级 ”部分中的选项,将事件日志发布为公共 Delta 表。 可以选择为事件日志指定新的表名称、目录和架构。
从事件日志查询世系信息
包含有关世系的信息的事件具有事件类型 flow_definition
。
details:flow_definition
对象包含用于定义图中每种关系的 output_dataset
和 input_datasets
。
可以使用以下查询提取输入和输出数据集以查看世系信息:
SELECT
details:flow_definition.output_dataset as output_dataset,
details:flow_definition.input_datasets as input_dataset
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_definition'
AND
origin.update_id = latest_update.id
output_dataset |
input_datasets |
---|---|
customers |
null |
sales_orders_raw |
null |
sales_orders_cleaned |
["customers", "sales_orders_raw"] |
sales_order_in_la |
["sales_orders_cleaned"] |
从事件日志查询数据质量
如果您在管道中定义了数据集的期望,那么通过和未通过期望的记录数的指标将存储在 details:flow_progress.data_quality.expectations
对象中。 已删除记录数的指标存储在对象中 details:flow_progress.data_quality
。 包含有关数据质量的信息的事件具有事件类型 flow_progress
。
某些数据集可能无法使用数据质量指标。 查看 预期限制。
提供以下数据质量指标:
指标 | DESCRIPTION |
---|---|
dropped_records |
由于记录未能达到一个或多个预期而被删除的记录数。 |
passed_records |
通过预期条件的记录数。 |
failed_records |
不符合预期条件的记录数。 |
以下示例查询最后一个管道更新的数据质量指标:
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details :flow_progress :data_quality :expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name
dataset |
expectation |
passing_records |
failing_records |
---|---|---|---|
sales_orders_cleaned |
valid_order_number |
4083 | 0 |
从事件日志中查询自动加载程序事件
当自动加载程序处理文件时,DLT 将生成事件。 对于自动加载程序事件,event_type
是 operation_progress
且 details:operation_progress:type
既可以是 AUTO_LOADER_LISTING
也可以是 AUTO_LOADER_BACKFILL
。 对象 details:operation_progress
还包括 status
、 duration_ms
字段 auto_loader_details:source_path
和 auto_loader_details:num_files_listed
字段。
以下示例查询自动加载程序事件以获取最新更新:
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,
latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest.update_id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL')
通过查询事件日志监视数据积压
DLT 跟踪 details:flow_progress.metrics.backlog_bytes
对象中积压的数据量。 包含积压指标的事件具有事件类型 flow_progress
。 以下示例查询最后一个管道更新的积压工作 (backlog) 指标:
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id
注释
积压工作指标可能不可用,具体取决于管道的数据源类型和 Databricks Runtime 版本。
监视未启用无服务器功能的管道事件日志中的增强型自动缩放事件
对于不使用无服务器计算的 DLT 管道,事件日志会在管道中启用增强的自动缩放时捕获群集大小。 包含有关增强型自动缩放的信息的事件具有事件类型 autoscale
。 群集重设大小请求信息存储在 details:autoscale
对象中。 以下示例查询针对上次管道更新的增强型自动缩放群集重设大小请求:
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
监视计算资源利用率
cluster_resources
事件提供有关群集中任务槽数、这些任务槽的使用量,以及等待计划的任务数的指标。
启用增强型自动缩放后,cluster_resources
事件还包含自动缩放算法的指标,包括 latest_requested_num_executors
和 optimal_num_executors
。 这些事件还会将算法的状态显示为不同的状态,例如 CLUSTER_AT_DESIRED_SIZE
、SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS
和 BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION
。
可以结合自动缩放事件一起查看此信息,以全面了解增强型自动缩放的情况。
以下示例查询上次管道更新的任务队列大小历史记录:
SELECT
timestamp,
Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
以下示例查询上次管道更新的使用历史记录:
SELECT
timestamp,
Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
以下示例查询执行程序计数历史记录,以及仅适用于增强型自动缩放管道的指标,包括算法在最新请求中请求的执行程序数量、算法根据最近指标建议的最佳执行程序数量,以及自动缩放算法状态:
SELECT
timestamp,
Double(details :cluster_resources.num_executors) as current_executors,
Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id
审核 DLT 管道
可以使用 DLT 事件日志和其他 Azure Databricks 审核日志来全面了解如何在 DLT 中更新数据。
DLT 使用管道所有者的凭据来运行更新。 你可以更改通过更新管道所有者使用的凭据。 DLT 记录用户在管道上的操作,包括创建管道、编辑配置和触发更新。
有关 Unity Catalog 审核事件的参考,请参阅 Unity Catalog 事件。
在事件日志中查询用户操作
可以使用事件日志来审核事件,例如用户操作。 包含有关用户操作的信息的事件具有事件类型 user_action
。
有关操作的信息存储在 user_action
字段的 details
对象中。 请使用以下查询来构造用户事件的审核日志。 若要创建 event_log_raw
此查询中使用的视图,请参阅 查询事件日志。
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
---|---|---|
2021-05-20T19:36:03.517+0000 | START |
user@company.com |
2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
2021-05-27T00:35:51.971+0000 | START |
user@company.com |
运行时信息
可以查看管道更新的运行时信息,例如更新的 Databricks Runtime 版本:
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
---|
11.0 |