Delta Lake quickstart

The Delta Lake quickstart provides an overview of the basics of working with Delta Lake. The quickstart shows how to build a pipeline that reads JSON data into a Delta table, and how to modify the table, read the table, display table history, and optimize the table.

For Azure Databricks notebooks that demonstrate these features, see Introductory notebooks.

Create a table

To create a Delta table, you can use existing Apache Spark SQL code and change the format from parquet, csv, json, and so on, to delta.

For all file types, you read the files into a DataFrame and write out in delta format:

Python

events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")

R

library(SparkR)
sparkR.session()

events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")

SQL

CREATE TABLE events
USING delta
AS SELECT *
FROM json.`/data/events/`

These operations create a new unmanaged table using the schema that was inferred from the JSON data. For the full set of options available when you create a new Delta table, see Create a table and Write to a table.

If your source files are in Parquet format, you can use the SQL CONVERT TO DELTA statement to convert files in place to create an unmanaged table:

CONVERT TO DELTA parquet.`/mnt/delta/events`
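
The same conversion can be run from Python by issuing the statement through spark.sql; this is a minimal sketch using the example path above:

# Convert the Parquet files at this path to Delta format in place
spark.sql("CONVERT TO DELTA parquet.`/mnt/delta/events`")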

Partition data

To speed up queries that have predicates involving the partition columns, you can partition data.

Python

events = spark.read.json("/databricks-datasets/structured-streaming/events/")
events.write.partitionBy("date").format("delta").save("/mnt/delta/events")
spark.sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")

R

events <- read.json("/databricks-datasets/structured-streaming/events/")
write.df(events, source = "delta", path = "/mnt/delta/events", partitionBy = "date")
sql("CREATE TABLE events USING DELTA LOCATION '/mnt/delta/events/'")

SQL

To partition data when you create a Delta table using SQL, specify PARTITIONED BY columns.

CREATE TABLE events (
  date DATE,
  eventId STRING,
  eventType STRING,
  data STRING)
USING delta
PARTITIONED BY (date)

Modify a table

Delta Lake supports a rich set of operations to modify tables.

Stream writes to a table

You can write data into a Delta table using Structured Streaming. The Delta Lake transaction log guarantees exactly-once processing, even when there are other streams or batch queries running concurrently against the table. By default, streams run in append mode, which adds new records to the table.

Python

from pyspark.sql.types import *

inputPath = "/databricks-datasets/structured-streaming/events/"

jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

eventsDF = (
  spark
    .readStream
    .schema(jsonSchema) # Set the schema of the JSON data
    .option("maxFilesPerTrigger", 1) # Treat a sequence of files as a stream by picking one file at a time
    .json(inputPath)
)

(eventsDF.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")
)

R

inputPath <- "/databricks-datasets/structured-streaming/events/"
tablePath <- "/mnt/delta/events/"

jsonSchema <- structType(structField("time", "timestamp", T), structField("action", "string", T))
eventsStream <- read.stream(
  "json",
  path = inputPath,
  schema = jsonSchema,
  maxFilesPerTrigger = 1)

write.stream(
  eventsStream,
  path = tablePath,
  mode = "append",
  checkpointLocation = "/mnt/delta/events/_checkpoints/etl-from-json")

For more information about Delta Lake integration with Structured Streaming, see Table streaming reads and writes.

Batch upserts

To merge a set of updates and insertions into an existing table, you use the MERGE INTO statement. For example, the following statement takes a stream of updates and merges it into the events table. When there is already an event present with the same eventId, Delta Lake updates the data column using the given expression. When there is no matching event, Delta Lake adds a new row.

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

You must specify a value for every column in your table when you perform an INSERT (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.
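
The same upsert can also be expressed with the Delta Lake Python API. The following is a minimal sketch, assuming a hypothetical updates DataFrame named updatesDF that contains the date, eventId, and data columns:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "events")  # or DeltaTable.forPath(spark, "/mnt/delta/events")

(deltaTable.alias("events")
  .merge(updatesDF.alias("updates"), "events.eventId = updates.eventId")
  .whenMatchedUpdate(set = {"data": "updates.data"})
  .whenNotMatchedInsert(values = {
    "date": "updates.date",
    "eventId": "updates.eventId",
    "data": "updates.data"})
  .execute())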

Read a table


You access data in Delta tables either by specifying the path on DBFS ("/mnt/delta/events") or the table name ("events"):

Scala

val events = spark.read.format("delta").load("/mnt/delta/events")

or

val events = spark.table("events")

R

events <- read.df(path = "/mnt/delta/events", source = "delta")

or

events <- tableToDF("events")

SQL

SELECT * FROM delta.`/mnt/delta/events`

or

SELECT * FROM events
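
A Python equivalent, as a minimal sketch using the same path and table name:

events = spark.read.format("delta").load("/mnt/delta/events")

or

events = spark.table("events")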

Display table history

To view the history of a table, use the DESCRIBE HISTORY statement, which provides provenance information, including the table version, operation, user, and so on, for each write to a table. See Describe History.
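
For example, to display the history of the events table from Python, you can run the statement through spark.sql; a minimal sketch:

historyDF = spark.sql("DESCRIBE HISTORY events")
historyDF.show(truncate=False)  # one row per write, with version, timestamp, operation, user, and more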

Query an earlier version of the table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta table.

For timestamp_string, only date or timestamp strings are accepted. For example, "2019-01-01" and "2019-01-01'T'00:00:00.000Z".

To query an older version of a table, specify a version or timestamp in a SELECT statement. For example, to query version 0 from the history above, use:

SELECT * FROM events VERSION AS OF 0

or

SELECT * FROM events TIMESTAMP AS OF '2019-01-29 00:37:58'

Note

Because version 1 is at timestamp '2019-01-29 00:38:10', to query version 0 you can use any timestamp in the range '2019-01-29 00:37:58' to '2019-01-29 00:38:09' inclusive.

DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version of the table.

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/mnt/delta/events")
df2 = spark.read.format("delta").option("versionAsOf", version).load("/mnt/delta/events")

For details, see Query an older snapshot of a table (time travel).

Optimize a table

Once you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use OPTIMIZE to collapse small files into larger ones:

OPTIMIZE delta.`/mnt/delta/events`

or

OPTIMIZE events

Z-order by columns

To improve read performance further, you can co-locate related information in the same set of files by Z-Ordering. This co-locality is automatically used by Delta Lake data-skipping algorithms to dramatically reduce the amount of data that needs to be read. To Z-Order data, you specify the columns to order on in the ZORDER BY clause. For example, to co-locate by eventType, run:

OPTIMIZE events
  ZORDER BY (eventType)
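
The same statement can also be run from Python through spark.sql; a minimal sketch:

spark.sql("OPTIMIZE events ZORDER BY (eventType)")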

For the full set of options available when running OPTIMIZE, see Compaction (bin-packing).

Clean up snapshots

Delta Lake provides snapshot isolation for reads, which means that it is safe to run OPTIMIZE even while other users or jobs are querying the table. Eventually, however, you should clean up old snapshots. You can do this by running the VACUUM command:

VACUUM events

You control the age of the latest retained snapshot by using the RETAIN <N> HOURS option:

VACUUM events RETAIN 24 HOURS
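
From Python, you can run the equivalent cleanup with the Delta Lake API; a minimal sketch, assuming the events table path used above:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/mnt/delta/events")

# Equivalent to VACUUM events: uses the default retention threshold
deltaTable.vacuum()

# Equivalent to VACUUM events RETAIN 24 HOURS; retention windows shorter than the
# default are rejected unless the Delta retention safety check is disabled
deltaTable.vacuum(24)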

For details on using VACUUM effectively, see Vacuum.