利用期望来应用质量约束,从而在数据流经 ETL 管道时验证数据。 通过预期可以更深入地了解数据质量指标,并在检测到无效记录时拒绝更新或删除记录。
本文概述了预期,包括语法示例和行为选项。 有关更高级用例和建议的最佳做法,请参阅 期望建议和高级模式。
什么是期望?
预期是管道具体化视图、流式处理表或视图创建语句中的可选子句,用于对通过查询传递的每个记录应用数据质量检查。 期望使用标准 SQL 布尔语句来指定约束。 可以将单个数据集的多个期望组合在一起,并在管道中的所有数据集声明中设置预期。
以下部分介绍了期望的三个组成部分,并提供语法示例。
预期名称
每个期望都必须有一个名称,该名称用作标识符来跟踪和监视预期。 选择一个用于传达正在验证的指标的名称。 以下示例定义预期 valid_customer_age
确认年龄在 0 到 120 岁之间:
重要
预期名称对于给定数据集必须是唯一的。 可以在管道中的多个数据集中重复使用预期。 请参阅可移植且可重用的预期。
Python语言
@dlt.table
@dlt.expect("valid_customer_age", "age BETWEEN 0 AND 120")
def customers():
return spark.readStream.table("datasets.samples.raw_customers")
SQL
CREATE OR REFRESH STREAMING TABLE customers(
CONSTRAINT valid_customer_age EXPECT (age BETWEEN 0 AND 120)
) AS SELECT * FROM STREAM(datasets.samples.raw_customers);
要评估的约束
约束子句是一个 SQL 条件语句,每个记录的计算结果必须为 true 或 false。 该约束包含正在验证的内容的实际逻辑。 当一条记录未通过此条件时,将触发预期条件。
约束必须使用有效的 SQL 语法,并且不能包含以下项:
- 自定义 Python 函数
- 外部服务调用
- 引用其他表的子查询
下面是可添加到数据集创建语句的约束示例:
Python语言
# Simple constraint
@dlt.expect("non_negative_price", "price >= 0")
# SQL functions
@dlt.expect("valid_date", "year(transaction_date) >= 2020")
# CASE statements
@dlt.expect("valid_order_status", """
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
""")
# Multiple constraints
@dlt.expect("non_negative_price", "price >= 0")
@dlt.expect("valid_purchase_date", "date <= current_date()")
# Complex business logic
@dlt.expect(
"valid_subscription_dates",
"""start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'"""
)
# Complex boolean logic
@dlt.expect("valid_order_state", """
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
""")
SQL
-- Simple constraint
CONSTRAINT non_negative_price EXPECT (price >= 0)
-- SQL functions
CONSTRAINT valid_date EXPECT (year(transaction_date) >= 2020)
-- CASE statements
CONSTRAINT valid_order_status EXPECT (
CASE
WHEN type = 'ORDER' THEN status IN ('PENDING', 'COMPLETED', 'CANCELLED')
WHEN type = 'REFUND' THEN status IN ('PENDING', 'APPROVED', 'REJECTED')
ELSE false
END
)
-- Multiple constraints
CONSTRAINT non_negative_price EXPECT (price >= 0)
CONSTRAINT valid_purchase_date EXPECT (date <= current_date())
-- Complex business logic
CONSTRAINT valid_subscription_dates EXPECT (
start_date <= end_date
AND end_date <= current_date()
AND start_date >= '2020-01-01'
)
-- Complex boolean logic
CONSTRAINT valid_order_state EXPECT (
(status = 'ACTIVE' AND balance > 0)
OR (status = 'PENDING' AND created_date > current_date() - INTERVAL 7 DAYS)
)
对无效记录执行的操作
必须指定一个操作来确定记录未能通过验证检查时会发生什么情况。 下表描述了可用操作:
行动 | SQL 语法 | Python 语法 | 结果 |
---|---|---|---|
警告(默认值) | EXPECT |
dlt.expect |
将无效记录写入目标。 将有效记录和无效记录计数与其他数据集指标一起记录。 |
drop | EXPECT ... ON VIOLATION DROP ROW |
dlt.expect_or_drop |
在将数据写入目标之前,将删除无效记录。 已删除记录的计数与其他数据集指标一起记录。 |
失败 | EXPECT ... ON VIOLATION FAIL UPDATE |
dlt.expect_or_fail |
无效记录会阻止更新成功完成。 在重新处理之前需要手动干预。 此预期会导致单个流失败,并且不会导致管道中的其他流失败。 |
还可以运用高级逻辑来隔离无效记录,而不会失败或删除数据。 请参阅隔离无效记录。
期望值跟踪指标
可以从管道 UI 查看 warn
或 drop
操作的跟踪指标。 由于 fail
导致检测到无效记录时更新失败,因此不会记录指标。
若要查看预期指标,请完成以下步骤:
单击边栏中的“DLT”。
单击管道的名称。
请点击已定义期望的数据集。
在右侧栏中选择“数据质量”选项卡。
可以通过查询 DLT 事件日志来查看数据质量指标。 请参阅从事件日志查询数据质量。
保留无效记录
保留无效记录是预期的默认行为。 如果您希望保留违反期望的记录,同时收集关于有多少记录通过或未通过约束的指标,请使用 expect
运算符。 不符合预期的记录将与有效记录一起添加到目标数据集中:
Python语言
@dlt.expect("valid timestamp", "timestamp > '2012-01-01'")
SQL
CONSTRAINT valid_timestamp EXPECT (timestamp > '2012-01-01')
删除无效记录
使用 expect_or_drop
运算符可防止进一步处理无效记录。 不符合预期的记录将从目标数据集中删除:
Python语言
@dlt.expect_or_drop("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
SQL
CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL) ON VIOLATION DROP ROW
出现无效记录时失败
当无效记录不可接受时,使用 expect_or_fail
运算符在记录未通过验证时立即停止执行。 如果这项操作是表更新,系统会自动回滚事务:
Python语言
@dlt.expect_or_fail("valid_count", "count > 0")
SQL
CONSTRAINT valid_count EXPECT (count > 0) ON VIOLATION FAIL UPDATE
重要
如果在管道中定义了多个并行流,则单个流的失败不会导致其他流失败。
通过预期排查更新失败的问题
当管道因预期违规而失败时,必须在重新运行管道之前修复管道代码,以正确处理无效数据。
配置为拒绝管道的预期会修改转换的 Spark 查询计划,以跟踪检测和报告违规所需的信息。 可以使用此信息来识别导致多次查询违规的输入记录。 DLT 提供专用错误消息来报告此类违规。 下面是预期冲突错误消息的示例:
[EXPECTATION_VIOLATION.VERBOSITY_ALL] Flow 'sensor-pipeline' failed to meet the expectation. Violated expectations: 'temperature_in_valid_range'. Input data: '{"id":"TEMP_001","temperature":-500,"timestamp_ms":"1710498600"}'. Output record: '{"sensor_id":"TEMP_001","temperature":-500,"change_time":"2024-03-15 10:30:00"}'. Missing input data: false
多项期望管理
注释
虽然 SQL 和 Python 都支持单个数据集中的多个预期,但只有 Python 允许对多个期望进行分组并指定集体作。
可以将多个期望组合在一起,并使用函数 expect_all
、expect_all_or_drop
和 expect_all_or_fail
指定集体操作。
这些修饰器接受 Python 字典作为参数,其中键是预期名称,值是预期约束。 可以在管道中的多个数据集中重复使用相同的预期集。 下面显示了每个 expect_all
Python 运算符的示例:
valid_pages = {"valid_count": "count > 0", "valid_current_page": "current_page_id IS NOT NULL AND current_page_title IS NOT NULL"}
@dlt.table
@dlt.expect_all(valid_pages)
def raw_data():
# Create a raw dataset
@dlt.table
@dlt.expect_all_or_drop(valid_pages)
def prepared_data():
# Create a cleaned and prepared dataset
@dlt.table
@dlt.expect_all_or_fail(valid_pages)
def customer_facing_data():
# Create cleaned and prepared to share the dataset
局限性
- 由于仅流式处理表和具体化视图支持预期,因此仅支持这些对象类型的数据质量指标。
- 数据质量指标在以下情况下不可用:
- 查询上未定义任何期望值。
- 流使用不支持期望的运算符。
- 流类型(如 DLT 接收器)不支持预期。
- 给定流运行没有对关联的流表或具体化视图的更新。
- 管道配置不包含捕获指标所需的必要设置,例如
pipelines.metrics.flowTimeReporter.enabled
。
- 在某些情况下,
COMPLETED
流可能不包含指标。 相反,指标在状态为flow_progress
的RUNNING
事件的每个微批处理中报告。