教程:Delta Lake

本教程介绍 Azure Databricks 上的常见 Delta Lake 操作,包括:

可以从附加到 Azure Databricks 群集笔记本中运行本文中的示例 Python、R、Scala 和 SQL 代码。 还可以从与 Databricks SQL 中的 SQL 仓库关联的查询中运行本文中的 SQL 代码。

注意

下面的一些代码示例使用由架构(也称为数据库)和表或视图(例如 default.people10m)组成的两级命名空间表示法。 若要将这些示例与 Unity Catalog 结合使用,请将两级命名空间替换为由目录、架构和表或视图(例如 main.default.people10m)组成的 Unity Catalog 三级命名空间表示法。

创建表

默认情况下,在 Azure Databricks 上创建的所有表都使用 Delta Lake。

注意

Delta Lake 是 Azure Databricks 所有读取、写入和表创建命令的默认值。

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

Scala

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

上述操作使用从数据中推理出的架构来创建新的托管表。 有关创建 Delta 表时的可用选项的信息,请参阅 CREATE TABLE

对于托管表,Azure Databricks 决定数据的位置。 若要获取位置,可以使用 DESCRIBE DETAIL 语句,例如:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

Scala

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

有时你可能希望通过指定架构来创建表,然后再插入数据。 你可以使用以下 SQL 命令完成此操作:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

在 Databricks Runtime 13.0 及更高版本中,可以使用 CREATE TABLE LIKE 创建一个新的空 Delta 表,该表会复制源 Delta 表的架构和表属性。 这在将表从开发环境提升到生产环境时特别有用,如以下代码示例所示:

CREATE TABLE prod.people10m LIKE dev.people10m

也可以使用 Delta Lake 中的 DeltaTableBuilder API 创建表。 与 DataFrameWriter API 相比,此 API 可以更轻松地指定其他信息,例如列注释、表属性和生成的列

重要

此功能目前以公共预览版提供。

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

Scala

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

在表中更新插入

若要将一组更新和插入合并到现有 Delta 表中,请使用 MERGE INTO 语句。 例如,下面的语句从源表中获取数据并将其合并到目标 Delta 表中。 如果两个表中有一个匹配行,Delta Lake 会使用给定的表达式更新数据列。 如果没有匹配行,Delta Lake 会添加一个新行。 此操作称为“upsert”。

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

如果指定 *,这将更新或插入目标表中的所有列。 此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。

执行 INSERT 操作时必须为表中的每个列指定一个值(例如,当现有数据集中没有匹配行时,必须这样做)。 但是,你不需要更新所有值。

若要查看结果,请查询表。

SELECT * FROM people_10m WHERE id >= 9999998

读取表

可以按表名或表路径访问 Delta 表中的数据,如以下示例中所示:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

Scala

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

写入到表

Delta Lake 使用标准语法将数据写入表中。

若要以原子方式将新数据添加到现有 Delta 表,请使用 append 模式,如以下示例中所示:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

Scala

df.write.mode("append").saveAsTable("people10m")

若要以原子方式替换表中的所有数据,请使用 overwrite 模式,如以下示例中所示:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

Scala

df.write.mode("overwrite").saveAsTable("people10m")

更新表

可以在 Delta 表中更新与谓词匹配的数据。 例如,在名为 people10m 的表或 /tmp/delta/people-10m 处的路径中,要将 gender 列中的缩写从 MF 更改为 MaleFemale,可以运行以下命令:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

从表中删除

可以从 Delta 表中删除与谓词匹配的数据。 例如,在名为 people10m 的表或 /tmp/delta/people-10m 处的路径中,要删除与 1955 之前的 birthDate 列中具有值的人对应的所有行,可以运行以下命令:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

重要

delete 可以从最新版本的 Delta 表中删除数据,但是直到显式删除旧的版本后才能从物理存储中删除数据。 有关详细信息,请参阅清空

显示表历史记录

若要查看表的历史记录,请使用 DESCRIBE HISTORY 语句,该语句提供对表进行的每次写入的出处信息,包括表版本、操作、用户等。

DESCRIBE HISTORY people_10m

查询表的旧版本(按时间顺序查看)

Delta Lake 按时间顺序查看允许你查询 Delta 表的旧快照。

若要查询较早版本的表,请在 SELECT 语句中指定版本或时间戳。 例如,若要从上述历史记录中查询版本 0,请使用:

SELECT * FROM people_10m VERSION AS OF 0

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

对于时间戳,仅接受日期或时间戳字符串,例如 "2019-01-01""2019-01-01'T'00:00:00.000Z"

使用 DataFrameReader 选项,可以从固定到表的特定版本的 Delta 表创建数据帧,例如在 Python 中:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

或者:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

有关详细信息,请参阅使用 Delta Lake 表历史记录

优化表

对表执行多个更改后,可能会有很多小文件。 为了提高读取查询的速度,你可以使用 OPTIMIZE 将小文件折叠为较大的文件:

OPTIMIZE people_10m

按列进行 Z 排序

为了进一步提高读取性能,你可以通过 Z 排序将相关的信息放置在同一组文件中。 Delta Lake 数据跳过算法会自动使用此并置,大幅减少需要读取的数据量。 若要对数据进行 Z 排序,请在 ZORDER BY 子句中指定要排序的列。 例如,若要按 gender 并置,请运行:

OPTIMIZE people_10m
ZORDER BY (gender)

有关运行 OPTIMIZE 时可用的完整选项集,请参阅在 Delta Lake 上使用优化压缩数据文件

使用 VACUUM 清理快照

Delta Lake 为读取提供快照隔离,这意味着即使其他用户或作业正在查询表,也可以安全地运行 OPTIMIZE。 不过,最终你应该清除旧快照。 可以运行 VACUUM 命令来执行此操作:

VACUUM people_10m

有关有效使用 VACUUM 的详细信息,请参阅使用 vacuum 删除未使用的数据文件