Azure Databricks 中的架构演变

架构演变 是指系统能够适应一段时间内数据结构的变化。 在使用半结构化数据、事件流或第三方源时,这些更改很常见,尤其是当新字段被添加、数据类型发生转换或嵌套结构演变的时候。

常见更改包括:

  • 新列:以前未定义的其他字段,有时使用自定义回填值。
  • 列重命名:更改列名称,例如,从 name 中更改为 full_name
  • 已删除的列:从表架构中删除列。
  • 类型扩大:将列的类型更改为更广泛的类型。 例如,字段 INT 变为 DOUBLE.
  • 其他类型更改:更改列的类型。 例如,字段 INT 变为 STRING.

支持架构演变对于构建可复原且长时间运行的管道至关重要,这些管道可以适应不断变化的数据,而无需频繁的手动更新。

Components

Azure Databricks 架构演变涉及四个主要组件类别,每个组件分别处理架构更改:

  1. 连接器:从外部源引入数据的组件。 其中包括自动加载程序、Kafka、Kinesis 和 Lakeflow 连接器。
  2. 格式分析器:解码原始格式的函数,包括from_jsonfrom_avrofrom_xmlfrom_protobuf
  3. 引擎:处理执行查询的引擎,包括结构化流式处理。
  4. 数据集:流式处理表、具体化视图、增量表和保存和提供数据的视图。

架构演变

数据工程体系结构架构演变中的每个组件都是独立的。 你负责在各个组件中配置架构演变,以实现数据处理流中所需的行为。

例如,使用自动加载程序将数据引入 Delta 表时,有两个持久架构 - 一个由自动加载程序在其架构位置进行管理,另一个是目标 Delta 表的架构。 在稳定状态下,这两者是相同的。 当自动加载程序根据传入数据发展其架构时,Delta 表还必须改进其架构,否则查询将失败。 在这种情况下,可以(a)通过启用架构演变或使用直接 DDL 命令来更新目标 Delta 表架构,或(b)对目标 Delta 表进行完全重写。

连接器对模式演变的支持

以下部分详细介绍了每个 Azure Databricks 组件如何处理不同类型的架构更改。

自动加载器

自动加载程序支持列更改,但不支持类型更改。 使用 cloudFiles.schemaEvolutionModerescuedDataColumn. 配置自动架构演变。 可以手动设置 schemaHints 或不可变 schema。 自动演变架构时,流最初会失败。 重启时,将使用不断发展的架构。 请参阅自动加载程序架构演变的工作原理?

  • 新列:支持,具体取决于所选的schemaEvolutionMode。 失败,需要手动重启才能将新列添加到架构。
  • 列重命名:支持的,具体取决于所选的 schemaEvolutionMode 。 重命名的列被视为新增的列,旧列在新行中将填充NULL。 操作失败,需要手动重启以更新架构。
  • 已删除的列:受支持。 被视为软删除,即为已删除的列创建新行,并将其设置为NULL
  • 类型扩展:不受支持。 如果已设置rescuedDataColumn并将rescueDataColumn 设置为schemaEvolutionMode,则在rescue中捕获类型更改。 否则,需要手动更改架构。
  • 其他类型更改:不支持。 如果已设置rescuedDataColumn并将rescueDataColumn 设置为schemaEvolutionMode,则在rescue中捕获类型更改。 否则,需要手动更改架构。

Delta 连接器

Delta 连接器可以支持架构演变。 如果从启用了 列映射schemaTrackingLocation 的 Delta 表进行读取,则它支持列重命名、删除列和类型更改的架构演变。 您可以通过表属性type widening启用类型扩大。 必须为每个相应的更改设置正确的 Spark 配置,以改进架构,而无需停止流。 否则,流会在检测到更改时调整其跟踪的架构,然后停止。 然后,您必须手动重启流查询以恢复处理。

  • 新列:支持。 查询失败,必须重启流才能将新列添加到架构,但 Delta 表不需要重写。
  • 列重命名:受支持。 可以使用 Spark 配置 spark.databricks.delta.streaming.allowSourceColumnRename在流式处理查询中改进架构。
  • 已删除的列:受支持。 可以使用 Spark 配置 spark.databricks.delta.streaming.allowSourceColumnDrop在流式处理查询中改进架构。
  • 类型扩大:受支持。 可以使用 Spark 配置 spark.databricks.delta.streaming.allowSourceColumnTypeChange在流式处理查询中改进架构。
  • 其他类型更改:受支持,但需要重新编写 Delta 表。 可以在具有相同 Spark 配置的 spark.databricks.delta.streaming.allowSourceColumnTypeChange流式处理查询中改进架构。

SaaS 和 CDC 连接器

当列更改时,SaaS 和 CDC 连接器会自动演变架构。 检测到更改时,将通过自动重启来处理此问题。 类型更改需要完全刷新。

  • 新列:支持。 查询会自动重启以解决架构不匹配问题。
  • 列重命名:受支持。 查询会自动重启以解决架构不匹配问题。 重命名的列被视为添加的新列。
  • 已删除的列:受支持。 删除的列被视为软删除,其中已删除列的新行设置为 NULL
  • 类型扩展:不受支持。 更新架构需要完全刷新。
  • 其他类型更改:不支持。 更新架构需要完全刷新。

Kinesis、Kafka、Pub/Sub 和 Pulsar 连接器

不支持本地模式演变。 每个连接器函数返回一个二进制 BLOB。 架构演变由格式分析器处理。

  • 新列:由格式分析程序处理。
  • 列重命名:由格式分析器处理。
  • 已删除的列:由格式分析器处理。
  • 类型加宽:由格式分析器处理。
  • 其他类型更改:由格式分析器处理。

格式解析器对架构演变的支持

from_json 解析 器

分析 from_json 器不支持架构演变。 必须手动更新架构。 在 Lakeflow Spark 声明性管道中使用 from_json 时,可以使用 schemaLocationKeyschemaEvolutionMode启用自动架构演变。

  • 新列:启用自动架构演变后,其行为类似于自动加载程序。
  • 列重命名:启用自动架构演变后,其行为类似于自动加载程序。
  • 已删除的列:启用自动架构演变后,其行为类似于自动加载程序。
  • 类型扩大:启用自动架构演变后,其行为类似于自动加载程序。
  • 其他类型更改:启用自动架构演变后,其行为类似于自动加载程序。

from_avro 解析器和 from_protobuf 解析器

from_avrofrom_protobuf解析器的行为相同。 可以从 Confluent 架构注册表提取架构,或者用户可以提供架构,并且必须手动更新架构。 在from_avrofrom_protobuf函数中没有架构演变的概念;它必须由执行引擎和架构注册表来处理。

  • 新列:由 Confluent 架构注册表支持。 否则,用户必须手动更新架构。
  • 列重命名:Confluent 架构注册表支持。 否则,用户必须手动更新架构。
  • 已删除的列:Confluent 架构注册表支持。 否则,用户必须手动更新架构。
  • 类型扩展:受 Confluent Schema Registry 支持。 否则,用户必须手动更新架构。
  • 其他类型更改:Confluent 架构注册表支持。 否则,用户必须手动更新架构。

from_csv 解析器和 from_xml 解析器

from_csvfrom_xml解析器不支持模式演化。

  • 新列:不支持
  • 列重命名:不支持
  • 已删除的列:不支持
  • 类型拓宽:不支持
  • 其他类型更改:不支持

引擎的架构演变支持

结构化流式处理

流式处理查询的架构在规划阶段处于锁定状态,并且所有微批处理在未重新规划的情况下重复使用该计划。 如果源架构在执行中更改,则查询会失败,并且用户必须重启流式处理查询,以便 Spark 可以针对新架构重新规划。

流写入的数据集还必须支持架构演变。

  • 新列:支持。 查询失败,必须重启流才能解决架构不匹配问题。
  • 列重命名:受支持。 查询失败,必须重启流才能解决架构不匹配问题。
  • 已删除的列:受支持。 查询失败,必须重启流才能解决架构不匹配问题。
  • 类型扩大:受支持。 查询失败,必须重启流才能解决架构不匹配问题。
  • 其他类型更改:受支持。 查询失败,必须重启流才能解决架构不匹配问题。

按数据集进行的架构演变

流式处理表

默认情况下,流式表支持合并架构演变行为。 更新架构不需要手动重启,但任意架构更改需要完全刷新。

  • 新列:支持。 查询会自动重启以解析架构不匹配。
  • 列重命名:受支持。 查询将重启以解决架构不匹配问题。 重命名的列被视为添加的新列。
  • 已删除的列:受支持。 删除的列被视为软删除,其中已删除列的新行设置为 NULL。
  • 类型扩展:不受支持。 更新架构需要完全刷新。
  • 其他类型更改:不支持。 更新架构需要完全刷新。

具体化视图

对架构或定义查询的任何更新都触发具体化视图的完整重新计算。

  • 新列:已触发完全重新计算。
  • 列重命名:触发了完全重新计算。
  • 已删除的列:触发了完全重新计算。
  • 类型加宽:触发了完全重新计算。
  • 其他类型更改:已触发完全重新计算。

Delta 表

Delta 表支持各种配置来更新表架构,包括重命名、删除和扩大列类型,而无需重写表数据。 支持的配置包括 合并架构演变列映射类型扩大overwriteSchema

  • 新列:支持。 启用合并架构演变时自动演变,而无需重新编写 Delta 表。 如果未启用合并架构演变,更新将失败。
  • 列重命名:受支持。 可以通过启用了列映射的手动 ALTER TABLE DDL 命令进行重命名。 不需要 Delta 表重写。
  • 已删除的列:受支持。 可以通过启用了列映射的手动 ALTER TABLE DDL 命令删除列。 不需要 Delta 表重写。
  • 类型扩大:受支持。 启用类型扩大和合并架构演变时 ,会自动 处理某些类型。 启用类型宽化时,可以通过手动 ALTER TABLE DDL 命令来加宽列。 如果未配置其中任何一个,操作将失败。
  • 其他类型更改:受支持,但需要完全重写 Delta 表。 必须启用 overwriteSchema,才能完全重写 Delta 表。 否则,操作会失败。

Views

如果视图中包含与新架构不匹配的 column_list,或包含无法解析的查询,该视图将会变为无效。 如果不起作用,您可以使用 SCHEMA TYPE EVOLUTION 为类型更改启用架构演变,并使用 SCHEMA EVOLUTION 为类型更改、新列、重命名列和删除列启用架构演变(这是对类型演变的超集)。

  • 新列:支持。 使用 SCHEMA EVOLUTION 模式时,视图会自动演变,如果没有显式 column_list干预,则无需任何手动干预。 否则,视图可能变得无效,并且用户无法查询它。
  • 重命名列:支持。 使用 SCHEMA EVOLUTION 模式时,视图会自动演变,如果没有显式 column_list干预,则无需任何手动干预。 否则,视图可能变得无效。
  • 已删除的列:受支持。 使用 SCHEMA EVOLUTION 模式时,视图会自动演变,如果没有显式 column_list干预,则无需任何手动干预。 否则,视图可能变得无效。
  • 类型扩大:受支持。 使用 SCHEMA TYPE EVOLUTION 模式时,视图会自动适应各种类型的更改。 使用 SCHEMA EVOLUTION 模式时,视图会自动演变,如果没有显式 column_list干预,则无需任何手动干预。 否则,视图可能变得无效。
  • 其他类型更改:受支持。 使用 SCHEMA TYPE EVOLUTION 模式时,视图会自动适应各种类型的更改。 使用 SCHEMA EVOLUTION 模式时,视图会自动演变,如果没有显式 column_list干预,则无需任何手动干预。 否则,视图可能变得无效。

Example

以下示例演示如何使用在 Confluent 架构注册表中注册的 Avro 编码有效负载引入 Kafka 主题,并将其写入启用了架构演变的托管 Delta 表。

说明的要点:

  • 与 Kafka 连接器集成。
  • 在 Kafka 架构注册表中使用from_avro解码 Avro 记录。
  • 通过设置 avroSchemaEvolutionMode处理架构演变。
  • 写入已启用的 Delta 表 mergeSchema ,以允许累加更改。

该代码假定你有一个使用 Confluent 架构注册表的 Kafka 主题,并输出 Avro 编码的数据。

# ----- CONFIG: fill these in -----
# Catalog and schema:
CATALOG = "<catalog_name>"
SCHEMA = "<schema_name>"
# Schema Registry:
# (This is where the producer evolves the schema)
SCHEMA_REG = "<schema registry endpoint>"
SR_USER    = "<api key>"
SR_PASS    = "<api secret>"
# Confluent Cloud: SASL_SSL broker:
BOOTSTRAP = "<server:ip>"
# Kafka topic:
TOPIC = "<topic>"
# ----- end: config -----

BRONZE_TABLE = f"{CATALOG}.{SCHEMA}.bronze_users"
CHECKPOINT = f"/Volumes/{CATALOG}/{SCHEMA}/checkpoints/bronze_users"

# Kafka auth (example for Confluent Cloud SASL/PLAIN over SSL)
KAFKA_OPTS = {
  "kafka.security.protocol": "SASL_SSL",
  "kafka.sasl.mechanism": "PLAIN",
  "kafka.sasl.jaas.config": f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{SR_USER}' password='{SR_PASS}';"
}

# ----- Evolution knobs -----
# spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", value = True)

from pyspark.sql.functions import col
from pyspark.sql.avro.functions import from_avro

# Build reader
reader = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", BOOTSTRAP)
  .option("subscribe", TOPIC)
  .option("startingOffsets", "earliest")
)

# Attach Kafka auth options
for k, v in KAFKA_OPTS.items():
    reader = reader.option(k, v)

# --- No native schema evolution supported. Returns a binary blob. ---
raw_df = reader.load()

# Decode Avro with Schema Registry
# --- The format parser handles updating the schema using the schema registry ---
decoded = from_avro(
    data=col("value"),
    jsonFormatSchema=None, # using SR
    subject=f"{TOPIC}-value",
    schemaRegistryAddress=SCHEMA_REG,
    options={
      "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info": f"{SR_USER}:{SR_PASS}",
      # Behavior on schema changes:
      "avroSchemaEvolutionMode": "restart", # fail-fast so you can restart and adopt new fields
      "mode": "FAILFAST"
    }
).alias("payload")

bronze_df = raw_df.select(decoded, "timestamp").select("payload.*", "timestamp")

# Write to a managed Delta table as a STREAM
# --- Need to enable schema evolution separately for streaming to a Delta separately with mergeSchema --
(bronze_df.writeStream
  .format("delta")
  .option("checkpointLocation", CHECKPOINT)
  .option("ignoreChanges", "true")
  .outputMode("append")
  .option("mergeSchema", "true") # only supports adding new columns. Renaming, dropping, and type changes need to be handled separately.
  .trigger(availableNow=True) # Use availableNow trigger for Databricks SQL/Unity Catalog
  .toTable(BRONZE_TABLE)
)