使用管道预期管理数据质量

利用期望来应用质量约束,从而在数据流经 ETL 管道时验证数据。 通过预期可以更深入地了解数据质量指标,并在检测到无效记录时拒绝更新或删除记录。

本文概述了预期,包括语法示例和行为选项。 有关更高级用例和建议的最佳做法,请参阅 期望建议和高级模式

DLT 期望流图

什么是期望?

预期是管道具体化视图、流式处理表或视图创建语句中的可选子句,用于对通过查询传递的每个记录应用数据质量检查。 期望使用标准 SQL 布尔语句来指定约束。 可以将单个数据集的多个期望组合在一起,并在管道中的所有数据集声明中设置预期。

以下部分介绍了期望的三个组成部分,并提供语法示例。

预期名称

每个期望都必须有一个名称,该名称用作标识符来跟踪和监视预期。 选择一个用于传达正在验证的指标的名称。 以下示例定义预期 valid_customer_age 确认年龄在 0 到 120 岁之间:

重要

预期名称对于给定数据集必须是唯一的。 可以在管道中的多个数据集中重复使用预期。 请参阅可移植且可重用的预期

Python语言

@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
  return spark.readStream.table("datasets.samples.raw_customers")

SQL

CREATE OR REFRESH STREAMING TABLE customers(
  CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);

要评估的约束

约束子句是一个 SQL 条件语句,每个记录的计算结果必须为 true 或 false。 该约束包含正在验证的内容的实际逻辑。 当一条记录未通过此条件时,将触发预期条件。

约束必须使用有效的 SQL 语法,并且不能包含以下项:

  • 自定义 Python 函数
  • 外部服务调用
  • 引用其他表的子查询

下面是可添加到数据集创建语句的约束示例:

Python语言

# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")

# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")

# CASE statements
@dlt.expect("valid_order_status", """
   CASE
     WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
     WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
     ELSE false
   END
""")

# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")

# Complex business logic
@dlt.expect(
  "valid_subscription_dates",
  """start_date <= end_date
    AND end_date <= current_date()
    AND start_date >= '2020-01-01'"""
)

# Complex boolean logic
@dlt.expect("valid_order_state", """
   (status = 'ACTIVE' AND balance > 0)
   OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")

SQL

-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)

-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)

-- CASE statements
CONSTRAINT valid_order_status EXPECT (
  CASE
    WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
    WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
    ELSE false
  END
)

-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())

-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
  start_date <= end_date
  AND end_date <= current_date()
  AND start_date >= '2020-01-01'
)

-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
  (status = 'ACTIVE' AND balance > 0)
  OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)

对无效记录执行的操作

必须指定一个操作来确定记录未能通过验证检查时会发生什么情况。 下表描述了可用操作:

行动 SQL 语法 Python 语法 结果
警告(默认值) EXPECT dlt.expect 将无效记录写入目标。 将有效记录和无效记录计数与其他数据集指标一起记录。
drop EXPECT ... ON VIOLATION DROP ROW dlt.expect_or_drop 在将数据写入目标之前,将删除无效记录。 已删除记录的计数与其他数据集指标一起记录。
失败 EXPECT ... ON VIOLATION FAIL UPDATE dlt.expect_or_fail 无效记录会阻止更新成功完成。 在重新处理之前需要手动干预。 此预期会导致单个流失败,并且不会导致管道中的其他流失败。

还可以运用高级逻辑来隔离无效记录,而不会失败或删除数据。 请参阅隔离无效记录

期望值跟踪指标

可以从管道 UI 查看 warndrop 操作的跟踪指标。 由于 fail 导致检测到无效记录时更新失败,因此不会记录指标。

若要查看预期指标,请完成以下步骤:

  1. 单击边栏中的“DLT”。

  2. 单击管道的名称

  3. 请点击已定义期望的数据集。

  4. 在右侧栏中选择“数据质量”选项卡

可以通过查询 DLT 事件日志来查看数据质量指标。 请参阅从事件日志查询数据质量

保留无效记录

保留无效记录是预期的默认行为。 如果您希望保留违反期望的记录,同时收集关于有多少记录通过或未通过约束的指标,请使用 expect 运算符。 不符合预期的记录将与有效记录一起添加到目标数据集中:

Python语言

@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")

SQL

CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')

删除无效记录

使用 expect_or_drop 运算符可防止进一步处理无效记录。 不符合预期的记录将从目标数据集中删除:

Python语言

@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")

SQL

CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW

出现无效记录时失败

当无效记录不可接受时,使用 expect_or_fail 运算符在记录未通过验证时立即停止执行。 如果这项操作是表更新,系统会自动回滚事务:

Python语言

@dlt.expect_or_fail("valid_count", "count > 0")

SQL

CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE

重要

如果在管道中定义了多个并行流,则单个流的失败不会导致其他流失败。

DLT 流故障说明图

通过预期排查更新失败的问题

当管道因预期违规而失败时,必须在重新运行管道之前修复管道代码,以正确处理无效数据。

配置为拒绝管道的预期会修改转换的 Spark 查询计划,以跟踪检测和报告违规所需的信息。 可以使用此信息来识别导致多次查询违规的输入记录。 DLT 提供专用错误消息来报告此类违规。 下面是预期冲突错误消息的示例:

[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false

多项期望管理

注释

虽然 SQL 和 Python 都支持单个数据集中的多个预期,但只有 Python 允许对多个期望进行分组并指定集体作。

具有多个预期的 DLT 流图

可以将多个期望组合在一起,并使用函数 expect_allexpect_all_or_dropexpect_all_or_fail指定集体操作。

这些修饰器接受 Python 字典作为参数,其中键是预期名称,值是预期约束。 可以在管道中的多个数据集中重复使用相同的预期集。 下面显示了每个 expect_all Python 运算符的示例:

valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}

@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
  # Create a raw dataset

@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
  # Create a cleaned and prepared dataset

@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
  # Create cleaned and prepared to share the dataset

局限性

  • 由于仅流式处理表和具体化视图支持预期,因此仅支持这些对象类型的数据质量指标。
  • 数据质量指标在以下情况下不可用:
    • 查询上未定义任何期望值。
    • 流使用不支持期望的运算符。
    • 流类型(如 DLT 接收器)不支持预期。
    • 给定流运行没有对关联的流表或具体化视图的更新。
    • 管道配置不包含捕获指标所需的必要设置,例如 pipelines.metrics.flowTimeReporter.enabled
  • 在某些情况下, COMPLETED 流可能不包含指标。 相反,指标在状态为 flow_progressRUNNING 事件的每个微批处理中报告。