适用于 Apache Spark 的 Azure SQL 数据库和 SQL Server 连接器
借助适用于 Azure SQL 数据库和 SQL Server 的 Apache Spark 连接器,这些数据库可以充当 Apache Spark 作业的输入数据源和输出数据接收器。 由此,可在大数据分析中使用实时事务数据,并保留临时查询或报告的结果。
与内置 JDBC 连接器相比,此连接器能够将数据批量插入 SQL 数据库。 它的性能可以比逐行插入快 10 倍到 20 倍。 适用于 SQL Server 和 Azure SQL 数据库的 Spark 连接器还支持 Microsoft Entra 身份验证,让你可以从 Azure Synapse Analytics 安全地连接到 Azure SQL 数据库。
本文介绍如何使用 DataFrame API 连接到使用 MS SQL 连接器的 SQL 数据库。 本文提供了使用 PySpark API 的详细示例。 有关使用 MS SQL 连接器连接到 SQL 数据库的所有受支持的参数和示例,请参阅 Azure 数据 SQL 示例。
连接详细信息
本示例将使用 Microsoft Spark 实用工具来帮助从预配置的 Key Vault 获取机密。 若要了解有关 Microsoft Spark 实用工具的详细信息,请访问 Microsoft Spark 实用工具简介。
# The servername is in the format "jdbc:sqlserver://<AzureSQLServerName>.database.chinacloudapi.cn:1433"
servername = "<< server name >>"
dbname = "<< database name >>"
url = servername + ";" + "databaseName=" + dbname + ";"
dbtable = "<< table name >> "
user = "<< username >>"
principal_client_id = "<< service principal client id >>"
principal_secret = "<< service principal secret ho>>"
password = mssparkutils.credentials.getSecret('azure key vault name','secret name')
注意
目前,Azure SQL 连接器没有链接服务或 Microsoft Entra 直通支持。
使用 Azure SQL 和 SQL Server 连接器
读取数据
#Read from SQL table using MS SQL Connector
print("read data from SQL server table ")
jdbcDF = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", dbtable) \
.option("user", user) \
.option("password", password).load()
jdbcDF.show(5)
写入数据
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("overwrite") \
.option("url", url) \
.option("dbtable", dbtable) \
.option("user", user) \
.option("password", password) \
.save()
except ValueError as error :
print("MSSQL Connector write failed", error)
print("MSSQL Connector write(overwrite) succeeded ")
追加数据
try:
df.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
except ValueError as error :
print("Connector write failed", error)
Microsoft Entra 身份验证
使用服务主体的 Python 示例
import msal
# Located in App Registrations from Azure Portal
tenant_id = "<< tenant id >> "
# Located in App Registrations from Azure Portal
resource_app_id_url = "https://database.chinacloudapi.cn/"
# Define scope of the Service for the app registration before requesting from AAD
scope ="https://database.chinacloudapi.cn/.default"
# Authority
authority = "https://login.chinacloudapi.cn/" + tenant_id
# Get service principal
service_principal_id = mssparkutils.credentials.getSecret('azure key vault name','principal_client_id')
service_principal_secret = mssparkutils.credentials.getSecret('azure key vault name','principal_secret')
context = msal.ConfidentialClientApplication(
service_principal_id, service_principal_secret, authority
)
token = app.acquire_token_silent([scope])
access_token = token["access_token"]
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", dbtable) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.chinacloudapi.cn") \
.load()
使用 Active Directory 密码的 Python 示例
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.chinacloudapi.cn") \
.load()
重要
- 必须安装必需的依赖项,才能使用 Active Directory 进行身份验证。
- 使用 ActiveDirectoryPassword 时,
user
的格式应为 UPN 格式,例如username@domainname.com
。- 对于 Scala,将需要安装
com.microsoft.aad.adal4j
项目。 - 对于 Python,将需要安装
adal
库。 这可通过 pip 获得。
- 对于 Scala,将需要安装
- 查看示例笔记本了解示例,如需最新的驱动程序和版本,请访问 Apache Spark 连接器:SQL Server 和 Azure SQL。
支持
用于 Azure SQL 和 SQL Server 的 Apache Spark 连接器是一个开源项目。 此连接器不提供任何 Azure 支持。 有关连接器的疑问或问题,请在此项目存储库中创建问题。 连接器社区处于活动状态,并对提交情况进行监视。