Lakehouse 联邦性能建议

本文提供如何提高 Lakehouse 联合查询性能的指导。

使用 AND 运算符合并多个谓词

Databricks Runtime 尝试将谓词向下推送到远程数据库引擎,以减少通过网络提取的记录数。 如果无法向下推送谓词,则对远程数据库引擎执行的查询将排除谓词,因此必须使用 Databricks Runtime 执行筛选。 但是,如果筛选器的某个部分无法向下推送,通过AND运算符连接的其他部分仍然可以向下推送。

示例 1

Databricks 查询:

SELECT * FROM foreign_catalog.schema.table WHERE name ILIKE 'john'

ILIKE无法将表达式向下推送到远程数据库(例如 MySQL),因为没有适当的翻译。 必须使用 Databricks Runtime 完成筛选。

发送到远程数据库的查询返回所有记录:

SELECT * FROM catalog.schema.table

示例 2

Databricks 查询:

SELECT * FROM foreign_catalog.schema.table WHERE name ILIKE 'john' AND date > '2025-05-01'

ILIKE 表达式无法下推到远程数据库(例如 MySQL),因为没有合适的转换,但日期比较可以下推。 名称筛选仍必须使用 Databricks Runtime 完成,但日期比较应减少正在提取的记录数。

发送到远程数据库的查询返回一部分记录:

SELECT * FROM catalog.schema.table WHERE date > '2025-05-01'

检查将在远程数据库上运行的查询

若要查看将发送到远程数据库的查询,请运行 EXPLAIN FORMATTED 命令。

重要

由于EXPLAIN FORMATTED,实际查询可能与输出中的查询不同。

设置从远程数据库提取的批大小

可以配置以下连接器(使用 JDBC 传输协议)来控制它们如何从远程系统提取数据。

  • Databricks
  • Microsoft SQL Server
  • Azure Synapse
  • MySQL
  • Oracle
  • PostgreSQL
  • Redshift
  • Salesforce Data 360
  • Teradata

JDBC 提取大小确定每个往返提取的行数。 默认情况下,大多数 JDBC 连接器以原子方式提取数据。 这可能会导致数据量超过可用内存。

若要避免内存不足错误,请设置 fetchSize 参数。 当设置为非零值时 fetchSize ,连接器会分批读取数据。 每个批处理的最大行数等于值 fetchSize。 Databricks 建议指定一 fetchSize 个大值(例如 100,000),因为如果批中的行数太小,则可以延长整个查询执行时间。

此参数允许工作器节点分批读取数据,但不能并行读取数据。

计算要求:

  • 必须在 Databricks Runtime 16.1 或更高版本上使用计算。 SQL 仓库必须是 Pro,并且必须使用 2024.50。
SELECT * FROM mySqlCatalog.schema.table WITH ('fetchSize' 100000)

设置分区大小参数 (Snowflake)

Snowflake 允许提取多个分区中的数据,从而能够参与多个执行器和并行处理。 必须通过设置 partition_size_in_mb 参数来选择适当的分区大小。 此参数指定每个分区的建议未压缩大小。 若要减少分区数,请指定更大的值。 默认值为 100 (MB)。

参数 partition_size_in_mb 设置建议的大小;分区的实际大小可能会有所不同。

计算要求:

  • 必须在 Databricks Runtime 16.1 或更高版本上使用计算。 SQL 仓库必须是 Pro,并且必须使用 2024.50。
SELECT * FROM snowflakeCatalog.schema.table WITH ('partition_size_in_mb' 1000)

为 JDBC 连接器启用并行读取

支持 JDBC 传输协议的连接器可以通过对查询进行分区来并行读取数据。 可以为以下连接器配置并行读取:

  • Databricks
  • Microsoft SQL Server
  • Azure Synapse
  • MySQL
  • Oracle
  • PostgreSQL
  • Redshift
  • Salesforce Data 360
  • Teradata

这允许多个执行程序同时提取数据,从而显著提高大型表的性能。

若要启用并行读取,请指定以下参数:

  • numPartitions:要用于并行的分区数
  • partitionColumn:用于对查询进行分区的数字列的名称
  • lowerBound:用于确定分区步幅的 partitionColumn 最小值
  • upperBound:用于决定分区步幅的 partitionColumn 最大值

重要

lowerBoundupperBound值仅用于决定分区步幅,而不是用于筛选表中的行。 将分区并返回表中的所有行。

分区列应为:

  • 数字列
  • 均匀分布于范围
  • 索引列以提高性能

计算要求:

  • 必须在 Databricks Runtime 17.1 或更高版本上使用计算。 SQL 仓库必须是专业或无服务器,并且必须使用 2025.25。

在以下示例中,查询将基于 id 列拆分为 4 个并行分区,每个分区处理一个大约 250 个 ID 的范围(假设每个 id11000ID 之间都有一条记录)。

SELECT * FROM mySqlCatalog.schema.table WITH (
  'numPartitions' 4,
  'partitionColumn' 'id',
  'lowerBound' 1,
  'upperBound' 1000
)

加入 Lakehouse 联合身份验证的下推

重要

此功能目前以公共预览版提供。

了解联接下推在 Databricks Lakehouse 联邦架构中的工作原理。

加入下推概述

联接下推技术是一种查询优化技术,其中,Databricks 将联接操作推送到远程数据库引擎,而不是提取数据并在本地执行联接。 这可以显著减少网络流量,并通过利用远程数据库的内置联接功能提高查询性能。

支持的数据源

以下数据源支持联接下推:

  • Oracle
  • PostgreSQL
  • MySQL
  • SQL Server
  • Teradata
  • Redshift
  • Snowflake
  • BigQuery

此功能现已普遍可用,并在默认情况下为 Redshift、Snowflake 和 BigQuery 启用。 以下限制和要求仅适用于 Oracle、PostgreSQL、MySQL、SQL Server 和 Teradata 连接器。

要求

  • 必须在 Databricks Runtime 17.2 或更高版本上使用计算。
  • SQL 仓库必须是 Pro,并且必须使用 2025.30。
  • 在 Databricks UI 的“预览”页上,您必须打开“联合查询的下推连接(公共预览)”

局限性

  • 仅支持内部、左外和右外连接。
  • 联接的子级中的别名仅在 DBR 17.3 及更高版本中受支持。

节点层次结构要求

若要将联接推下,左右子分支中的所有节点都必须可推下。 下列规则适用:

  • 支持的子节点:只有联接、筛选器、示例和扫描节点才会显示在查询计划中的联接下方,以便成功下推。
  • 不支持的子节点:如果连接下方的左侧或右侧分支中进行限制、偏移量或聚合操作,则无法向下推送连接。
  • 联接之上的操作:聚合、限制和偏移操作可以在联接之上应用时被下推。

例子

-- Supported: Join two table scans
SELECT *
FROM table1
INNER JOIN table2
ON col_from_table1 = col_from_table2 + 1

-- Supported: Join two table scans with a nested select query
SELECT *
FROM (SELECT a FROM table1) q1
INNER JOIN (SELECT a FROM table2) q2
ON q1.a = q2.a + 1

-- Supported: Child subqueries with aliases in projection (DBR 17.3+)
SELECT *
FROM (SELECT a AS a1 FROM table1) t1
INNER JOIN (SELECT a AS a2 FROM table2) t2
ON t1.a1 = t2.a2 + 1

-- Supported: Join with filters below
SELECT *
FROM (SELECT * FROM table1 WHERE a > 10) t1
INNER JOIN (SELECT * FROM table2 WHERE b < 20) t2
ON t1.id = t2.id

-- Supported: Aggregate on top of join
SELECT COUNT(*)
FROM table1 t1
INNER JOIN table2 t2
ON t1.id = t2.id

-- Not supported: Join on top of aggregate
SELECT *
FROM (SELECT id, COUNT(*) as cnt FROM table1 GROUP BY id) t1
INNER JOIN table2 t2
ON t1.id = t2.id

-- Not supported: Join on top of limit
SELECT *
FROM (SELECT * FROM table1 LIMIT 100) t1
INNER JOIN table2 t2
ON t1.id = t2.id

Observability

使用 EXPLAIN FORMATTED 来验证是否正在向下推送连接:

EXPLAIN FORMATTED
SELECT *
FROM foreign_catalog.schema.table1 t1
INNER JOIN foreign_catalog.schema.table2 t2
ON t1.id = t2.id

示例输出显示连接下推成功:

== Physical Plan ==
*(1) Scan JDBCRelation
PushedFilters: [id = id_1],
PushedJoins:
   [L]: Relation: foreign_catalog.schema.table1
        PushedFilters: [ID IS NOT NULL]
   [R]: Relation: foreign_catalog.schema.table2
        PushedFilters: [ID IS NOT NULL]

在此输出中:

  • id_1 是 Databricks 自动生成的别名,用于在列具有重复名称时解析歧义。
  • 上述PushedFiltersPushedJoins表示推送到远程数据库的实际联接条件。
  • 每个 PushedFilters 关系([L] 和 [R])展示了应用于每个表的附加筛选谓词。