使用批处理或流式处理清理和验证数据
清理和验证数据对于确保湖屋中的数据资产质量至关重要。 本文概述了专门用于提高数据质量的 Azure Databricks 产品/服务,并提供了关于定义业务逻辑以实现自定义规则的建议。
Delta Lake 提供在写入时强制实施架构和约束检查的语义,从而保证湖屋中表的数据质量。
架构强制确保写入表的数据符合预定义的架构。 架构验证规则因操作而异。 请参阅架构强制。
为了处理架构演变,Delta 提供了用于进行架构更改和演变表的机制。 请务必仔细考虑何时使用架构演变以避免字段丢失或管道失败。 有关手动或自动更新架构的详细信息,请参阅更新 Delta Lake 表架构。
约束可以采用信息性主键和外键约束或强制约束的形式。 请参阅 ADD CONSTRAINT 子句。
Azure Databricks 上的表约束是强制约束或信息性约束。
强制约束包括 NOT NULL
和 CHECK
约束。
信息性约束包括主键和外键约束。
可以在 Delta 表上强制执行 NOT NULL。 仅当列中现有记录都不为 null 时,才可以在现有表上启用 NOT NULL,并阻止将具有 null 值的新记录插入到表中。
正则表达式 (regex) 可用于在数据字段中强制实施预期模式。 这在处理需要遵守特定格式或模式的文本数据时特别有用。
若要使用 regex 强制实施模式,可以使用 SQL 中的 REGEXP
或 RLIKE
函数。 这些函数允许将数据字段与指定的正则表达式模式匹配。
下面举例说明如何在 SQL 中使用 CHECK
约束和正则表达式来执行模式:
CREATE TABLE table_name (
column_name STRING CHECK (column_name REGEXP '^[A-Za-z0-9]+$')
);
约束可用于对表中的列强制实施值范围。 这可确保只允许插入或更新指定范围内的有效值。
若要强制实施值范围约束,可以使用 SQL 中的 CHECK
约束。 使用 CHECK
约束可以定义表中每一行必须为 true 的条件。
下面举例说明如何使用 CHECK
约束在列上执行值范围:
CREATE TABLE table_name (
column_name INT CHECK (column_name >= 0 AND column_name <= 100)
);
增量实时表允许在声明具体化视图或流式处理表时定义期望。 可以选择配置期望来警告你有关违规、删除违规记录或由于违规导致工作负载失败的情况。 请参阅使用 Delta Live Tables 管理数据质量。
Azure Databricks 提供数据质量监视服务,使你可以监视帐户中所有表中数据的统计属性和质量。
在表中插入或更新数据时,Azure Databricks 在可以安全地强制转换数据类型时将执行此操作,而不会丢失信息。
有关强制转换行为的详细信息,请参阅以下文章:
可以使用筛选器和 WHERE
子句来定义自定义逻辑,以隔离错误记录并阻止它们传播到下游表。 CASE WHEN ... OTHERWISE
子句允许你定义条件逻辑,以便以可预测的方式将业务逻辑正常地应用于不符合期望的记录。
DECLARE current_time = now()
INSERT INTO silver_table
SELECT * FROM bronze_table
WHERE event_timestamp <= current_time AND quantity >= 0;
INSERT INTO quarantine_table
SELECT * FROM bronze_table
WHERE event_timestamp > current_time OR quantity < 0;
备注
Databricks 建议始终将筛选后的数据作为单独的写入操作进行处理,尤其是在使用结构化流式处理时。 使用 .foreachBatch
写入多个表可能会导致结果不一致。
例如,上游系统可能无法对 NULL
值进行编码,因此使用占位符值 -1
来表示缺失的数据。 无需为 Azure Databricks 中的所有下游查询编写自定义逻辑来忽略包含 -1
的记录,而是可以使用 case when 语句动态替换这些记录作为转换。
INSERT INTO silver_table
SELECT
* EXCEPT weight,
CASE
WHEN weight = -1 THEN NULL
ELSE weight
END AS weight
FROM bronze_table;