Delta Lake 允许你更新表的架构。 支持下列类型的更改:
- 添加新列(在任意位置)
- 重新排列现有列
- 重命名现有列
你可以使用 DDL 显式地或使用 DML 隐式地进行这些更改。
重要
对 Delta 表架构的更新是与所有并发 Delta 写入操作冲突的操作。
更新 Delta 表架构时,从该表进行读取的流会终止。 如果你希望流继续进行,必须重启它。 有关建议的方法,请参阅结构化流式处理的生产注意事项。
显式更新架构以添加列
ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
默认情况下,为 Null 性为 true。
若要将列添加到嵌套字段,请使用:
ALTER TABLE table_name ADD COLUMNS (col_name.nested_col_name data_type [COMMENT col_comment] [FIRST|AFTER colA_name], ...)
例如,如果运行 ALTER TABLE boxes ADD COLUMNS (colB.nested STRING AFTER field1) 之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field1
| +-nested
| +-field2
注意
仅支持为结构添加嵌套列。 不支持数组和映射。
显式更新架构以更改列注释或排序
ALTER TABLE table_name ALTER [COLUMN] col_name (COMMENT col_comment | FIRST | AFTER colA_name)
若要更改嵌套字段中的列,请使用:
ALTER TABLE table_name ALTER [COLUMN] col_name.nested_col_name (COMMENT col_comment | FIRST | AFTER colA_name)
例如,如果运行 ALTER TABLE boxes ALTER COLUMN colB.field2 FIRST 之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field2
| +-field1
显式更新架构以替换列
ALTER TABLE table_name REPLACE COLUMNS (col_name1 col_type1 [COMMENT col_comment1], ...)
例如,运行以下 DDL 时:
ALTER TABLE boxes REPLACE COLUMNS (colC STRING, colB STRUCT<field2:STRING, nested:STRING, field1:STRING>, colA STRING)
如果运行之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colC
| - colB
| +-field2
| +-nested
| +-field1
| - colA
显式更新架构以重命名列
注意
此功能在 Databricks Runtime 10.4 LTS 和更高版本中可用。
若要重命名列且不重写任何列的现有数据,必须为表启用列映射。 请参阅使用 Delta Lake 列映射重命名和删除列。
重命名列:
ALTER TABLE table_name RENAME COLUMN old_col_name TO new_col_name
重命名嵌套字段:
ALTER TABLE table_name RENAME COLUMN col_name.old_nested_field TO new_nested_field
例如,运行以下命令时:
ALTER TABLE boxes RENAME COLUMN colB.field1 TO field001
如果运行之前的架构为:
- root
| - colA
| - colB
| +-field1
| +-field2
则运行之后的架构为:
- root
| - colA
| - colB
| +-field001
| +-field2
显式更新架构以删除列
注意
此功能在 Databricks Runtime 11.3 LTS 和更高版本中可用。
要在不重写任何数据文件的情况下以仅元数据操作删除列,必须为表启用列映射。 请参阅使用 Delta Lake 列映射重命名和删除列。
重要
从元数据中删除列不会删除文件中该列的基础数据。 要清除已删除的列数据,可使用 REORG TABLE 重写文件。 然后可使用 VACUUM 以物理方式删除其中包含已删除的列数据的文件。
删除列:
ALTER TABLE table_name DROP COLUMN col_name
删除多列:
ALTER TABLE table_name DROP COLUMNS (col_name_1, col_name_2)
显式更新架构以更改列类型或名称
可以通过重写表来更改列的类型或名称或者删除列。 为此,请使用 overwriteSchema 选项。
以下示例演示如何更改列类型:
(spark.read.table(...)
  .withColumn("birthDate", col("birthDate").cast("date"))
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)
以下示例演示如何更改列名称:
(spark.read.table(...)
  .withColumnRenamed("dateOfBirth", "birthDate")
  .write
  .mode("overwrite")
  .option("overwriteSchema", "true")
  .saveAsTable(...)
)
启用架构演变
可以通过执行以下操作之一来启用架构演变:
- 将 .option("mergeSchema", "true")设置为 Spark DataFramewrite或writeStream操作。 请参阅为写入操作启用架构演变以添加新列。
- 使用 MERGE WITH SCHEMA EVOLUTION语法。 请参阅“合并的架构演变语法”。
- 将当前 SparkSession 的 Spark conf spark.databricks.delta.schema.autoMerge.enabled设置为true。
Databricks 建议为每个写入操作启用架构演变,而不是设置 Spark conf。
使用选项或语法在写入操作中启用架构演变时,这优先于 Spark conf。
注意
              INSERT INTO 语句没有架构演变子句。
为写入操作启用架构演变以添加新列
启用架构演变时,源查询中存在但目标表中缺少的列会自动添加为写入事务的一部分。 请参阅启用架构演变。
追加新列时,会保留大小写。 新列将添加到表架构的末尾。 如果其他列位于结构中,它们将被追加到目标表中结构的末尾。
以下示例演示如何将 mergeSchema 选项与自动加载程序配合使用。 请参阅什么是自动加载程序?。
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .trigger(availableNow=True)
  .toTable("table_name")
)
以下示例演示如何将 mergeSchema 选项与批处理写入操作配合使用:
(spark.read
  .table(source_table)
  .write
  .option("mergeSchema", "true")
  .mode("append")
  .saveAsTable("table_name")
)
Delta Lake 合并的自动架构演变
通过架构演变,可以解决合并中目标表和源表之间的架构不匹配问题。 此功能可处理以下两种情况:
- 源表中存在列,但不存在目标表。 - 该列将添加到目标架构,并且将从源中的相应列填充其值。 - 这仅适用于合并源中的列名称和结构与目标分配完全匹配时。 
- 必须直接从源分配新列,而无需表达式、重命名或转换。 
 - 这些示例允许架构演变: - UPDATE SET target.newcol = source.newcol -- The column newcol will be added to target. UPDATE SET target.somestruct.newfield = source.somestruct.newfield -- The field newfield will be added in struct column somestruct in target. UPDATE SET * -- Any columns and nested fields in source that don't exist in target will be added to target.- 这些示例不会触发架构演变: - UPDATE SET target.newcol = source.someothercol UPDATE SET target.newcol = source.x + source.y UPDATE SET target.newcol = source.output.newcol
- 目标表中存在列,但不存在源表。 - 目标架构不会更改。 这些列: - 保持不变 - UPDATE SET *。
- 设置为 - NULL.- INSERT *.
- 如果在作子句中分配,仍可能显式修改。 
 - 例如: - UPDATE SET * -- The target columns that are not in the source are left unchanged. INSERT * -- The target columns that are not in the source are set to NULL. UPDATE SET target.onlyintarget = 5 -- The target column is explicitly updated. UPDATE SET target.onlyintarget = source.someothercol -- The target column is explicitly updated from some other source column.
必须手动启用自动架构演变。 请参阅启用架构演变。
注意
在 Databricks Runtime 12.2 LTS 及更高版本中,可以在插入或更新操作中按名称指定源表中的列和结构字段。 在 Databricks Runtime 11.3 LTS 及以下版本中,只有 INSERT * 或 UPDATE SET * 操作可用于通过合并进行架构演变。
在 Databricks Runtime 13.3 LTS 及更高版本中,可以将架构演变与嵌套在映射中的结构一起使用,例如 map<int, struct<a: int, b: int>>。
用于合并的架构演变语法
在 Databricks Runtime 15.4 LTS 及更高版本中,可以使用 SQL 或 Delta 表 API 在合并语句中指定架构演变:
SQL
MERGE WITH SCHEMA EVOLUTION INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
Python
from delta.tables import *
(targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .whenNotMatchedBySourceDelete()
  .execute()
)
Scala(编程语言)
import io.delta.tables._
targetTable
  .merge(sourceDF, "source.key = target.key")
  .withSchemaEvolution()
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .whenNotMatchedBySource()
  .delete()
  .execute()
使用架构演变的合并操作示例
以下示例展示了在有架构演变和没有架构演变的情况下 merge 操作的效果。
| 列 | 查询(在 SQL 中) | 无架构演变的行为(默认值) | 有架构演变的行为 | 
|---|---|---|---|
| 目标列: key, value源列: key, value, new_value | MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED  THEN UPDATE SET *WHEN NOT MATCHED  THEN INSERT * | 表架构保持不变;仅更新/插入列 key、value。 | 表架构更改为 (key, value, new_value)。 使用源中的value和new_value更新具有匹配项的现有记录。 使用架构(key, value, new_value)插入新行。 | 
| 目标列: key, old_value源列: key, new_value | MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED  THEN UPDATE SET *WHEN NOT MATCHED  THEN INSERT * | UPDATE和INSERT操作会引发错误,因为目标列old_value不在源中。 | 表架构更改为 (key, old_value, new_value)。 使用源中的new_value更新具有匹配项的现有记录,而old_value保持不变。 使用key的指定new_value、NULL和old_value插入新记录。 | 
| 目标列: key, old_value源列: key, new_value | MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN MATCHED  THEN UPDATE SET new_value = s.new_value | UPDATE引发错误,因为目标表中不存在列new_value。 | 表架构更改为 (key, old_value, new_value)。 使用源中的new_value更新具有匹配项的现有记录,old_value保持不变,不匹配的记录包含针对NULL输入的new_value。 参阅注释 (1)。 | 
| 目标列: key, old_value源列: key, new_value | MERGE INTO target_table tUSING source_table sON t.key = s.keyWHEN NOT MATCHED  THEN INSERT (key, new_value) VALUES (s.key, s.new_value) | INSERT引发错误,因为目标表中不存在列new_value。 | 表架构更改为 (key, old_value, new_value)。 使用key的指定new_value、NULL和old_value插入新记录。 现有记录包含针对NULL输入的new_value,而old_value保持不变。 参阅注释 (1)。 | 
(1) 此行为在 Databricks Runtime 12.2 及更高版本中可用;Databricks Runtime 11.3 LTS 及以下版本在这种情况下会出错。
使用 Delta Lake 合并排除列
在 Databricks Runtime 12.2 LTS 及更高版本中,可以在合并条件中使用 EXCEPT 子句显式排除列。 
              EXCEPT 关键字的行为因是否启用架构演变而异。
禁用架构演变后,EXCEPT 关键字将应用于目标表中的列列表,并允许从 UPDATE 或 INSERT 操作中排除列。 排除的列设置为 null。
启用架构演变后,EXCEPT 关键字将应用于源表中的列列表,并允许从架构演变中排除列。 如果 EXCEPT 子句中列出了源中不存在的新列,则不会将其添加到目标架构中。 目标中已存在的排除列设置为 null。
以下示例演示了这些语法:
| 列 | 查询(在 SQL 中) | 无架构演变的行为(默认值) | 有架构演变的行为 | 
|---|---|---|---|
| 目标列: id, title, last_updated源列: id, title, review, last_updated | MERGE INTO target tUSING source sON t.id = s.idWHEN MATCHED  THEN UPDATE SET last_updated = current_date()WHEN NOT MATCHED  THEN INSERT * EXCEPT (last_updated) | 通过将 last_updated字段设置为当前日期来更新匹配的行。 使用id和title的值插入新行。 排除的字段last_updated设置为null。review字段被忽略,因为它不在目标中。 | 通过将 last_updated字段设置为当前日期来更新匹配的行。 架构已演变为添加review字段。 使用除last_updated设置为null外的所有源字段插入新行。 | 
| 目标列: id, title, last_updated源列: id, title, review, internal_count | MERGE INTO target tUSING source sON t.id = s.idWHEN MATCHED  THEN UPDATE SET last_updated = current_date()WHEN NOT MATCHED  THEN INSERT * EXCEPT (last_updated, internal_count) | INSERT引发错误,因为目标表中不存在列internal_count。 | 通过将 last_updated字段设置为当前日期来更新匹配的行。review字段将添加到目标表,但忽略该internal_count字段。 插入的新行已将last_updated设置为null。 | 
处理架构更新中的 NullType 列
由于 Parquet 不支持 NullType,因此在写入到 Delta 表时会从数据帧中删除 NullType 列,但仍会将其存储在架构中。 如果为该列接收到不同的数据类型,则 Delta Lake 会将该架构合并到新的数据类型。 如果 Delta Lake 接收到现有列的 NullType,则在写入过程中会保留旧架构并删除新列。
不支持流式处理中的 NullType。 由于在使用流式处理时必须设置架构,因此这应该非常罕见。 对于复杂类型(例如 NullType 和 ArrayType),也不会接受 MapType。
替换表架构
默认情况下,覆盖表中的数据不会覆盖架构。 在不使用 mode("overwrite") 的情况下使用 replaceWhere 来覆盖表时,你可能还希望覆盖写入的数据的架构。 你可以通过将 overwriteSchema 选项设置为 true 来替换表的架构和分区:
df.write.option("overwriteSchema", "true")
重要
使用动态分区覆盖时,不能将 overwriteSchema 指定为 true。