SQL Databases using JDBC

Databricks Runtime contains JDBC drivers for Microsoft SQL Server and Azure SQL Database. See the Databricks Runtime release notes for the complete list of JDBC libraries included in Databricks Runtime.

This article covers how to use the DataFrame API to connect to SQL databases using JDBC and how to control the parallelism of reads through the JDBC interface. It provides detailed examples using the Scala API, with abbreviated Python and Spark SQL examples at the end. For all of the supported arguments for connecting to SQL databases using JDBC, see JDBC To Other Databases.

Note

Another option for connecting to SQL Server and Azure SQL Database is the Apache Spark connector. It can provide faster bulk inserts and lets you connect using your Azure Active Directory identity.

Important

The examples in this article do not include usernames and passwords in JDBC URLs. Instead, they expect that you follow the Secret management user guide to store your database credentials as secrets, and then leverage them in a notebook to populate your credentials in a java.util.Properties object. For example:

val jdbcUsername = dbutils.secrets.get(scope = "jdbc", key = "username")
val jdbcPassword = dbutils.secrets.get(scope = "jdbc", key = "password")

For a full example of secret management, see Secret workflow example.

Establish connectivity to SQL Server

This example queries SQL Server using its JDBC driver.

Step 1: Check that the JDBC driver is available

Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver")

Step 2: Create the JDBC URL

val jdbcHostname = "<hostname>"
val jdbcPort = 1433
val jdbcDatabase = "<database>"

// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase}"

// Create a Properties() object to hold the parameters.
import java.util.Properties
val connectionProperties = new Properties()

connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

Step 3: Check connectivity to the SQL Server database

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
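
The snippet above only registers the driver class. If you want to verify connectivity before running Spark jobs, a minimal sketch (assuming the jdbcUrl, jdbcUsername, and jdbcPassword values from the previous steps) is:

import java.sql.DriverManager

// Open a plain JDBC connection and check that it responds within 5 seconds.
val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
println(s"Connection valid: ${connection.isValid(5)}")
connection.close()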

Read data from JDBC

This section loads data from a database table. This uses a single JDBC connection to pull the table into the Spark environment. For parallel reads, see Manage parallelism.

val employees_table = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)

Spark automatically reads the schema from the database table and maps its types back to Spark SQL types.

employees_table.printSchema

You can run queries against this JDBC table:

display(employees_table.select("age", "salary").groupBy("age").avg("salary"))

Write data to JDBC

This section shows how to write data to a database from an existing Spark SQL table named diamonds.
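
If you do not already have a diamonds table registered, one way to create it is from the sample ggplot2 diamonds CSV that ships with Databricks; the path below is an assumption about your workspace:

// Hypothetical setup: register a temporary diamonds view from the bundled sample CSV.
val diamonds = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
diamonds.createOrReplaceTempView("diamonds")

You can preview the table with: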

select * from diamonds limit 5

The following code saves the data into a database table named diamonds. Using column names that are reserved keywords can trigger an exception. The example table has a column named table, so you can rename it with withColumnRenamed() prior to pushing it to the JDBC API.

spark.table("diamonds").withColumnRenamed("table", "table_number")
     .write
     .jdbc(jdbcUrl, "diamonds", connectionProperties)

Spark automatically creates a database table with the appropriate schema determined from the DataFrame schema.

The default behavior is to create a new table and to throw an error if a table with the same name already exists. You can use the Spark SQL SaveMode feature to change this behavior. For example, here’s how to append more rows to the table:

import org.apache.spark.sql.SaveMode

spark.sql("select * from diamonds limit 10").withColumnRenamed("table", "table_number")
  .write
  .mode(SaveMode.Append) // <--- Append to the existing table
  .jdbc(jdbcUrl, "diamonds", connectionProperties)

You can also overwrite an existing table:

spark.table("diamonds").withColumnRenamed("table", "table_number")
  .write
  .mode(SaveMode.Overwrite) // <--- Overwrite the existing table
  .jdbc(jdbcUrl, "diamonds", connectionProperties)

Push down a query to the database engine

You can push down an entire query to the database and return just the result. The table parameter identifies the JDBC table to read. You can use anything that is valid in a SQL query FROM clause.

// Note: The parentheses are required.
val pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
val df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Push down optimization

In addition to ingesting an entire table, you can push down a query to the database to leverage it for processing, and return only the results.

// Explain plan with no column selection returns all columns
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).explain(true)

You can prune columns and push down query predicates to the database with DataFrame methods.

// Explain plan with column selection will prune columns and just return the ones specified
// Notice that only the 3 specified columns are in the explain plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").explain(true)

// You can push query predicates down too
// Notice the filter at the top of the physical plan
spark.read.jdbc(jdbcUrl, "diamonds", connectionProperties).select("carat", "cut", "price").where("cut = 'Good'").explain(true)

Manage parallelism

In the Spark UI, you can see that numPartitions dictates the number of tasks that are launched. Each task is spread across the executors, which can increase the parallelism of the reads and writes through the JDBC interface. See the Spark SQL programming guide for other parameters, such as fetchsize, that can help with performance.

JDBC reads

You can provide split boundaries based on the dataset’s column values.

These options specify the parallelism on read. They must all be specified if any of them is specified. lowerBound and upperBound decide the partition stride, but do not filter the rows in the table. Therefore, Spark partitions and returns all rows in the table.

The following example splits the table read across executors on the emp_no column using the columnName, lowerBound, upperBound, and numPartitions parameters.

val df = (spark.read.jdbc(url=jdbcUrl,
  table="employees",
  columnName="emp_no",
  lowerBound=1L,
  upperBound=100000L,
  numPartitions=100,
  connectionProperties=connectionProperties))
display(df)

JDBC writes

Spark’s partitions dictate the number of connections used to push data through the JDBC API. You can control the parallelism by calling coalesce(<N>) or repartition(<N>) depending on the existing number of partitions. Call coalesce when reducing the number of partitions, and repartition when increasing the number of partitions.

import org.apache.spark.sql.SaveMode

val df = spark.table("diamonds")
println(df.rdd.partitions.length)

// Given the number of partitions above, you can reduce the partition value by calling coalesce() or increase it by calling repartition() to manage the number of connections.
df.repartition(10).write.mode(SaveMode.Append).jdbc(jdbcUrl, "diamonds", connectionProperties)

Python example

The following Python examples cover some of the same tasks as those provided for Scala.

Create the JDBC URL

jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)

You can pass in a dictionary that contains the credentials and driver class, similar to the preceding Scala example.

jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

Push down a query to the database engine

pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

Read from JDBC connections across multiple workers

df = spark.read.jdbc(url=jdbcUrl, table="employees", column="emp_no", lowerBound=1, upperBound=100000, numPartitions=100, properties=connectionProperties)
display(df)

Spark SQL example

You can define a Spark SQL table or view that uses a JDBC connection. For details, see Create Table and Create View.

CREATE TABLE <jdbcTable>
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:<databaseServerType>://<jdbcHostname>:<jdbcPort>",
  dbtable "<jdbcDatabase>.atable",
  user "<jdbcUsername>",
  password "<jdbcPassword>"
)

Append data into the database table using Spark SQL:

INSERT INTO diamonds
SELECT * FROM diamonds LIMIT 10 -- append 10 records to the table

SELECT count(*) record_count FROM diamonds --count increased by 10

Overwrite data in the database table using Spark SQL. This causes the database to drop and re-create the diamonds table:

INSERT OVERWRITE TABLE diamonds
SELECT carat, cut, color, clarity, depth, `table` AS table_number, price, x, y, z FROM diamonds

SELECT count(*) record_count FROM diamonds --count returned to original value (10 less)

Optimize performance when reading data

If you’re attempting to read data from an external JDBC database and it’s slow, this section contains some suggestions to improve performance.

Determine whether the JDBC unload is occurring in parallel

In order to load data in parallel, the Spark JDBC data source must be configured with appropriate partitioning information so that it can issue multiple concurrent queries to the external database. If you neglect to configure partitioning, all of the data will be fetched on the driver using a single JDBC query, which risks causing the driver to throw an OOM exception.

Here’s an example of a JDBC read without partitioning configured:

(Figure: JDBC read without partitioning)
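
A minimal sketch of an equivalent single-connection read, reusing the jdbcUrl and connectionProperties defined earlier:

// No partitioning options: Spark opens a single JDBC connection and reads the whole table in one task.
val employeesSinglePartition = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)
println(employeesSinglePartition.rdd.partitions.length)  // 1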

There are two APIs for specifying partitioning: high level and low level.

The high level API takes the name of a numeric column (columnName), two range endpoints (lowerBound, upperBound), and a target numPartitions, and generates Spark tasks by evenly splitting the specified range into numPartitions tasks. This works well if your database table has an indexed numeric column with fairly evenly-distributed values, such as an auto-incrementing primary key; it works somewhat less well if the numeric column is extremely skewed, leading to imbalanced tasks.

The low level API, accessible in Scala, accepts an array of WHERE conditions that can be used to define custom partitions; this is useful for partitioning on non-numeric columns or for dealing with skew. When defining custom partitions, do not forget to consider NULL when the partition columns are nullable. We do not suggest that you manually define partitions using more than two columns, since writing the boundary predicates requires much more complex logic.
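
For illustration, a minimal sketch of the predicate-based overload; the hire_date column and the date boundaries are hypothetical:

// Each WHERE fragment becomes one partition; the last predicate also captures NULL values.
val predicates = Array(
  "hire_date < '1995-01-01'",
  "hire_date >= '1995-01-01' AND hire_date < '2005-01-01'",
  "hire_date >= '2005-01-01' OR hire_date IS NULL"
)
val employeesByHireDate = spark.read.jdbc(jdbcUrl, "employees", predicates, connectionProperties)
println(employeesByHireDate.rdd.partitions.length)  // 3, one per predicate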

Here’s an example of a JDBC read with the high level partitioning configured. Note the addition of a numeric column (partitionColumn, which is how columnName is passed as a JDBC source option), two range endpoints (lowerBound, upperBound), and the numPartitions parameter specifying the maximum number of partitions.

(Figure: JDBC read with partitioning)
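
A sketch of the same configuration using the option-based reader; these are the standard Spark JDBC source option names, and jdbcUrl, jdbcUsername, and jdbcPassword are assumed from the earlier steps:

val employeesPartitioned = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "employees")
  .option("partitionColumn", "emp_no")  // numeric column used to compute partition ranges
  .option("lowerBound", "1")
  .option("upperBound", "100000")
  .option("numPartitions", "100")       // maximum number of partitions, and of concurrent queries
  .option("user", jdbcUsername)
  .option("password", jdbcPassword)
  .load()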

For more information, see Manage parallelism.

Tune the JDBC fetchSize parameter

JDBC drivers have a fetchSize parameter that controls the number of rows fetched at a time from the remote JDBC database. If this value is set too low, your workload may become latency-bound due to the high number of round-trip requests between Spark and the external database needed to fetch the full result set. If the value is too high, you risk OOMs. The optimal value is workload dependent (it depends on the result schema, the size of strings in the results, and so on), but increasing it even slightly from the default can result in huge performance gains.

Oracle’s default fetchSize is 10. Increasing it even slightly, to 100, gives massive performance gains, and going up to a higher value, like 2000, gives an additional improvement. For example:

PreparedStatement stmt = null;
ResultSet rs = null;

try {
  stmt = conn.prepareStatement("select a, b, c from table");
  stmt.setFetchSize(100);  // fetch 100 rows per round trip instead of the default 10

  rs = stmt.executeQuery();
  while (rs.next()) {
    // ... process each row
  }
} finally {
  if (rs != null) rs.close();
  if (stmt != null) stmt.close();
}

See Make your java run faster for a more general discussion of this tuning parameter for Oracle JDBC drivers.
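
When reading through the Spark JDBC data source, you can pass fetchsize as a read option rather than configuring the driver directly. A minimal sketch, reusing the jdbcUrl, jdbcUsername, and jdbcPassword from earlier:

// fetchsize controls how many rows the driver retrieves per round trip to the database.
val employeesLargeFetch = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "employees")
  .option("fetchsize", "2000")
  .option("user", jdbcUsername)
  .option("password", jdbcPassword)
  .load()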

Consider the impact of indexes

If you are reading in parallel (using one of the partitioning techniques), Spark issues concurrent queries to the JDBC database. If these queries end up requiring full table scans, this can bottleneck the remote database and become extremely slow. You should therefore consider the impact of indexes when choosing a partitioning column, and pick a column such that the individual partitions’ queries can be executed reasonably efficiently in parallel.

Important

Make sure that the database has an index on the partitioning column.

When a single-column index is not defined on the source table, you can still choose the leading (leftmost) column in a composite index as the partitioning column. When only composite indexes are available, most databases can use a concatenated index when searching with the leading (leftmost) columns. Thus, the leading column in a multi-column index can also be used as a partitioning column.

Consider whether the number of partitions is appropriate

Using too many partitions when reading from the external database risks overloading that database with too many queries. Most DBMS systems have limits on the number of concurrent connections. As a starting point, aim to have the number of partitions be close to the number of cores or task slots in your Spark cluster in order to maximize parallelism while keeping the total number of queries capped at a reasonable limit. If you need lots of parallelism after fetching the JDBC rows (because you’re doing something CPU-bound in Spark) but don’t want to issue too many concurrent queries to your database, consider using a lower numPartitions for the JDBC read and then doing an explicit repartition() in Spark, as sketched below.
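
A sketch of that pattern, with hypothetical partition counts:

// Read with a modest number of concurrent JDBC queries...
val employeesModest = spark.read.jdbc(
  url = jdbcUrl,
  table = "employees",
  columnName = "emp_no",
  lowerBound = 1L,
  upperBound = 100000L,
  numPartitions = 8,   // keeps the number of concurrent database queries low
  connectionProperties = connectionProperties)

// ...then repartition in Spark for CPU-bound downstream work.
val employeesWide = employeesModest.repartition(64)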

Consider database-specific tuning techniques

The database vendor may have a guide on tuning performance for ETL and bulk access workloads.