通过使用 mssparkutils 的链接服务保护凭据
访问来自外部源的数据是一种常见的模式。 除非外部数据源允许匿名访问,否则很可能需要使用凭据、机密或连接字符串来保护连接。
默认情况下,Azure Synapse Analytics 使用 Microsoft Entra 传递进行资源之间的身份验证。 如果需要使用其他凭据连接到资源,请直接使用 mssparkutils。 mssparkutils 包简化了检索存储在链接服务或 Azure 密钥保管库中的 SAS 令牌、Microsoft Entra 令牌、连接字符串和机密的过程。
Microsoft Entra 传递使用你作为 Microsoft Entra ID 中的用户获得的权限,而不是使用分配给 Synapse 或单独服务主体的权限。 例如,若要使用 Microsoft Entra 传递来访问存储帐户中的 Blob,则应转到该存储帐户,将 Blob 参与者角色分配给自己。
从 Azure Key Vault 检索机密时,建议为 Azure Key Vault 创建链接服务。 确保 Synapse 工作区托管服务标识 (MSI) 对你的 Azure Key Vault 具有机密获取特权。 Synapse 将使用 Synapse 工作区托管服务标识向 Azure Key Vault 进行身份验证。 如果没有链接服务而直接连接到 Azure Key Vault,则使用用户 Microsoft Entra 凭据进行身份验证。
有关详细信息,请参阅链接服务。
使用情况
令牌和机密的 mssparkutils 帮助
此函数显示 Synapse 中机密和令牌管理的帮助文档。
mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());
获取结果:
getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
isValidToken(token: String): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
getPropertiesAll(linkedService: String): returns all the properties of the linked service
getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName
访问 Azure Data Lake Storage Gen2
ADLS Gen2 主存储
访问 Azure Data Lake Storage 中的文件时,默认使用 Microsoft Entra 传递进行身份验证,不需要显式使用 mssparkutils。 直通身份验证中使用的标识因几个因素而异。 默认情况下,将会使用用户的标识来执行交互式笔记本,但可以将其更改为工作区托管服务标识 (MSI)。 批处理作业和笔记本的非交互式执行使用工作区 MSI。
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))
ADLS Gen2 存储与链接服务
在连接到 Azure Data Lake Storage Gen2 时,Azure Synapse Analytics 可提供集成的链接服务体验。 可以将链接服务配置为使用帐户密钥、服务主体、托管标识或凭据进行身份验证。
当链接服务身份验证方法设置为“帐户密钥”时,链接服务将使用提供的存储帐户密钥进行身份验证,请求 SAS 密钥,并使用 LinkedServiceBasedSASProvider 将其自动应用于存储请求。
Synapse 支持用户为特定存储帐户设置链接服务。 这样,就可以从单个 Spark 应用程序/查询中的多个存储帐户读取/写入数据。 为要使用的每个存储帐户设置 spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName
后,Synapse 会确定要用于特定读/写操作的链接服务。 但是,如果 Spark 作业仅处理存储帐户,则只需省略存储帐户名称,并使用 spark.storage.synapse.linkedServiceName
。
注意
无法更改默认 ABFS 存储容器的身份验证方法。
val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")
# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<DIRECTORY PATH>')
df.show()
如果将链接服务身份验证方法设置为“托管标识”或“服务主体”,则链接服务会将托管标识或服务主体令牌用于 LinkedServiceBasedTokenProvider 提供程序。
val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<DIRECTORY PATH>')
df.show()
通过 Spark 配置设置身份验证设置
还可以通过 Spark 配置而不是运行 spark 语句来指定身份验证设置。 所有 spark 配置都应以 spark.
为前缀,而所有 hadoop 配置应以 spark.hadoop.
为前缀。
Spark 配置名称 | 配置值 |
---|---|
spark.storage.synapse.teststorage.dfs.core.chinacloudapi.cn.linkedServiceName |
链接服务名称 |
spark.hadoop.fs.azure.account.oauth.provider.type.teststorage.dfs.core.chinacloudapi.cn |
microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider |
ADLS Gen2 存储(无链接服务)
使用 SAS 密钥直接连接到 ADLS Gen2 存储。 使用 ConfBasedSASProvider
并为 spark.storage.synapse.sas
配置设置提供 SAS 密钥。 可以在容器级别、帐户级别或全局级别设置 SAS 令牌。 不建议在全局级别设置 SAS 密钥,因为作业将无法从多个存储帐户读取/写入。
每个存储容器的 SAS 配置
%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))
每个存储帐户的 SAS 配置
%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))
所有存储帐户的 SAS 配置
%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))
使用 MSAL 获取令牌(使用自定义应用凭据)
配置 ABFS 存储驱动程序以将 MSAL 直接用于身份验证时,提供程序不会缓存令牌。 这可能会导致可靠性问题。 建议使用 ClientCredsTokenProvider
是 Synapse Spark 的一部分。
%%spark
val source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.$source_full_storage_account_name", "<Entra AppId>")
spark.conf.set("fs.azure.account.oauth2.client.secret.$source_full_storage_account_name", "<Entra app secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.$source_full_storage_account_name", "https://login.partner.microsoftonline.cn/<tenantid>")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{source_full_storage_account_name}.linkedServiceName", "<Entra AppId>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{source_full_storage_account_name}.linkedServiceName", "<Entra app secret>")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{source_full_storage_account_name}.linkedServiceName", "https://login.partner.microsoftonline.cn/<tenantid>")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))
使用 SAS 令牌的 ADLS Gen2 存储(从 Azure Key Vault)
使用存储在 Azure Key Vault 机密中的 SAS 令牌连接到 ADLS Gen2 存储。
%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))
其他链接服务的 TokenLibrary
若要连接到其他链接服务,可以直接调用 TokenLibrary。
getConnectionString()
若要检索连接字符串,请使用 getConnectionString
函数并传入链接服务名称。
%%spark
// retrieve connectionstring from mssparkutils
mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils
mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;
string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);
getPropertiesAll()
getPropertiesAll 是 Scala 和 Python 中提供的帮助程序函数,用于获取链接服务的所有属性
%%pyspark
import json
# retrieve connectionstring from mssparkutils
json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))
输出类似于
{
'AuthType': 'Key',
'AuthKey': '[REDACTED]',
'Id': None,
'Type': 'AzureBlobStorage',
'Endpoint': 'https://storageaccount.blob.core.chinacloudapi.cn/',
'Database': None
}
GetSecret()
若要检索存储在 Azure Key Vault 中的机密,建议在 Synapse 工作区内创建到 Azure Key Vault 的链接服务。 需要向 Synapse 工作区托管服务标识授予对 Azure Key Vault 的 GET Secrets 权限。 链接服务将使用托管服务标识来连接到 Azure Key Vault 服务,以检索机密。 否则,直接连接到 Azure Key Vault 将使用用户的 Microsoft Entra 凭据。 在这种情况下,需要向用户授予 Azure Key Vault 中的“获取机密”权限。
请提供 keyvault 的完全限定域名。
mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])
若要从 Azure Key Vault 检索机密,请使用 mssparkutils.credentials.getSecret()
函数。
mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;
string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);
Spark 运行时支持的链接服务连接
虽然 Azure Synapse Analytics 支持来自管道和其他 Azure 产品的各种链接服务连接,但 Spark 运行时并不支持所有这些服务连接。 下面是支持的链接服务列表:
- Azure Blob 存储
- Azure AI 服务
- Azure Cosmos DB
- Azure Database for MySQL
- Azure Database for PostgreSQL
- Azure Key Vault
- Azure 机器学习
- Azure SQL 数据库
- Azure SQL Data Warehouse(专用和无服务器)
- Azure 存储
mssparkutils.credentials.getToken()
如果需要使用 OAuth 持有者令牌直接访问服务,则可以使用 getToken
方法。 支持以下资源:
服务名称 | 要在 API 调用中使用的字符串文本 |
---|---|
Azure Storage |
Storage |
Azure Key Vault |
Vault |
Azure Management |
AzureManagement |
Azure SQL Data Warehouse (Dedicated and Serverless) |
DW |
Azure Synapse |
Synapse |
Azure Data Factory |
ADF |
Azure Database for MySQL |
AzureOSSDB |
Azure Database for MariaDB |
AzureOSSDB |
Azure Database for PostgreSQL |
AzureOSSDB |
不支持从 Spark 运行时进行链接服务访问
Spark 运行时不支持以下访问链接服务的方法:
- 将参数传递给参数化链接服务
- 具有用户分配的托管标识 (UAMI) 的连接
- 当 Notebook/SparkJobDefinition 作为托管标识运行时,将持有者令牌获取到 Keyvault 资源
- 作为替代方法,可以创建指向 Keyvault 的链接服务并从 Notebook/批处理作业获取机密,而不是获取访问令牌
- 对于 Azure Cosmos DB 连接,仅支持基于密钥的访问。 不支持基于令牌的访问。
运行笔记本或 Spark 作业时,使用链接服务获取令牌/机密的请求可能会失败,并显示指示“BadRequest”的错误消息。 这通常是由链接服务的配置问题引起的。 如果看到此错误消息,请检查链接服务的配置。 如有任何问题,请通过 Azure 门户联系 Azure 支持部门。