Tutorial: Delta Lake

This tutorial introduces common Delta Lake operations on Azure Databricks, including the following:

You can run the example Python, Scala, and SQL code in this article from within a notebook attached to an Azure Databricks compute resource such as a cluster. You can also run the SQL code in this article from within a query associated with a SQL warehouse in Databricks SQL.

Prepare the source data

This tutorial relies on a dataset called People 10 M. It contains 10 million fictitious records that hold facts about people, like first and last names, date of birth, and salary. This tutorial assumes that this dataset is in a Unity Catalog volume that is associated with your target Azure Databricks workspace.

To get the People 10 M dataset for this tutorial, do the following:

  1. Go to the People 10 M page in Kaggle.
  2. Click Download to download a file named archive.zip to your local machine.
  3. Extract the file named export.csv from the archive.zip file. The export.csv file contains the data for this tutorial.

To upload the export.csv file into the volume, do the following:

  1. On the sidebar, click Catalog.
  2. In Catalog Explorer, browse to and open the volume where you want to upload the export.csv file.
  3. Click Upload to this volume.
  4. Drag and drop, or browse to and select, the export.csv file on your local machine.
  5. Click Upload.

In the following code examples, replace /Volumes/main/default/my-volume/export.csv with the path to the export.csv file in your target volume.

Create a table

All tables created on Azure Databricks use Delta Lake by default. Databricks recommends using Unity Catalog managed tables.

In the previous code example and the following code examples, replace the table name main.default.people_10m with your target three-part catalog, schema, and table name in Unity Catalog.

Note

Delta Lake is the default for all reads, writes, and table creation commands Azure Databricks.

Python

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", TimestampType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

df = spark.read.format("csv").option("header", True).schema(schema).load("/Volumes/main/default/my-volume/export.csv")

# Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

# If you know the table does not already exist, you can call this instead:
# df.saveAsTable("main.default.people_10m")

Scala

import org.apache.spark.sql.types._

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val df = spark.read.format("csv").option("header", "true").schema(schema).load("/Volumes/main/default/my-volume/export.csv")

// Create the table if it does not exist. Otherwise, replace the existing table.
df.writeTo("main.default.people_10m").createOrReplace()

// If you know that the table doesn't exist, call this instead:
// df.saveAsTable("main.default.people_10m")

SQL

CREATE OR REPLACE TABLE main.default.people_10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
);

COPY INTO main.default.people_10m
FROM '/Volumes/main/default/my-volume/export.csv'
FILEFORMAT = CSV
FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' );

The preceding operations create a new managed table. For information about available options when you create a Delta table, see CREATE TABLE.

In Databricks Runtime 13.3 LTS and above, you can use CREATE TABLE LIKE to create a new empty Delta table that duplicates the schema and table properties for a source Delta table. This can be especially useful when promoting tables from a development environment into production, as shown in the following code example:

CREATE TABLE main.default.people_10m_prod LIKE main.default.people_10m

To create an empty table, you can also use the DeltaTableBuilder API in Delta Lake for Python and Scala. Compared to equivalent DataFrameWriter APIs, these APIs make it easier to specify additional information like column comments, table properties, and generated columns.

Important

This feature is in Public Preview.

Python

DeltaTable.createIfNotExists(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Scala

DeltaTable.createOrReplace(spark)
  .tableName("main.default.people_10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

Upsert to a table

To merge a set of updates and insertions into an existing Delta table, you use the DeltaTable.merge method for Python and Scala, and the MERGE INTO statement for SQL. For example, the following example takes data from the source table and merges it into the target Delta table. When there is a matching row in both tables, Delta Lake updates the data column using the given expression. When there is no matching row, Delta Lake adds a new row. This operation is known as an upsert.

Python

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType
from datetime import date

schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("firstName", StringType(), True),
  StructField("middleName", StringType(), True),
  StructField("lastName", StringType(), True),
  StructField("gender", StringType(), True),
  StructField("birthDate", DateType(), True),
  StructField("ssn", StringType(), True),
  StructField("salary", IntegerType(), True)
])

data = [
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', date.fromisoformat('1992-09-17'), '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', date.fromisoformat('1984-05-22'), '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', date.fromisoformat('1968-07-22'), '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', date.fromisoformat('1978-01-14'), '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', date.fromisoformat('1982-10-29'), '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', date.fromisoformat('1981-06-25'), '567-89-0123', 89900)
]

people_10m_updates = spark.createDataFrame(data, schema)
people_10m_updates.createTempView("people_10m_updates")

# ...

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, 'main.default.people_10m')

(deltaTable.alias("people_10m")
  .merge(
    people_10m_updates.alias("people_10m_updates"),
    "people_10m.id = people_10m_updates.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Timestamp

val schema = StructType(Array(
  StructField("id", IntegerType, nullable = true),
  StructField("firstName", StringType, nullable = true),
  StructField("middleName", StringType, nullable = true),
  StructField("lastName", StringType, nullable = true),
  StructField("gender", StringType, nullable = true),
  StructField("birthDate", TimestampType, nullable = true),
  StructField("ssn", StringType, nullable = true),
  StructField("salary", IntegerType, nullable = true)
))

val data = Seq(
  Row(9999998, "Billy", "Tommie", "Luppitt", "M", Timestamp.valueOf("1992-09-17 00:00:00"), "953-38-9452", 55250),
  Row(9999999, "Elias", "Cyril", "Leadbetter", "M", Timestamp.valueOf("1984-05-22 00:00:00"), "906-51-2137", 48500),
  Row(10000000, "Joshua", "Chas", "Broggio", "M", Timestamp.valueOf("1968-07-22 00:00:00"), "988-61-6247", 90000),
  Row(20000001, "John", "", "Doe", "M", Timestamp.valueOf("1978-01-14 00:00:00"), "345-67-8901", 55500),
  Row(20000002, "Mary", "", "Smith", "F", Timestamp.valueOf("1982-10-29 00:00:00"), "456-78-9012", 98250),
  Row(20000003, "Jane", "", "Doe", "F", Timestamp.valueOf("1981-06-25 00:00:00"), "567-89-0123", 89900)
)

val people_10m_updates = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
people_10m_updates.createOrReplaceTempView("people_10m_updates")

// ...

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

deltaTable.as("people_10m")
  .merge(
    people_10m_updates.as("people_10m_updates"),
    "people_10m.id = people_10m_updates.id"
  )
  .whenMatched()
  .updateAll()
  .whenNotMatched()
  .insertAll()
  .execute()

SQL

CREATE OR REPLACE TEMP VIEW people_10m_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_10m_updates
ON people_10m.id = people_10m_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

In SQL, if you specify *, this updates or inserts all columns in the target table, assuming that the source table has the same columns as the target table. If the target table doesn't have the same columns, the query throws an analysis error.

You must specify a value for every column in your table when you perform an insert operation (for example, when there is no matching row in the existing dataset). However, you do not need to update all values.

To see the results, query the table.

Python

df = spark.read.table("main.default.people_10m")
df_filtered = df.filter(df["id"] >= 9999998)
display(df_filtered)

Scala

val df = spark.read.table("main.default.people_10m")
val df_filtered = df.filter($"id" >= 9999998)
display(df_filtered)

SQL

SELECT * FROM main.default.people_10m WHERE id >= 9999998

Read a table

You access data in Delta tables by the table name or the table path, as shown in the following examples:

Python

people_df = spark.read.table("main.default.people_10m")
display(people_df)

Scala

val people_df = spark.read.table("main.default.people_10m")
display(people_df)

SQL

SELECT * FROM main.default.people_10m;

Write to a table

Delta Lake uses standard syntax for writing data to tables.

To atomically add new data to an existing Delta table, use the append mode as shown in the following examples:

Python

df.write.mode("append").saveAsTable("main.default.people_10m")

Scala

df.write.mode("append").saveAsTable("main.default.people_10m")

SQL

INSERT INTO main.default.people_10m SELECT * FROM main.default.more_people

To replace all the data in a table, use the overwrite mode as in the following examples:

Python

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

Scala

df.write.mode("overwrite").saveAsTable("main.default.people_10m")

SQL

INSERT OVERWRITE TABLE main.default.people_10m SELECT * FROM main.default.more_people

Update a table

You can update data that matches a predicate in a Delta table. For example, in the example people_10m table, to change an abbreviation in the gender column from M or F to Male or Female, you can run the following:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")
)

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

SQL

UPDATE main.default.people_10m SET gender = 'Female' WHERE gender = 'F';
UPDATE main.default.people_10m SET gender = 'Male' WHERE gender = 'M';

Delete from a table

You can remove data that matches a predicate from a Delta table. For instance, in the example people_10m table, to delete all rows corresponding to people with a value in the birthDate column from before 1955, you can run the following:

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

SQL

DELETE FROM main.default.people_10m WHERE birthDate < '1955-01-01'

Important

Deletion removes the data from the latest version of the Delta table but does not remove it from the physical storage until the old versions are explicitly vacuumed. See vacuum for details.

Display table history

To view the history of a table, you use the DeltaTable.history method for Python and Scala, and the DESCRIBE HISTORY statement in SQL, which provides provenance information, including the table version, operation, user, and so on, for each write to a table.

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
display(deltaTable.history())

SQL

DESCRIBE HISTORY main.default.people_10m

Query an earlier version of the table (time travel)

Delta Lake time travel allows you to query an older snapshot of a Delta table.

To query an older version of a table, specify the table's version or timestamp. For example, to query version 0 or timestamp 2024-05-15T22:43:15.000+00:00Z from the preceding history, use the following:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
# Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
val deltaHistory = deltaTable.history()

display(deltaHistory.where("version == 0"))
// Or:
display(deltaHistory.where("timestamp == '2024-05-15T22:43:15.000+00:00'"))

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

For timestamps, only date or timestamp strings are accepted, for example, "2024-05-15T22:43:15.000+00:00" or "2024-05-15 22:43:15".

DataFrameReader options allow you to create a DataFrame from a Delta table that is fixed to a specific version or timestamp of the table, for example:

Python

df = spark.read.option('versionAsOf', 0).table("main.default.people_10m")
# Or:
df = spark.read.option('timestampAsOf', '2024-05-15T22:43:15.000+00:00').table("main.default.people_10m")

display(df)

Scala

val df = spark.read.option("versionAsOf", 0).table("main.default.people_10m")
// Or:
val df = spark.read.option("timestampAsOf", "2024-05-15T22:43:15.000+00:00").table("main.default.people_10m")

display(df)

SQL

SELECT * FROM main.default.people_10m VERSION AS OF 0
-- Or:
SELECT * FROM main.default.people_10m TIMESTAMP AS OF '2024-05-15T22:43:15.000+00:00'

For details, see Work with Delta Lake table history.

Optimize a table

After you have performed multiple changes to a table, you might have a lot of small files. To improve the speed of read queries, you can use the optimize operation to collapse small files into larger ones:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeCompaction()

SQL

OPTIMIZE main.default.people_10m

Z-order by columns

To improve read performance further, you can collocate related information in the same set of files by z-ordering. Delta Lake data-skipping algorithms use this collocation to dramatically reduce the amount of data that needs to be read. To z-order data, you specify the columns to order on in the z-order by operation. For example, to collocate by gender, run:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.optimize().executeZOrderBy("gender")

SQL

OPTIMIZE main.default.people_10m
ZORDER BY (gender)

For the full set of options available when running the optimize operation, see Optimize data file layout.

Clean up snapshots with VACUUM

Delta Lake provides snapshot isolation for reads, which means that it is safe to run an optimize operation even while other users or jobs are querying the table. Eventually however, you should clean up old snapshots. You can do this by running the vacuum operation:

Python

from delta.tables import *

deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

Scala

import io.delta.tables._

val deltaTable = DeltaTable.forName(spark, "main.default.people_10m")
deltaTable.vacuum()

SQL

VACUUM main.default.people_10m

For details on using the vacuum operation effectively, see Remove unused data files with vacuum.