使用批处理或流式处理清理和验证数据

清理和验证数据对于确保湖屋中的数据资产质量至关重要。 本文概述了专门用于提高数据质量的 Azure Databricks 产品/服务,并提供了关于定义业务逻辑以实现自定义规则的建议。

Azure Databricks 上的架构强制

Delta Lake 提供在写入时强制实施架构和约束检查的语义,从而保证湖屋中表的数据质量。

架构强制确保写入表的数据符合预定义的架构。 架构验证规则因操作而异。 请参阅架构强制

为了处理架构演变,Delta 提供了用于进行架构更改和演变表的机制。 请务必仔细考虑何时使用架构演变以避免字段丢失或管道失败。 有关手动或自动更新架构的详细信息,请参阅更新 Delta Lake 表架构

表约束

约束可以采用信息性主键和外键约束或强制约束的形式。 请参阅 ADD CONSTRAINT 子句

Azure Databricks 上的表约束是强制约束或信息性约束。

强制约束包括 NOT NULLCHECK 约束。

信息性约束包括主键和外键约束。

请参阅 Azure Databricks 上的约束

处理 null 或缺失值

可以在 Delta 表上强制执行 NOT NULL。 仅当列中现有记录都不为 null 时,才可以在现有表上启用 NOT NULL,并阻止将具有 null 值的新记录插入到表中。

模式强制

正则表达式 (regex) 可用于在数据字段中强制实施预期模式。 这在处理需要遵守特定格式或模式的文本数据时特别有用。

若要使用 regex 强制实施模式,可以使用 SQL 中的 REGEXPRLIKE 函数。 这些函数允许将数据字段与指定的正则表达式模式匹配。

下面举例说明如何在 SQL 中使用 CHECK 约束和正则表达式来执行模式:

CREATE TABLE table_name (
  column_name STRING CHECK (column_name REGEXP '^[A-Za-z0-9]+$')
);

值强制

约束可用于对表中的列强制实施值范围。 这可确保只允许插入或更新指定范围内的有效值。

若要强制实施值范围约束,可以使用 SQL 中的 CHECK 约束。 使用 CHECK 约束可以定义表中每一行必须为 true 的条件。

下面举例说明如何使用 CHECK 约束在列上执行值范围:

CREATE TABLE table_name (
  column_name INT CHECK (column_name >= 0 AND column_name <= 100)
);

使用增量实时表定义和配置期望。

增量实时表允许在声明具体化视图或流式处理表时定义期望。 可以选择配置期望来警告你有关违规、删除违规记录或由于违规导致工作负载失败的情况。 请参阅使用 Delta Live Tables 管理数据质量

数据监视

Azure Databricks 提供数据质量监视服务,使你可以监视帐户中所有表中数据的统计属性和质量。

强制转换数据类型

在表中插入或更新数据时,Azure Databricks 在可以安全地强制转换数据类型时将执行此操作,而不会丢失信息。

有关强制转换行为的详细信息,请参阅以下文章:

自定义业务逻辑

可以使用筛选器和 WHERE 子句来定义自定义逻辑,以隔离错误记录并阻止它们传播到下游表。 CASE WHEN ... OTHERWISE 子句允许你定义条件逻辑,以便以可预测的方式将业务逻辑正常地应用于不符合期望的记录。

DECLARE current_time = now()

INSERT INTO silver_table
  SELECT * FROM bronze_table
  WHERE event_timestamp <= current_time AND quantity >= 0;

INSERT INTO quarantine_table
  SELECT * FROM bronze_table
  WHERE event_timestamp > current_time OR quantity < 0;

备注

Databricks 建议始终将筛选后的数据作为单独的写入操作进行处理,尤其是在使用结构化流式处理时。 使用 .foreachBatch 写入多个表可能会导致结果不一致。

例如,上游系统可能无法对 NULL 值进行编码,因此使用占位符值 -1 来表示缺失的数据。 无需为 Azure Databricks 中的所有下游查询编写自定义逻辑来忽略包含 -1 的记录,而是可以使用 case when 语句动态替换这些记录作为转换。

INSERT INTO silver_table
  SELECT
    * EXCEPT weight,
    CASE
      WHEN weight = -1 THEN NULL
      ELSE weight
    END AS weight
  FROM bronze_table;