Lakeflow 声明性流水线事件日志

Lakeflow 声明性管道事件日志包含与管道相关的所有信息,包括审核日志、数据质量检查、管道进度和数据世系。 可以使用事件日志来跟踪、了解和监视数据管道的状态。

可以在 Lakeflow 声明性管道 用户界面、Lakeflow 声明性管道 API 中或通过直接查询事件日志来查看事件日志条目。 此部分重点介绍如何直接查询事件日志。

还可以定义在记录事件时要运行的自定义操作,例如,使用事件挂钩发送警报。

重要

请勿删除事件日志或发布事件日志的父目录或架构。 删除事件日志可能会导致管道在将来的运行过程中无法更新。

有关事件日志架构的完整详细信息,请参阅 Lakeflow 声明性管道事件日志架构

查询事件日志

注释

本部分介绍使用 Unity 目录和默认发布模式配置的管道使用事件日志的默认行为和语法。

默认情况下,Lakeflow 声明性管道会将事件日志写入为管道配置的默认目录和架构中的隐藏 Delta 表。 隐藏时,该表仍可由所有足够特权的用户查询。 默认情况下,只有管道的所有者才能查询事件日志表。

若要以所有者身份查询事件日志,请使用管道 ID:

SELECT * FROM event_log(<pipelineId>);

默认情况下,隐藏事件日志的名称的格式为 event_log_{pipeline_id},其中管道 ID 是系统分配的 UUID,用短划线替换下划线。

可以通过编辑管道 的高级设置 来发布事件日志,然后选择“ 将事件日志发布到元存储”。 有关详细信息,请参阅 事件日志的管道设置。 发布事件日志时,可以指定事件日志的名称,并且可以选择性地指定目录和架构,如以下示例所示:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

事件日志位置还充当管道中任何自动加载程序查询的架构位置。 Databricks 建议在修改特权之前对事件日志表创建视图,因为一些计算设置可能允许用户直接共享事件日志表时访问架构元数据。 以下示例语法在事件日志表上创建视图,并在本文包含的示例事件日志查询中使用。 将 `<catalog_name>.<schema_name>.<event_log_table_name>` 替换为管道事件日志的完全限定表名称。 如果已发布事件日志,请使用发布时指定的名称。 否则,请在管道 ID 所在的位置使用 event_log(<pipelineId>),其中 pipelineId 是您要查询的管道。

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

在 Unity Catalog 中,视图支持流式查询。 以下示例使用结构化流式处理查询在事件日志表顶部定义的视图:

df = spark.readStream.table("event_log_raw")

管道的所有者可以通过切换 Publish event log to metastore 管道配置 “高级 ”部分中的选项,将事件日志发布为公共 Delta 表。 可以选择为事件日志指定新的表名称、目录和架构。

基本查询示例

以下示例演示如何查询事件日志以获取有关管道的常规信息,以及帮助调试常见方案。

通过查询以前的更新来监视管道更新

以下示例查询管道的更新(或 运行),其中显示了更新 ID、状态、开始时间、完成时间和持续时间。 这为您提供了管道运行的概览。

假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

调试物化视图增量更新问题

此示例从管道的最新更新中查询所有工作流。 它显示了它们是否进行了增量更新,以及能帮助调试为何未进行增量刷新的其他相关规划信息。

假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

查询管道更新的成本

此示例演示如何查询管道的 DBU 使用情况,以及给定管道运行的用户。

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

高级查询

以下示例演示如何查询事件日志以处理不太常见的或更高级的方案。

查询管道中所有流的指标

此示例演示如何查询管道中每个流的详细信息。 它显示流名称、更新时长、数据质量指标以及有关已处理的行的信息(输出行、已删除、更新插入记录和已丢弃的记录)。

假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

查询数据质量或期望值指标

如果在管道中定义数据集的期望,则符合和不符合期望的记录数指标将存储在 details:flow_progress.data_quality.expectations 对象中。 已删除记录数的指标存储在对象中 details:flow_progress.data_quality 。 包含有关数据质量的信息的事件具有事件类型 flow_progress

某些数据集可能无法使用数据质量指标。 查看 预期限制

提供以下数据质量指标:

指标 DESCRIPTION
dropped_records 由于记录未能达到一个或多个预期而被删除的记录数。
passed_records 通过预期条件的记录数。
failed_records 不符合预期条件的记录数。

以下示例查询上次管道更新的数据质量指标。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

查询世系信息

包含有关世系的信息的事件具有事件类型 flow_definitiondetails:flow_definition 对象包含用于定义图中每种关系的 output_datasetinput_datasets

使用以下查询提取输入和输出数据集以查看世系信息。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

使用自动加载程序监视云文件引入

当自动加载程序处理文件时,Lakeflow 声明性管道将生成事件。 对于自动加载程序事件,event_typeoperation_progressdetails:operation_progress:type 既可以是 AUTO_LOADER_LISTING 也可以是 AUTO_LOADER_BACKFILL。 对象 details:operation_progress 还包括 statusduration_ms字段 auto_loader_details:source_pathauto_loader_details:num_files_listed 字段。

以下示例查询自动加载程序事件以获取最新更新。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

监视数据积压以优化流媒体处理时间

Lakeflow 声明性管道跟踪 details:flow_progress.metrics.backlog_bytes 对象中积压工作的数据量。 包含积压指标的事件具有事件类型 flow_progress。 以下示例查询最近一次流水线更新的积压量指标。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

注释

积压工作指标可能不可用,具体取决于管道的数据源类型和 Databricks Runtime 版本。

监视自动缩放事件以优化经典计算

对于使用经典计算(换句话说,不使用无服务器计算)的 Lakeflow 声明性管道,事件日志会在管道中启用增强的自动缩放时捕获群集大小。 包含有关增强型自动缩放的信息的事件具有事件类型 autoscale。 群集重设大小请求信息存储在 details:autoscale 对象中。

以下示例查询了上次管道更新中增强的自动伸缩群集调整大小请求。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

监视经典计算的计算资源利用率

cluster_resources 事件提供有关集群中任务槽的数量、这些任务槽的使用率以及有多少任务在等待调度的指标。

启用增强型自动缩放后,cluster_resources 事件还包含自动缩放算法的指标,包括 latest_requested_num_executorsoptimal_num_executors。 这些事件还会将算法的状态显示为不同的状态,例如 CLUSTER_AT_DESIRED_SIZESCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSBLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION。 可以结合自动缩放事件一起查看此信息,以全面了解增强型自动缩放的情况。

以下示例查询任务队列大小历史记录、利用率历史记录、执行程序计数历史记录,以及上次管道更新中自动缩放的其他指标和状态。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

审计 Lakeflow 声明性管道

可以使用 Lakeflow 声明性管道事件日志和其他 Azure Databricks 审核日志来全面了解如何在 Lakeflow 声明性管道中更新数据。

Lakeflow 声明式流水线使用管道所有者的认证信息来执行更新。 你可以更改通过更新管道所有者使用的凭据。 Lakeflow 声明性管道记录用户对管道操作的行为,包括管道创建、配置编辑和更新触发。

有关 Unity Catalog 审核事件的参考,请参阅 Unity Catalog 事件

在事件日志中查询用户操作

可以使用事件日志来审核事件,例如用户操作。 包含有关用户操作的信息的事件具有事件类型 user_action

有关操作的信息存储在 user_action 字段的 details 对象中。 请使用以下查询来构造用户事件的审核日志。 这假定你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

运行时信息

可以查看管道更新的运行时信息,例如更新的 Databricks Runtime 版本,假设你已为感兴趣的管道创建了 event_log_raw 视图,如 查询事件日志中所述。

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0