Compartir a través de

适用于 SQL 数据库的 Spark 连接器(预览版)

重要

此功能处于预览状态。

适用于 SQL 数据库的 Spark 连接器是一个高性能库,可用于读取和写入SQL Server、Azure SQL数据库和 Fabric SQL 数据库。 该连接器提供以下功能:

  • 使用 Spark 在 Azure SQL Database、Azure SQL Managed Instance 和 SQL Server Azure VM 上运行大型写入和读取作。
  • 使用表或视图时,连接器支持在 SQL 引擎级别设置的安全模型。 这些模型包括对象级安全性 (OLS)、行级别安全性 (RLS) 和列级别安全性 (CLS)。

连接器预安装在 Synapse Spark 3.5 运行时中,因此无需单独安装。

Authentication

Microsoft Entra 身份验证与Azure Synapse集成。

  • 登录到 Synapse 工作区并在笔记本中使用它时,凭据会自动传递到 SQL 引擎进行身份验证和授权。
  • 需要在 SQL 数据库引擎上启用和配置 Microsoft Entra ID。
  • 如果设置了 Microsoft Entra ID,Spark 代码中不需要额外的配置。 凭据会自动映射。

还可以使用 SQL 身份验证方法(通过指定 SQL 用户名和密码)或服务主体(通过为基于应用的身份验证提供 Azure 访问令牌)。

Permissions

若要使用 Spark 连接器,身份(无论是用户还是应用)必须具有目标 SQL 引擎所需的数据库权限。 读取或写入表和视图需要这些权限。

对于Azure VM 上的Azure SQL Database、Azure SQL Managed Instance和SQL Server:

  • 运行操作的标识通常需要 db_datawriterdb_datareader 权限,可选 db_owner 用于完全控制。

注释

如果使用服务主体,则可以以应用(无用户上下文)或用户身份运行(如果用户模拟已启用)。 服务主体必须具有要执行的作所需的数据库权限。

用法和代码示例

在本部分中,我们提供了代码示例,演示如何有效地使用适用于 SQL 数据库的 Spark 连接器。 这些示例涵盖了各种方案,包括从 SQL 表读取和写入,以及配置连接器选项。

支持的选项

所需的最小选项是url作为"jdbc:sqlserver://<server>:<port>;database=<database>;"或设置为spark.mssql.connector.default.url

  • url被提供时:

    • url始终用作第一个首选项。
    • 如果 spark.mssql.connector.default.url 未被设置,连接器会进行设置并在将来重复使用。
  • 当未提供 url 时:

    • 如果 spark.mssql.connector.default.url 已设置,连接器将使用 spark 配置中的值。
    • 如果 spark.mssql.connector.default.url 未设置,则会引发错误,因为所需的详细信息不可用。

此连接器支持此处定义的选项: SQL DataSource JDBC 选项

连接器还支持以下选项:

选项 默认值 Description
reliabilityLevel “最佳努力 (BEST_EFFORT)” 控制插入操作的可靠性。 可能的值: BEST_EFFORT (默认值,最快,如果执行程序重启,可能会导致重复行), NO_DUPLICATES (较慢,确保即使执行程序重新启动,也不会插入重复行)。 根据对重复和性能需求的容忍度进行选择。
isolationLevel “READ_COMMITTED” 设置 SQL 操作的事务隔离级别。 可能的值: READ_COMMITTED (默认值,阻止读取未提交的数据)、READ_UNCOMMITTEDREPEATABLE_READSNAPSHOT、 。 SERIALIZABLE 更高的隔离级别可以减少并发性,但可以提高数据一致性。
tableLock "false" 控制在插入作业期间是否使用 SQL Server TABLOCK 表级锁提示。 可能的值: true (启用 TABLOCK,这可以提高大容量写入性能), false (默认值,不使用 TABLOCK)。 将true设置为某个值可能会增加大型插入的吞吐量,但可能会减少表上其他操作的并发性。
schemaCheckEnabled “true” 控制是否在 Spark DataFrame 和 SQL 表之间强制实施严格的架构验证。 可能的值: true (默认值,强制实施严格的架构匹配), false (允许更大的灵活性,并可能跳过一些架构检查)。 将false设置可以帮助解决模式不匹配问题,但如果结构差异很大,可能会导致意外结果。

其他 批量 API 选项 可以设置为 DataFrame 上的选项,并在写入时传递给批量复制 API。

写入和读取示例

以下代码演示如何使用具有自动Microsoft Entra ID身份验证的 mssql("<schema>.<table>") 方法编写和读取数据。

小窍门

数据是内联创建的,用于演示目的。 在生产方案中,通常从现有源读取数据或创建更复杂的 DataFrame

url = "jdbc:sqlserver://<server>:<port>;database=<database>;"
row_data = [("Alice", 1),("Bob", 2),("Charlie", 3)]
column_header = ["Name", "Age"]
df = spark.createDataFrame(row_data, column_header)
df.write.mode("overwrite").option("url", url).mssql("dbo.publicExample")
spark.read.option("url", url).mssql("dbo.publicExample").show()

url = "jdbc:sqlserver://<server>:<port>;database=<database2>;" # different database
df.write.mode("overwrite").option("url", url).mssql("dbo.tableInDatabase2") # default url is updated
spark.read.mssql("dbo.tableInDatabase2").show() # no url option specified and will use database2

还可以从SQL数据库引擎读取数据时选择列、应用筛选器,并使用其他选项。

身份验证示例

以下示例演示如何使用除Microsoft Entra ID以外的身份验证方法,例如服务主体(access令牌)和 SQL 身份验证。

注释

如前所述,登录到 Synapse 工作区时会自动处理Microsoft Entra ID身份验证,因此,仅当方案需要这些方法时,才需要使用这些方法。

url = "jdbc:sqlserver://<server>:<port>;database=<database>;"
row_data = [("Alice", 1),("Bob", 2),("Charlie", 3)]
column_header = ["Name", "Age"]
df = spark.createDataFrame(row_data, column_header)

from azure.identity import ClientSecretCredential
credential = ClientSecretCredential(tenant_id="", client_id="", client_secret="") # service principal app
scope = "https://database.chinacloudapi.cn/.default"
token = credential.get_token(scope).token

df.write.mode("overwrite").option("url", url).option("accessToken", token).mssql("dbo.publicExample")
spark.read.option("accessToken", token).mssql("dbo.publicExample").show()

支持的数据帧保存模式

将数据从 Spark 写入 SQL 数据库时,可以从多个保存模式中进行选择。 保存模式控制在目标表已存在时写入数据的方式,并可能影响架构、数据和索引编制。 了解这些模式有助于避免意外数据丢失或更改。

此连接器支持此处定义的选项: Spark 保存函数

  • ErrorIfExists (默认保存模式):如果目标表存在,则会中止写入并返回异常。 否则,会使用数据创建新表。

  • 忽略:如果目标表存在,写入将忽略请求,并且不返回错误。 否则,会使用数据创建新表。

  • 覆盖:如果目标表存在,则会删除该表、重新创建并追加新数据。

    注释

    使用 overwrite时,原始表架构(尤其是 MSSQL 独占数据类型)和表索引会丢失并替换为从 Spark 数据帧推断的架构。 若要避免丢失架构和索引,请添加 .option("truncate", true).

  • 追加:如果目标表存在,则会向其追加新数据。 否则,会使用数据创建新表。

故障排除

该过程完成后,Spark 读取操作的输出将显示在单元格的输出区域中。 来自 com.microsoft.sqlserver.jdbc.SQLServerException 的错误直接来自SQL Server。 可以在 Spark 应用程序日志中找到详细的错误信息。