使用 JDBC 查询数据库
Azure Databricks 支持使用 JDBC 连接到外部数据库。 本文提供了用于配置和使用这些连接的基本语法以及 Python、SQL 和 Scala 示例。
重要
本文所述的配置为试验性配置。 试验性功能按原样提供,Databricks 不会通过客户技术支持为它提供支持。 为了获得完整的查询联合支持,应改为使用 Lakehouse 联合身份验证,这使 Azure Databricks 用户能够利用 Unity Catalog 语法和数据治理工具。
重要
本文中的示例不包括 JDBC URL 中的用户名和密码。 Databricks 建议使用机密来存储数据库凭据。 例如: 。
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
若要使用 SQL 引用 Databricks 机密,必须在群集初始化过程中配置 Spark 配置属性。
若要查看机密管理的完整示例,请参阅机密工作流示例。
使用 JDBC 读取数据
必须配置一些设置才能使用 JDBC 读取数据。 请注意,每个数据库对 <jdbc-url>
使用不同的格式。
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
Spark 会自动从数据库表中读取架构,并将其类型映射回 Spark SQL 类型。
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
可针对此 JDBC 表运行查询:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
使用 JDBC 写入数据
通过 JDBC 将数据保存到表的操作使用与读取操作类似的配置。 请参阅以下示例:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
默认行为会尝试创建一个新表,如果同名的表已存在,则会引发错误。
可以使用以下语法将数据追加到现有表:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
可以使用以下语法覆盖现有表:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
控制 JDBC 查询的并行度
默认情况下,JDBC 驱动程序仅以单个线程查询源数据库。 若要提高读取性能,需要指定多个选项来控制 Azure Databricks 对数据库进行的并行查询数。 对于小型群集,请将 numPartitions
选项设置为群集中的执行程序核心数,这样可确保所有节点并行查询数据。
警告
在大型群集上将 numPartitions
设置为较高的值可能会导致远程数据库的性能下降,因为同时进行过多的查询可能会使服务不堪重负。 这对于应用数据库尤其麻烦。 在将该值设置为 50 以上时需要非常谨慎。
注意
通过为 partitionColumn
选择在源数据库中计算索引的列来加快查询速度。
以下代码示例演示如何为具有 8 个核心的群集配置并行度:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
注意
Azure Databricks 支持所有用于配置 JDBC 的 Apache Spark 选项。
使用 JDBC 写入数据库时,Apache Spark 使用内存中的分区数来控制并行度。 可以在进行写入之前重新分区数据以控制并行度。 请避免在大型群集上设置数量过多的分区,以防远程数据库负担过重。 以下示例演示如何在进行写入之前重新分区(8 个分区):
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
将查询向下推送到数据库引擎
可将整个查询向下推送到数据库,且只返回结果。 table
参数标识要读取的 JDBC 表。 可使用 SQL 查询 FROM
子句中有效的任何内容。
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
控制每个查询提取的行数
JDBC 驱动程序有一个 fetchSize
参数,它控制一次从远程数据库中提取的行数。
设置 | 结果 |
---|---|
过低 | 由于多次往返导致延迟较高(每个查询返回的行数很少) |
过高 | 内存不足错误(单个查询中返回的数据过多) |
最佳值取决于工作负载。 考虑因素包括:
- 查询返回的列数是多少?
- 返回哪些数据类型?
- 每列中的字符串返回要多长时间?
系统可能采用极小的默认值,进行优化可能有好处。 例如:Oracle 的默认 fetchSize
值为 10。 将其增大至 100 可将需要执行的查询总数减少 10 倍。 JDBC 结果是网络流量,因此请避免使用非常大的数字,不过对于许多数据集,最佳值可能在数千左右。
使用 fetchSize
选项,如以下示例所示:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()