使用 JDBC 查询数据库

Azure Databricks 支持使用 JDBC 连接到外部数据库。 本文提供了用于配置和使用这些连接的基本语法以及 Python、SQL 和 Scala 示例。

重要

本文所述的配置为试验性配置。 试验性功能按原样提供,Databricks 不会通过客户技术支持为它提供支持。 为了获得完整的查询联合支持,应改为使用 Lakehouse 联合身份验证,这使 Azure Databricks 用户能够利用 Unity Catalog 语法和数据治理工具。

重要

本文中的示例不包括 JDBC URL 中的用户名和密码。 Databricks 建议使用机密来存储数据库凭据。 例如: 。

Python

username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")

Scala

val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")

若要使用 SQL 引用 Databricks 机密,必须在群集初始化过程中配置 Spark 配置属性

若要查看机密管理的完整示例,请参阅机密工作流示例

使用 JDBC 读取数据

必须配置一些设置才能使用 JDBC 读取数据。 请注意,每个数据库对 <jdbc-url> 使用不同的格式。

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

Spark 会自动从数据库表中读取架构,并将其类型映射回 Spark SQL 类型。

Python

employees_table.printSchema

SQL

DESCRIBE employees_table_vw

Scala

employees_table.printSchema

可针对此 JDBC 表运行查询:

Python

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

SQL

SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age

Scala

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

使用 JDBC 写入数据

通过 JDBC 将数据保存到表的操作使用与读取操作类似的配置。 请参阅以下示例:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

默认行为会尝试创建一个新表,如果同名的表已存在,则会引发错误。

可以使用以下语法将数据追加到现有表:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()
)

SQL

CREATE TABLE IF NOT EXISTS new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
);

INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("append")
  .save()

可以使用以下语法覆盖现有表:

Python

(employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()
)

SQL

CREATE OR REPLACE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT * FROM employees_table_vw;

Scala

employees_table.write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .mode("overwrite")
  .save()

控制 JDBC 查询的并行度

默认情况下,JDBC 驱动程序仅以单个线程查询源数据库。 若要提高读取性能,需要指定多个选项来控制 Azure Databricks 对数据库进行的并行查询数。 对于小型群集,请将 numPartitions 选项设置为群集中的执行程序核心数,这样可确保所有节点并行查询数据。

警告

在大型群集上将 numPartitions 设置为较高的值可能会导致远程数据库的性能下降,因为同时进行过多的查询可能会使服务不堪重负。 这对于应用数据库尤其麻烦。 在将该值设置为 50 以上时需要非常谨慎。

注意

通过为 partitionColumn 选择在源数据库中计算索引的列来加快查询速度。

以下代码示例演示如何为具有 8 个核心的群集配置并行度:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  # lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  # max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  # number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>',
  partitionColumn "<partition-key>",
  lowerBound "<min-value>",
  upperBound "<max-value>",
  numPartitions 8
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  // a column that can be used that has a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition-key>")
  // lowest value to pull data for with the partitionColumn
  .option("lowerBound", "<min-value>")
  // max value to pull data for with the partitionColumn
  .option("upperBound", "<max-value>")
  // number of partitions to distribute the data into. Do not set this very large (~hundreds)
  .option("numPartitions", 8)
  .load()

注意

Azure Databricks 支持所有用于配置 JDBC 的 Apache Spark 选项

使用 JDBC 写入数据库时,Apache Spark 使用内存中的分区数来控制并行度。 可以在进行写入之前重新分区数据以控制并行度。 请避免在大型群集上设置数量过多的分区,以防远程数据库负担过重。 以下示例演示如何在进行写入之前重新分区(8 个分区):

Python

(employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()
)

SQL

CREATE TABLE new_employees_table
  USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw

Scala

employees_table.repartition(8)
  .write
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<new-table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .save()

将查询向下推送到数据库引擎

可将整个查询向下推送到数据库,且只返回结果。 table 参数标识要读取的 JDBC 表。 可使用 SQL 查询 FROM 子句中有效的任何内容。

Python

pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "(select * from employees where emp_no < 10008) as emp_alias",
  user '<username>',
  password '<password>'
)

Scala

val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", pushdown_query)
  .option("user", "<username>")
  .option("password", "<password>")
  .load()

控制每个查询提取的行数

JDBC 驱动程序有一个 fetchSize 参数,它控制一次从远程数据库中提取的行数。

设置 结果
过低 由于多次往返导致延迟较高(每个查询返回的行数很少)
过高 内存不足错误(单个查询中返回的数据过多)

最佳值取决于工作负载。 考虑因素包括:

  • 查询返回的列数是多少?
  • 返回哪些数据类型?
  • 每列中的字符串返回要多长时间?

系统可能采用极小的默认值,进行优化可能有好处。 例如:Oracle 的默认 fetchSize 值为 10。 将其增大至 100 可将需要执行的查询总数减少 10 倍。 JDBC 结果是网络流量,因此请避免使用非常大的数字,不过对于许多数据集,最佳值可能在数千左右。

使用 fetchSize 选项,如以下示例所示:

Python

employees_table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()
)

SQL

CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
  url "<jdbc-url>",
  dbtable "<table-name>",
  user '<username>',
  password '<password>'.
  fetchSize 100
)

Scala

val employees_table = spark.read
  .format("jdbc")
  .option("url", "<jdbc-url>")
  .option("dbtable", "<table-name>")
  .option("user", "<username>")
  .option("password", "<password>")
  .option("fetchSize", "100")
  .load()