重要
写入 Unity Catalog 托管 Delta 表的事务目前处于公共预览阶段。
事务支持两种模式:非交互式模式和交互式模式。 本页介绍何时使用每个模式并包括实现示例。
有关事务的要求和概述,请参阅 事务。 有关这两种模式的实践实践,请参阅 教程:跨表协调事务。
非交互式事务
非交互式事务使用SQL 脚本和ATOMIC关键字。
ATOMIC 复合语句块将所有语句作为单个原子单元运行。 要么共同成功,要么共同失败。
支持的计算:任何运行 Databricks Runtime 18.0 及更高版本的 SQL 仓库、 无服务器计算或 群集 。
支持的语法:支持 SQL、Scala spark.sql 块和 PySpark spark.sql 块。
注释
在结构化流式处理forEachBatch中通过调用spark.sql("BEGIN ATOMIC ... END;")使用非交互式事务。 但是,结构化流式处理检查点无法以事务方式推进。
Syntax
BEGIN ATOMIC
statement1;
statement2;
statement3;
END;
如果所有语句成功,Azure Databricks 会自动提交所有更改。 如果任何语句失败,Azure Databricks 会自动回滚所有更改。
在 SQL 编辑器中使用
直接在 SQL 编辑器中运行非交互式事务。 选择整个 ATOMIC 复合语句 块并将其作为单个语句运行:
BEGIN ATOMIC
DELETE FROM staging_sales WHERE load_date < current_date() - INTERVAL 7 DAYS;
INSERT INTO staging_sales
SELECT * FROM raw_sales WHERE load_date = current_date();
MERGE INTO sales AS target
USING staging_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
在笔记本中使用
使用 SQL 单元格或编程 API 在笔记本中运行非交互式事务。
SQL
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
Python
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
Scala
spark.sql("""
BEGIN ATOMIC
UPDATE inventory SET quantity = quantity - 10 WHERE product_id = 2001;
UPDATE inventory SET quantity = quantity + 10 WHERE product_id = 2002;
INSERT INTO inventory_moves (from_product, to_product, quantity, move_date)
VALUES (2001, 2002, 10, current_date());
END;
""")
在计划作业中使用
非交互式事务在计划作业中运行良好,因为它们会自动处理提交和回滚:
BEGIN ATOMIC
-- Clear previous staging data
DELETE FROM staging_daily_sales WHERE load_date = current_date();
-- Load new data
INSERT INTO staging_daily_sales
SELECT sale_id, customer_id, amount, sale_date, current_date() as load_date
FROM raw_sales
WHERE sale_date = current_date() - INTERVAL 1 DAY;
-- Validate row count (fails transaction if no data)
IF (SELECT COUNT(*) FROM staging_daily_sales WHERE load_date = current_date()) = 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'No sales data loaded for yesterday';
END IF;
-- Merge into production
MERGE INTO daily_sales AS target
USING staging_daily_sales AS source
ON target.sale_id = source.sale_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
END;
如果任何语句失败,包括断言,则整个事务会自动回滚。
与 JDBC 配合使用
外部客户端可以运行非交互式事务。
JDBC
String sql = """
BEGIN ATOMIC
INSERT INTO orders (order_id, total) VALUES (1001, 500.00);
UPDATE customers SET last_order = CURRENT_DATE() WHERE customer_id = 5001;
END;
""";
Statement stmt = conn.createStatement();
stmt.execute(sql);
与语句执行 API 配合使用
使用 语句执行 API 运行非交互式事务:
import requests
sql = """
BEGIN ATOMIC
INSERT INTO sales (sale_id, amount) VALUES (3001, 750.00);
UPDATE daily_totals SET total = total + 750.00 WHERE sale_date = CURRENT_DATE();
END;
"""
response = requests.post(
f"{workspace_url}/api/2.0/sql/statements",
headers={"Authorization": f"Bearer {token}"},
json={
"warehouse_id": warehouse_id,
"statement": sql,
"wait_timeout": "30s"
}
)
ETL 模式
以下模式演示了使用非交互式事务的常见 ETL 工作流。
暂存和验证模式
此模式将数据加载到临时区域、验证数据质量并将已验证的记录合并到生产表中:
BEGIN ATOMIC
-- Load into staging
INSERT INTO staging_customers
SELECT * FROM external_source
WHERE ingest_date = current_date();
-- Validate data quality
IF (SELECT COUNT(*) FROM staging_customers WHERE email NOT LIKE '%@%') > 0 THEN
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Invalid email addresses found';
END IF;
-- Merge validated data
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Update metadata
UPDATE etl_metadata
SET last_load_date = current_date(),
rows_processed = (SELECT COUNT(*) FROM staging_customers)
WHERE table_name = 'customers';
END;
维度表和事实表模式
此模式在加载事实数据表以维护引用完整性之前更新维度表:
BEGIN ATOMIC
-- Update dimension tables first
MERGE INTO dim_products AS target
USING staging_products AS source
ON target.product_id = source.product_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
MERGE INTO dim_customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- Then load fact table with foreign key references
INSERT INTO fact_sales
SELECT s.sale_id, p.product_key, c.customer_key, s.sale_amount, s.sale_date
FROM staging_sales s
JOIN dim_products p ON s.product_id = p.product_id
JOIN dim_customers c ON s.customer_id = c.customer_id;
END;
错误处理
当一个语句在 BEGIN ATOMIC ... END; 块内失败时,Azure Databricks 将回滚所有更改并返回错误消息。
调试提示:
- 查看错误消息以确定哪个语句失败。
- 在事务块之外单独测试语句。
- 使用
SIGNAL添加验证检查以在失败时显示自定义错误消息。 - 查询事务历史记录以获取更多上下文。
交互式事务
交互式事务可让你显式控制事务边界。 手动开始事务、运行语句并显式 提交 或 回滚。
支持的计算:仅限 SQL 仓库 。
支持的语法:仅限 SQL。
Syntax
BEGIN TRANSACTION;
statement1;
statement2;
COMMIT;
-- or: ROLLBACK;
提交前进行验证
在提交之前,使用交互式事务验证结果:
BEGIN TRANSACTION;
-- Load staging data
INSERT INTO staging_customers
SELECT * FROM external_customers
WHERE load_date = current_date();
-- Validate and commit or rollback
BEGIN
DECLARE duplicate_count INT;
SET duplicate_count = (
SELECT COUNT(*) FROM (
SELECT customer_id, COUNT(*) as cnt
FROM staging_customers
WHERE load_date = current_date()
GROUP BY customer_id
HAVING COUNT(*) > 1
)
);
IF duplicate_count > 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Duplicate customers found in staging data';
ELSE
MERGE INTO customers AS target
USING staging_customers AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
COMMIT;
END IF;
END;
显式回滚
当验证失败或因业务逻辑需要放弃更改时,应回滚事务:
BEGIN TRANSACTION;
UPDATE inventory
SET quantity = quantity - 50
WHERE product_id = 2001;
-- Check if quantity would go negative
BEGIN
DECLARE new_quantity INT;
SET new_quantity = (SELECT quantity FROM inventory WHERE product_id = 2001);
IF new_quantity < 0 THEN
ROLLBACK;
SIGNAL SQLSTATE '45000' SET MESSAGE_TEXT = 'Insufficient inventory for product 2001';
ELSE
COMMIT;
END IF;
END;
与 JDBC 配合使用
JDBC 驱动程序支持在事务中使用 executeUpdate() 运行 DML 语句。 有关支持的 DML 语句的列表,请参阅 支持的操作。
JDBC 客户端通过禁用自动提交模式使用交互式事务:
Connection conn = DriverManager.getConnection(jdbcUrl, properties);
try {
conn.setAutoCommit(false); // Start transaction mode
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)");
stmt.executeUpdate("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001");
conn.commit(); // Commit the transaction
} catch (SQLException e) {
conn.rollback(); // Roll back on error
throw e;
} finally {
conn.close();
}
不支持的 JDBC 操作
交互式事务中不支持以下 JDBC 操作:
| 类别 | 不支持 |
|---|---|
| 目录或架构切换 |
Connection.setCatalog() 和 Connection.setSchema() |
| 会话配置更改 |
Connection.setClientInfo() 用于会话级属性,例如 TIMEZONE 和 ANSI_MODE |
| 所有 DatabaseMetaData (所有协议) | 所有 DatabaseMetaData.* 方法 |
| PreparedStatement 元数据 | PreparedStatement.getMetaData() |
| 存储过程 | CALL procedure_name() |
与 ODBC 一起使用
ODBC 驱动程序支持使用 SQLExecute() 和 SQLExecDirect() 在事务中运行 DML 语句。 有关支持的 DML 语句的列表,请参阅 支持的操作。
ODBC 客户端可以通过 Azure Databricks ODBC 驱动程序,并使用标准 ODBC 事务管理功能,执行交互式事务。
不支持的 ODBC 操作
交互式事务中不支持以下 ODBC 操作:
| 类别 | 不支持 |
|---|---|
| 所有目录函数 |
SQLTables、SQLColumns、SQLStatistics、SQLSpecialColumns、SQLPrimaryKeys、SQLForeignKeys、SQLTablePrivileges、SQLColumnPrivileges、SQLProcedures、SQLProcedureColumns |
| 设置连接属性 | 使用 SQLSetConnectAttr() 切换目录、更改隔离级别和访问模式 |
| SQL 翻译 | SQLNativeSql |
与用于 Python 的 Databricks SQL 连接器配合使用
用于 Python 的 Databricks SQL 连接器支持在事务中使用 cursor.execute() DML 语句。 有关支持的 DML 语句的列表,请参阅 支持的操作。
Python 应用程序可以通过设置将交互式事务与autocommit=False配合使用:
from databricks import sql
with sql.connect(
server_hostname="dbc-a1b2345c-d6e7.cloud.databricks.com",
http_path="sql/1.0/warehouses/abc123def456",
access_token="your-access-token",
autocommit=False
) as connection:
with connection.cursor() as cursor:
cursor.execute("INSERT INTO accounts (account_id, balance) VALUES (1001, 5000)")
cursor.execute("UPDATE accounts SET balance = balance - 100 WHERE account_id = 1001")
connection.commit()
不支持的 Python 连接器操作
交互式事务中不支持以下 Python 连接器操作:
| 类别 | 不支持 |
|---|---|
| 所有元数据 |
cursor.catalogs()、cursor.schemas()、cursor.tables()、cursor.columns() |
交互式事务的驱动程序限制
使用交互式事务时,以下限制适用于所有驱动程序。
交互式事务中不支持元数据操作。 无论驱动程序或协议如何,事务中的以下操作都可能会失败:
| 驱动程序/协议 | 类型 | 方法 |
|---|---|---|
| JDBC | DatabaseMetaData |
getCatalogs()、getSchemas()、getTables()、getColumns()、getTypeInfo() |
| ODBC | 目录函数 |
SQLTables、SQLColumns、SQLGetTypeInfo |
| Python 连接器 | 元数据方法 |
cursor.catalogs()、cursor.schemas()、cursor.tables()、cursor.columns() |
| SQL | 元数据命令 |
SHOW TABLES、SHOW DATABASES、DESCRIBE TABLE、USE CATALOG、USE SCHEMA |
| SQL | information_schema |
SELECT 针对表的 information_schema 查询 |
在事务之外运行所有元数据操作。
警告
在单个驱动程序连接对象上的多个线程上运行事务会导致未定义的行为。 在每个连接对象上一次只运行一个事务。
隔离行为
交互式事务中未提交的更改仅对会话可见。 其他会话将表状态视为事务开始前的状态。
注释
交互式事务使用比非交互式事务更保守的冲突检测,并且可以在表级别上发生冲突(无条件追加除外)。 对于行级冲突检测,请使用非交互式事务(BEGIN ATOMIC ... END;)。
- 若要验证隔离,请创建示例表(如果不存在):
CREATE TABLE IF NOT EXISTS sample_accounts (
id INT,
account_name STRING,
balance DECIMAL(10,2)
) USING DELTA
TBLPROPERTIES ('delta.feature.catalogManaged' = 'supported');
在同一 会话中,启动事务并进行更改:
BEGIN TRANSACTION; INSERT INTO sample_accounts VALUES (10, 'Test', 100.00);在单独的 SQL 编辑器选项卡或笔记本会话(不是同一笔记本中的新单元格)中,查询表:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;这会返回 0 行,因为未提交的更改在您的第一个会话之外不可见。
返回到第一个会话并提交:
COMMIT;再次查询第二个会话的数据:
-- Run this in the SECOND session SELECT * FROM sample_accounts WHERE id = 10;该行之所以可见,是因为事务已被提交。
此隔离可防止其他用户读取可能回滚的数据。
选择事务模式
| 情景 | 建议的模式 |
|---|---|
| 计划的 ETL 作业 | 非互动式 - 自动提交或回滚简化了错误处理 |
| 固定语句序列 | 非交互式 — 更简单的语法,无需手动提交 |
| 提交前的数据验证 | 交互式 - 检查结果并决定是否提交 |
| 需要手动控制的 JDBC 应用程序 | 交互式 - 标准数据库事务模式 |
后续步骤
相关的 SQL 参考
- ATOMIC 复合语句(非交互式事务):以单个原子事务的形式运行多个 SQL 语句,并自动提交和回滚。
- BEGIN TRANSACTION (交互式事务):使用手动提交和回滚控件开始交互式事务。
- COMMIT:提交交互式事务并使所有更改永久生效。
- ROLLBACK:回滚交互式事务并放弃所有更改。