本文提供如何提高 Lakehouse 联合查询性能的指导。
使用 AND 运算符合并多个谓词
Databricks Runtime 尝试将谓词向下推送到远程数据库引擎,以减少通过网络提取的记录数。 如果无法向下推送谓词,则对远程数据库引擎执行的查询将排除谓词,因此必须使用 Databricks Runtime 执行筛选。 但是,如果筛选器的某个部分无法向下推送,通过AND运算符连接的其他部分仍然可以向下推送。
示例 1
Databricks 查询:
SELECT * FROM foreign_catalog.schema.table WHERE name ILIKE 'john'
ILIKE无法将表达式向下推送到远程数据库(例如 MySQL),因为没有适当的翻译。 必须使用 Databricks Runtime 完成筛选。
发送到远程数据库的查询返回所有记录:
SELECT * FROM catalog.schema.table
示例 2
Databricks 查询:
SELECT * FROM foreign_catalog.schema.table WHERE name ILIKE 'john' AND date > '2025-05-01'
ILIKE 表达式无法下推到远程数据库(例如 MySQL),因为没有合适的转换,但日期比较可以下推。 名称筛选仍必须使用 Databricks Runtime 完成,但日期比较应减少正在提取的记录数。
发送到远程数据库的查询返回一部分记录:
SELECT * FROM catalog.schema.table WHERE date > '2025-05-01'
检查将在远程数据库上运行的查询
若要查看将发送到远程数据库的查询,请运行 EXPLAIN FORMATTED 命令。
设置从远程数据库提取的批大小
可以配置以下连接器(使用 JDBC 传输协议)来控制它们如何从远程系统提取数据。
- Databricks
- Microsoft SQL Server
- Azure Synapse
- MySQL
- Oracle
- PostgreSQL
- Redshift
- Salesforce Data 360
- Teradata
JDBC 提取大小确定每个往返提取的行数。 默认情况下,大多数 JDBC 连接器以原子方式提取数据。 这可能会导致数据量超过可用内存。
若要避免内存不足错误,请设置 fetchSize 参数。 当设置为非零值时 fetchSize ,连接器会分批读取数据。 每个批处理的最大行数等于值 fetchSize。 Databricks 建议指定一 fetchSize 个大值(例如 100,000),因为如果批中的行数太小,则可以延长整个查询执行时间。
此参数允许工作器节点分批读取数据,但不能并行读取数据。
计算要求:
- 必须在 Databricks Runtime 16.1 或更高版本上使用计算。 SQL 仓库必须是 Pro,并且必须使用 2024.50。
SELECT * FROM mySqlCatalog.schema.table WITH ('fetchSize' 100000)
设置分区大小参数 (Snowflake)
Snowflake 允许提取多个分区中的数据,从而能够参与多个执行器和并行处理。
必须通过设置 partition_size_in_mb 参数来选择适当的分区大小。
此参数指定每个分区的建议未压缩大小。 若要减少分区数,请指定更大的值。
默认值为 100 (MB)。
参数 partition_size_in_mb 设置建议的大小;分区的实际大小可能会有所不同。
计算要求:
- 必须在 Databricks Runtime 16.1 或更高版本上使用计算。 SQL 仓库必须是 Pro,并且必须使用 2024.50。
SELECT * FROM snowflakeCatalog.schema.table WITH ('partition_size_in_mb' 1000)
为 JDBC 连接器启用并行读取
支持 JDBC 传输协议的连接器可以通过对查询进行分区来并行读取数据。 可以为以下连接器配置并行读取:
- Databricks
- Microsoft SQL Server
- Azure Synapse
- MySQL
- Oracle
- PostgreSQL
- Redshift
- Salesforce Data 360
- Teradata
这允许多个执行程序同时提取数据,从而显著提高大型表的性能。
若要启用并行读取,请指定以下参数:
-
numPartitions:要用于并行的分区数 -
partitionColumn:用于对查询进行分区的数字列的名称 -
lowerBound:用于确定分区步幅的partitionColumn最小值 -
upperBound:用于决定分区步幅的partitionColumn最大值
重要
和lowerBoundupperBound值仅用于决定分区步幅,而不是用于筛选表中的行。 将分区并返回表中的所有行。
分区列应为:
- 数字列
- 均匀分布于范围
- 索引列以提高性能
计算要求:
- 必须在 Databricks Runtime 17.1 或更高版本上使用计算。 SQL 仓库必须是专业或无服务器,并且必须使用 2025.25。
在以下示例中,查询将基于 id 列拆分为 4 个并行分区,每个分区处理一个大约 250 个 ID 的范围(假设每个 id11000ID 之间都有一条记录)。
SELECT * FROM mySqlCatalog.schema.table WITH (
'numPartitions' 4,
'partitionColumn' 'id',
'lowerBound' 1,
'upperBound' 1000
)
加入 Lakehouse 联合身份验证的下推
重要
此功能目前以公共预览版提供。
了解联接下推在 Databricks Lakehouse 联邦架构中的工作原理。
加入下推概述
联接下推技术是一种查询优化技术,其中,Databricks 将联接操作推送到远程数据库引擎,而不是提取数据并在本地执行联接。 这可以显著减少网络流量,并通过利用远程数据库的内置联接功能提高查询性能。
支持的数据源
以下数据源支持联接下推:
- Oracle
- PostgreSQL
- MySQL
- SQL Server
- Teradata
- Redshift
- Snowflake
- BigQuery
此功能现已普遍可用,并在默认情况下为 Redshift、Snowflake 和 BigQuery 启用。 以下限制和要求仅适用于 Oracle、PostgreSQL、MySQL、SQL Server 和 Teradata 连接器。
要求
- 必须在 Databricks Runtime 17.2 或更高版本上使用计算。
- SQL 仓库必须是 Pro,并且必须使用 2025.30。
- 在 Databricks UI 的“预览”页上,您必须打开“联合查询的下推连接(公共预览)”。
局限性
- 仅支持内部、左外和右外连接。
- 联接的子级中的别名仅在 DBR 17.3 及更高版本中受支持。
节点层次结构要求
若要将联接推下,左右子分支中的所有节点都必须可推下。 下列规则适用:
- 支持的子节点:只有联接、筛选器、示例和扫描节点才会显示在查询计划中的联接下方,以便成功下推。
- 不支持的子节点:如果连接下方的左侧或右侧分支中进行限制、偏移量或聚合操作,则无法向下推送连接。
- 联接之上的操作:聚合、限制和偏移操作可以在联接之上应用时被下推。
例子
-- Supported: Join two table scans
SELECT *
FROM table1
INNER JOIN table2
ON col_from_table1 = col_from_table2 + 1
-- Supported: Join two table scans with a nested select query
SELECT *
FROM (SELECT a FROM table1) q1
INNER JOIN (SELECT a FROM table2) q2
ON q1.a = q2.a + 1
-- Supported: Child subqueries with aliases in projection (DBR 17.3+)
SELECT *
FROM (SELECT a AS a1 FROM table1) t1
INNER JOIN (SELECT a AS a2 FROM table2) t2
ON t1.a1 = t2.a2 + 1
-- Supported: Join with filters below
SELECT *
FROM (SELECT * FROM table1 WHERE a > 10) t1
INNER JOIN (SELECT * FROM table2 WHERE b < 20) t2
ON t1.id = t2.id
-- Supported: Aggregate on top of join
SELECT COUNT(*)
FROM table1 t1
INNER JOIN table2 t2
ON t1.id = t2.id
-- Not supported: Join on top of aggregate
SELECT *
FROM (SELECT id, COUNT(*) as cnt FROM table1 GROUP BY id) t1
INNER JOIN table2 t2
ON t1.id = t2.id
-- Not supported: Join on top of limit
SELECT *
FROM (SELECT * FROM table1 LIMIT 100) t1
INNER JOIN table2 t2
ON t1.id = t2.id
Observability
使用 EXPLAIN FORMATTED 来验证是否正在向下推送连接:
EXPLAIN FORMATTED
SELECT *
FROM foreign_catalog.schema.table1 t1
INNER JOIN foreign_catalog.schema.table2 t2
ON t1.id = t2.id
示例输出显示连接下推成功:
== Physical Plan ==
*(1) Scan JDBCRelation
PushedFilters: [id = id_1],
PushedJoins:
[L]: Relation: foreign_catalog.schema.table1
PushedFilters: [ID IS NOT NULL]
[R]: Relation: foreign_catalog.schema.table2
PushedFilters: [ID IS NOT NULL]
在此输出中:
-
id_1是 Databricks 自动生成的别名,用于在列具有重复名称时解析歧义。 - 上述
PushedFiltersPushedJoins表示推送到远程数据库的实际联接条件。 - 每个
PushedFilters关系([L] 和 [R])展示了应用于每个表的附加筛选谓词。