教程:Delta Lake
本教程介绍 Azure Databricks 上的常见 Delta Lake 操作,包括:
可以从附加到 Azure Databricks 群集的笔记本中运行本文中的示例 Python、R、Scala 和 SQL 代码。 还可以从与 Databricks SQL 中的 SQL 仓库关联的查询中运行本文中的 SQL 代码。
注意
下面的一些代码示例使用由架构(也称为数据库)和表或视图(例如 default.people10m
)组成的两级命名空间表示法。 若要将这些示例与 Unity Catalog 结合使用,请将两级命名空间替换为由目录、架构和表或视图(例如 main.default.people10m
)组成的 Unity Catalog 三级命名空间表示法。
创建表
默认情况下,在 Azure Databricks 上创建的所有表都使用 Delta Lake。
注意
Delta Lake 是 Azure Databricks 所有读取、写入和表创建命令的默认值。
Python
# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)
R
library(SparkR)
sparkR.session()
# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")
# Write the data to a table.
table_name = "people_10m"
saveAsTable(
df = df,
tableName = table_name
)
Scala
// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")
// Write the data to a table.
val table_name = "people_10m"
people.write.saveAsTable("people_10m")
SQL
DROP TABLE IF EXISTS people_10m;
CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;
上述操作使用从数据中推理出的架构来创建新的托管表。 有关创建 Delta 表时的可用选项的信息,请参阅 CREATE TABLE。
对于托管表,Azure Databricks 决定数据的位置。 若要获取位置,可以使用 DESCRIBE DETAIL 语句,例如:
Python
display(spark.sql('DESCRIBE DETAIL people_10m'))
R
display(sql("DESCRIBE DETAIL people_10m"))
Scala
display(spark.sql("DESCRIBE DETAIL people_10m"))
SQL
DESCRIBE DETAIL people_10m;
有时你可能希望通过指定架构来创建表,然后再插入数据。 你可以使用以下 SQL 命令完成此操作:
CREATE TABLE IF NOT EXISTS people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
CREATE OR REPLACE TABLE people10m (
id INT,
firstName STRING,
middleName STRING,
lastName STRING,
gender STRING,
birthDate TIMESTAMP,
ssn STRING,
salary INT
)
在 Databricks Runtime 13.0 及更高版本中,可以使用 CREATE TABLE LIKE
创建一个新的空 Delta 表,该表会复制源 Delta 表的架构和表属性。 这在将表从开发环境提升到生产环境时特别有用,如以下代码示例所示:
CREATE TABLE prod.people10m LIKE dev.people10m
也可以使用 Delta Lake 中的 DeltaTableBuilder
API 创建表。 与 DataFrameWriter API 相比,此 API 可以更轻松地指定其他信息,例如列注释、表属性和生成的列 。
重要
此功能目前以公共预览版提供。
Python
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("default.people10m") \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("id", "INT") \
.addColumn("firstName", "STRING") \
.addColumn("middleName", "STRING") \
.addColumn("lastName", "STRING", comment = "surname") \
.addColumn("gender", "STRING") \
.addColumn("birthDate", "TIMESTAMP") \
.addColumn("ssn", "STRING") \
.addColumn("salary", "INT") \
.property("description", "table with people data") \
.location("/tmp/delta/people10m") \
.execute()
Scala
// Create table in the metastore
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/tmp/delta/people10m")
.execute()
在表中更新插入
若要将一组更新和插入合并到现有 Delta 表中,请使用 MERGE INTO 语句。 例如,下面的语句从源表中获取数据并将其合并到目标 Delta 表中。 如果两个表中有一个匹配行,Delta Lake 会使用给定的表达式更新数据列。 如果没有匹配行,Delta Lake 会添加一个新行。 此操作称为“upsert”。
CREATE OR REPLACE TEMP VIEW people_updates (
id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
(9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
(9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
(10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
(20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
(20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
(20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);
MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
如果指定 *
,这将更新或插入目标表中的所有列。 此操作假定源表的列与目标表的列相同,否则查询将引发分析错误。
执行 INSERT
操作时必须为表中的每个列指定一个值(例如,当现有数据集中没有匹配行时,必须这样做)。 但是,你不需要更新所有值。
若要查看结果,请查询表。
SELECT * FROM people_10m WHERE id >= 9999998
读取表
可以按表名或表路径访问 Delta 表中的数据,如以下示例中所示:
Python
people_df = spark.read.table(table_name)
display(people_df)
## or
people_df = spark.read.load(table_path)
display(people_df)
R
people_df = tableToDF(table_name)
display(people_df)
Scala
val people_df = spark.read.table(table_name)
display(people_df)
\\ or
val people_df = spark.read.load(table_path)
display(people_df)
SQL
SELECT * FROM people_10m;
SELECT * FROM delta.`<path-to-table`;
写入到表
Delta Lake 使用标准语法将数据写入表中。
若要以原子方式将新数据添加到现有 Delta 表,请使用 append
模式,如以下示例中所示:
SQL
INSERT INTO people10m SELECT * FROM more_people
Python
df.write.mode("append").saveAsTable("people10m")
Scala
df.write.mode("append").saveAsTable("people10m")
若要以原子方式替换表中的所有数据,请使用 overwrite
模式,如以下示例中所示:
SQL
INSERT OVERWRITE TABLE people10m SELECT * FROM more_people
Python
df.write.mode("overwrite").saveAsTable("people10m")
Scala
df.write.mode("overwrite").saveAsTable("people10m")
更新表
可以在 Delta 表中更新与谓词匹配的数据。 例如,在名为 people10m
的表或 /tmp/delta/people-10m
处的路径中,要将 gender
列中的缩写从 M
或 F
更改为 Male
或 Female
,可以运行以下命令:
SQL
UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
condition = "gender = 'F'",
set = { "gender": "'Female'" }
)
# Declare the predicate by using Spark SQL functions.
deltaTable.update(
condition = col('gender') == 'M',
set = { 'gender': lit('Male') }
)
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
"gender = 'F'",
Map("gender" -> "'Female'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
col("gender") === "M",
Map("gender" -> lit("Male")));
从表中删除
可以从 Delta 表中删除与谓词匹配的数据。 例如,在名为 people10m
的表或 /tmp/delta/people-10m
处的路径中,要删除与 1955
之前的 birthDate
列中具有值的人对应的所有行,可以运行以下命令:
SQL
DELETE FROM people10m WHERE birthDate < '1955-01-01'
DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'
Python
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')
# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')
Scala
import io.delta.tables._
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")
// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")
import org.apache.spark.sql.functions._
import spark.implicits._
// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")
重要
delete
可以从最新版本的 Delta 表中删除数据,但是直到显式删除旧的版本后才能从物理存储中删除数据。 有关详细信息,请参阅清空。
显示表历史记录
若要查看表的历史记录,请使用 DESCRIBE HISTORY 语句,该语句提供对表进行的每次写入的出处信息,包括表版本、操作、用户等。
DESCRIBE HISTORY people_10m
查询表的旧版本(按时间顺序查看)
Delta Lake 按时间顺序查看允许你查询 Delta 表的旧快照。
若要查询较早版本的表,请在 SELECT
语句中指定版本或时间戳。 例如,若要从上述历史记录中查询版本 0,请使用:
SELECT * FROM people_10m VERSION AS OF 0
或
SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'
对于时间戳,仅接受日期或时间戳字符串,例如 "2019-01-01"
和 "2019-01-01'T'00:00:00.000Z"
。
使用 DataFrameReader 选项,可以从固定到表的特定版本的 Delta 表创建数据帧,例如在 Python 中:
df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")
display(df1)
或者:
df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")
display(df2)
有关详细信息,请参阅使用 Delta Lake 表历史记录。
优化表
对表执行多个更改后,可能会有很多小文件。 为了提高读取查询的速度,你可以使用 OPTIMIZE
将小文件折叠为较大的文件:
OPTIMIZE people_10m
按列进行 Z 排序
为了进一步提高读取性能,你可以通过 Z 排序将相关的信息放置在同一组文件中。 Delta Lake 数据跳过算法会自动使用此并置,大幅减少需要读取的数据量。 若要对数据进行 Z 排序,请在 ZORDER BY
子句中指定要排序的列。 例如,若要按 gender
并置,请运行:
OPTIMIZE people_10m
ZORDER BY (gender)
有关运行 OPTIMIZE
时可用的完整选项集,请参阅在 Delta Lake 上使用优化压缩数据文件。
使用 VACUUM
清理快照
Delta Lake 为读取提供快照隔离,这意味着即使其他用户或作业正在查询表,也可以安全地运行 OPTIMIZE
。 不过,最终你应该清除旧快照。 可以运行 VACUUM
命令来执行此操作:
VACUUM people_10m
有关有效使用 VACUUM
的详细信息,请参阅使用 vacuum 删除未使用的数据文件。