Compartilhar via

通过使用 mssparkutils 的链接服务保护凭据

访问来自外部源的数据是一种常见的模式。 除非外部数据源允许匿名访问,否则您需要使用凭证、密钥或连接字符串来保护连接。

默认情况下,Azure Synapse Analytics 使用 Microsoft Entra 直通认证在资源之间进行身份验证。 如果需要使用其他凭据连接到资源,请直接使用 mssparkutils。 mssparkutils 包简化了检索 SAS 令牌、Microsoft Entra 令牌、连接字符串和存储在链接服务或从Azure Key Vault中的机密的过程。

Microsoft Entra 直通使用分配给 Microsoft Entra ID 用户的权限,而不是分配给 Synapse 或单独服务主体的权限。 例如,如果要使用 Microsoft Entra 传递来访问存储帐户中的 Blob,则应转到该存储帐户,并将 Blob 数据贡献者角色分配给自己。

在从Azure Key Vault 检索机密时,我们建议创建一个链接至 Azure Key Vault 的链接服务。 确保 Synapse 工作区托管服务标识(MSI)对Azure Key Vault拥有机密获取权限。 Synapse 将使用 Synapse 工作区托管服务标识向Azure Key Vault进行身份验证。 如果直接连接到没有链接服务的Azure Key Vault,请使用用户Microsoft Entra 凭据进行身份验证。

有关详细信息,请参阅链接服务

使用情况

mssparkutils 令牌和机密帮助

此函数显示 Synapse 中机密和令牌管理的帮助文档。

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

获取结果:

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

Access Azure Data Lake Storage Gen2

ADLS Gen2 主存储

默认情况下,从主 Azure Data Lake Storage 访问文件会使用 Microsoft Entra 直通进行身份验证,并且不需要显式使用 mssparkutils。 直通式身份验证中使用的标识因几个因素而异。 默认情况下,将会使用用户的标识来执行交互式笔记本,但可以将其更改为工作区托管服务标识 (MSI)。 批处理作业和笔记本的非交互式执行使用 Workspace MSI。

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))

包含链接服务的 ADLS Gen2 存储解决方案

Azure Synapse Analytics 在连接到Azure Data Lake Storage Gen2时提供集成的链接服务体验。 可以将链接服务配置为使用帐户密钥服务主体托管标识凭据进行身份验证。

当链接服务身份验证方法设置为 Account Key 时,链接服务将使用提供的storage帐户密钥进行身份验证,请求 SAS 密钥,并使用 LinkedServiceBasedSASProvider 自动将其应用到storage请求。

Synapse 允许用户为特定存储帐户设置链接的服务。 这样就可以在单个 spark 应用程序/查询中从 multiple storage 帐户读取/写入数据。 一旦为要使用的每个存储帐户设置spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName后,Synapse 会确定用于特定读/写操作的链接服务。 但是,如果 Spark 作业仅处理单个存储帐户,则可以省略存储帐户名并使用 spark.storage.synapse.linkedServiceName

注意事项

无法更改默认 ABFS storage 容器的身份验证方法。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<DIRECTORY PATH>')

df.show()

如果将链接服务身份验证方法设置为“托管标识”或“服务主体”,则链接服务会将托管标识或服务主体令牌用于 LinkedServiceBasedTokenProvider 提供程序。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<DIRECTORY PATH>')

df.show()

通过 Spark 配置设置身份验证设置

还可以通过 Spark 配置而不是运行 spark 语句来指定身份验证设置。 所有 spark 配置都应以 spark. 为前缀,而所有 hadoop 配置应以 spark.hadoop. 为前缀。

Spark 配置名称 配置值
spark.storage.synapse.teststorage.dfs.core.chinacloudapi.cn.linkedServiceName 链接服务名称
spark.hadoop.fs.azure.account.oauth.provider.type.teststorage.dfs.core.chinacloudapi.cn microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider

没有关联服务的 ADLS Gen2 存储

使用 SAS 密钥直接连接到 ADLS Gen2 storage。 使用 ConfBasedSASProvider,并为 spark.storage.synapse.sas 配置设置提供 SAS 密钥。 可以在容器级别、帐户级别或全局级别设置 SAS 令牌。 我们不建议在全局级别设置 SAS 密钥,因为作业将无法从多个存储帐户读取或写入。

每个storage容器的 SAS 配置

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')

display(df.limit(10))

每个存储帐户的SAS配置

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.chinacloudapi.cn", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.chinacloudapi.cn.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')

display(df.limit(10))

所有存储帐户的 SAS 配置

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')

display(df.limit(10))

使用 MSAL 获取令牌(使用自定义应用凭据)

当 ABFS storage 驱动程序配置直接使用 MSAL 进行身份验证时,提供程序不会缓存令牌。 这可能会导致可靠性问题。 建议使用的 ClientCredsTokenProvider 是 Synapse Spark 的一部分。

%%spark
val source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.$source_full_storage_account_name", "<Entra AppId>")
spark.conf.set("fs.azure.account.oauth2.client.secret.$source_full_storage_account_name", "<Entra app secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.$source_full_storage_account_name", "https://login.partner.microsoftonline.cn/<tenantid>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark
source_full_storage_account_name = "teststorage.dfs.core.chinacloudapi.cn"
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{source_full_storage_account_name}.linkedServiceName", "<Entra AppId>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{source_full_storage_account_name}.linkedServiceName", "<Entra app secret>")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{source_full_storage_account_name}.linkedServiceName", "https://login.partner.microsoftonline.cn/<tenantid>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')
display(df.limit(10))

使用 SAS 令牌的 ADLS Gen2 存储(来自 Azure Key Vault)

使用存储在 Azure Key Vault 密钥中的 SAS 令牌连接到 ADLS Gen2 存储。

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.chinacloudapi.cn/<FILE PATH>')

display(df.limit(10))

其他链接服务的TokenLibrary

若要连接到其他链接服务,可以直接调用 TokenLibrary。

getConnectionString()

若要检索connection string,请使用 getConnectionString 函数并传入 链接的服务名称

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

getPropertiesAll 是 Scala 和 Python 中提供的帮助程序函数,用于获取链接服务的所有属性

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

输出将类似于

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.chinacloudapi.cn/',
    'Database': None
}

GetSecret()

若要检索存储在 Azure Key Vault 中的机密,我们建议在 Synapse 工作区创建到 Azure Key Vault 的链接服务。 需要向 Synapse 工作区托管服务标识授予 GET 查看机密的权限,以访问 Azure Key Vault。 链接服务将使用托管服务标识连接到Azure Key Vault服务以检索机密。 否则,连接到 Azure Key Vault 时会直接使用用户的 Microsoft Entra 凭据。 在这种情况下,需要在Azure Key Vault中向用户授予“获取机密”权限。

请提供密钥保管库的完全限定域名。

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

若要从Azure Key Vault检索机密,请使用 mssparkutils.credentials.getSecret() 函数。


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Spark 运行时支持的链接服务连接

虽然 Azure Synapse Analytics 支持各种链服务连接(从 pipelines 以及其他 Azure 产品),但并非所有链服务连接都受 Spark 运行时支持。 下面是支持的链接服务列表:

  • Azure Blob Storage (Azure对象存储)
  • Foundry 工具
  • Azure Cosmos DB
  • 适用于 MySQL 的 Azure 数据库
  • Azure Database for PostgreSQL
  • Azure Key Vault
  • Azure Machine Learning
  • Azure SQL Database
  • Azure SQL Data Warehouse(专用和无服务器)
  • Azure Storage

mssparkutils.credentials.getToken()

当需要 OAuth Bearer 令牌来直接访问服务时,可以使用 getToken 方法。 支持以下资源:

服务名称 要在 API 调用中使用的字符串文本
Azure Storage Storage
Azure Key Vault Vault
Azure Management AzureManagement
Azure SQL Data Warehouse (Dedicated and Serverless) DW
Azure Synapse Synapse
Azure Data Factory ADF

Spark 运行时不支持访问的链接服务

Spark 运行时不支持以下访问链接服务的方法:

  • 将参数传递给参数化链接服务
  • 具有用户分配的托管标识 (UAMI) 的连接
  • 当 Notebook/SparkJobDefinition 作为托管标识运行时,获取 Bearer 令牌以访问 Keyvault 资源
    • 或者,可以创建指向 Keyvault 的链接服务,从 Notebook 或批处理作业获取密钥,而不是获取访问令牌。
  • 对于 Azure Cosmos DB 连接,仅支持基于密钥的访问。 不支持基于令牌的访问。

运行笔记本或 Spark 作业时,使用链接服务获取令牌/机密的请求可能会失败,并显示指示“BadRequest”的错误消息。 这通常是由链接服务的配置问题引起的。 如果看到此错误消息,请检查链接服务的配置。 如有任何问题,请通过 Azure portal 联系 Azure 支持。