在 Azure Synapse 分析中查询数据

可以使用 Azure Synapse 连接器从 Azure Databricks 访问 Azure Synapse,该连接器使用 Azure Synapse 中的 COPY 语句通过一个用于暂存数据的 Azure Data Lake Storage Gen2 存储帐户在 Azure Databricks 群集和 Azure Synapse 实例之间有效传输大量数据。

重要

本文所述的配置为试验性配置。 试验性功能按原样提供,Databricks 不会通过客户技术支持为它提供支持。 为了获得完整的查询联合支持,应改为使用 Lakehouse 联合身份验证,这使 Azure Databricks 用户能够利用 Unity Catalog 语法和数据治理工具。

Azure Synapse Analytics 是基于云的企业数据仓库,可利用大规模并行处理 (MPP) 对数 PB 的数据快速运行复杂的查询。

重要

此连接器仅可用于 Synapse 专用池实例,与其他 Synapse 组件不兼容。

注意

COPY 仅适用于 Azure Data Lake Storage Gen2 实例。 如果要查找有关使用 Polybase 的详细信息,请参阅将 Azure Databricks 和 Azure Synapse 与 PolyBase 连接(旧版)

Synapse 的示例语法

可以在 Scala、Python、SQL 和 R 中查询 Synapse。以下代码示例使用存储帐户密钥并将存储凭据从 Azure Databricks 转发到 Synapse。

注意

使用 Azure 门户提供的连接字符串,该字符串使得通过 JDBC 连接在 Spark 驱动程序和 Azure Synapse 实例之间发送的所有数据都可以进行安全套接字层 (SSL) 加密。 若要验证是否已启用 SSL 加密,可以在连接字符串中搜索 encrypt=true

重要

Unity Catalog 中定义的外部位置不支持 tempDir 位置。

Scala


// Set up the storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
  "<your-storage-account-access-key>")

// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
val df: DataFrame = spark.read
  .format("sqldw")
  .option("host", "hostname")
  .option("port", "port") /* Optional - will use default port 1433 if not specified. */
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") /* If schemaName not provided, default to "dbo". */
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .load()

// Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .load()

// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x, count(*) as cnt from table group by x")
  .load()

// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.

df.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("dbTable", "<your-table-name>")
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
  .save()

Python


# Set up the storage account access key in the notebook session conf.
spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
  "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 11.3 LTS and above.
df = spark.read
  .format("sqldw")
  .option("host", "hostname")
  .option("port", "port") # Optional - will use default port 1433 if not specified.
  .option("user", "username")
  .option("password", "password")
  .option("database", "database-name")
  .option("dbtable", "schema-name.table-name") # If schemaName not provided, default to "dbo".
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
  .option("forwardSparkAzureStorageCredentials", "true")
  .load()

# Get some data from an Azure Synapse table. The following example applies to Databricks Runtime 10.4 LTS and below.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Load data from an Azure Synapse query.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("query", "select x, count(*) as cnt from table group by x") \
  .load()

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.

df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .save()

SQL


-- Set up the storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn=<your-storage-account-access-key>;

-- Read data using SQL. The following example applies to Databricks Runtime 11.3 LTS and above.
CREATE TABLE example_table_in_spark_read
USING sqldw
OPTIONS (
  host '<hostname>',
  port '<port>' /* Optional - will use default port 1433 if not specified. */
  user '<username>',
  password '<password>',
  database '<database-name>'
  dbtable '<schema-name>.<table-name>', /* If schemaName not provided, default to "dbo". */
  forwardSparkAzureStorageCredentials 'true',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>'
);

-- Read data using SQL. The following example applies to Databricks Runtime 10.4 LTS and below.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbtable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>'
);

-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:

CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
  url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
  forwardSparkAzureStorageCredentials 'true',
  dbTable '<your-table-name>',
  tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;

R

# Load SparkR
library(SparkR)

# Set up the storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn", "<your-storage-account-access-key>")

# Get some data from an Azure Synapse table.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   dbTable = "<your-table-name>",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")

# Load data from an Azure Synapse query.
df <- read.df(
   source = "com.databricks.spark.sqldw",
   url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
   forward_spark_azure_storage_credentials = "true",
   query = "select x, count(*) as cnt from table group by x",
   tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")

# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.

write.df(
  df,
  source = "com.databricks.spark.sqldw",
  url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
  forward_spark_azure_storage_credentials = "true",
  dbTable = "<your-table-name>",
  tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")

Azure Databricks 和 Synapse 之间如何进行身份验证?

Azure Synapse 连接器使用三种类型的网络连接:

  • Spark 驱动程序到 Azure Synapse
  • 从 Spark 群集连接到 Azure 存储帐户
  • Azure Synapse 到 Azure 存储帐户

配置对 Azure 存储的访问

Azure Databricks 和 Synapse 都需要对一个用作临时数据存储的 Azure 存储帐户拥有访问特权。

Azure Synapse 不支持使用 SAS 来访问存储帐户。 可以通过以下操作之一配置这两项服务的访问权限:

所需的 Azure Synapse 权限

由于 Azure Synapse 连接器在后台使用 COPY,因此它要求 JDBC 连接用户有权在连接的 Azure Synapse 实例中运行以下命令:

如果目标表不存在于 Azure Synapse 中,则需要运行以下命令以及上述命令的权限:

下表汇总了使用 COPY 进行写入所需的权限:

权限(插入到现有表中) 权限(插入到新表中)
ADMINISTER DATABASE BULK OPERATIONS

INSERT
ADMINISTER DATABASE BULK OPERATIONS

INSERT

CREATE TABLE

ALTER ON SCHEMA :: dbo

网络配置

如果在 Azure Synapse 上配置防火墙,必须将网络设置配置为允许 Azure Databricks 访问 Azure Synapse。 首先,确保按照在 Azure 虚拟网络中部署 Azure Databricks(VNet 注入),将 Azure Databricks 工作区部署在你自己的虚拟网络中。 然后,可以在 Azure Synpase 上配置 IP 防火墙规则,以允许从子网连接到 Synapse 帐户。 请参阅 Azure Synapse Analytics IP 防火墙规则

配置使用服务主体通过 OAuth 2.0 从 Azure Databricks 连接到 Synapse

可以使用有权访问基础存储帐户的服务主体对 Azure Synapse Analytics 进行身份验证。 有关使用服务主体凭据访问 Azure 存储帐户的详细信息,请参阅连接到 Azure Data Lake Storage Gen2 和 Blob 存储。 必须在连接配置中根据 Azure Databricks Synapse 连接器选项参考enableServicePrincipalAuth 选项设置为 true,才能让连接器能够使用服务主体进行身份验证。

可以选择对 Azure Synapse Analytics 连接使用不同的服务主体。 以下示例为存储帐户配置服务主体凭据,并为 Synapse 配置可选的服务主体凭据:

ini

; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.chinacloudapi.cn/<directory-id>/oauth2/token

; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>

Scala

// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.chinacloudapi.cn/<directory-id>/oauth2/token")

// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

Python

# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.chinacloudapi.cn/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

R

# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")

# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.chinacloudapi.cn/<directory-id>/oauth2/token")

# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")

批量写入支持的保存模式

Azure Synapse 连接器支持 ErrorIfExistsIgnoreAppendOverwrite 保存模式,默认模式为 ErrorIfExists。 有关 Apache Spark 中支持的保存模式的详细信息,请参阅有关保存模式的 Spark SQL 文档

Azure Databricks Synapse 连接器选项参考

Spark SQL 中提供的 OPTIONS 支持以下设置:

参数 必须 默认 说明
dbTable 是,除非指定了 query 无默认值 要在 Azure Synapse 中创建或读取的表。 将数据保存回 Azure Synapse 时,此参数是必需的。

你还可以使用 {SCHEMA NAME}.{TABLE NAME} 来访问采用给定架构的表。 如果未提供架构名称,则会使用与 JDBC 用户关联的默认架构。

先前支持的 dbtable 变体已弃用,在将来的版本中会被忽略。 请改用“混合大小写”名称。
query 是,除非指定了 dbTable 无默认值 要从 Azure Synapse 中进行读取的查询。

对于在查询中引用的表,你还可以使用 {SCHEMA NAME}.{TABLE NAME} 来访问采用给定架构的表。 如果未提供架构名称,则会使用与 JDBC 用户关联的默认架构。
user 无默认值 Azure Synapse 用户名。 必须与 password 选项一起使用。 使用它的前提是未在 URL 中传递用户和密码。 同时传递这两项会导致错误。
password 无默认值 Azure Synapse 密码。 必须与 user 选项一起使用。 使用它的前提是未在 URL 中传递用户和密码。 同时传递这两项会导致错误。
url 无默认值 一个 JDBC URL,其中的 sqlserver 设置为子协议。 建议使用 Azure 门户提供的连接字符串。 设置
强烈建议使用 encrypt=true,因为它允许对 JDBC 连接进行 SSL 加密。 如果单独设置了 userpassword,则不需要在 URL 中包含它们。
jdbcDriver 取决于 JDBC URL 的子协议 要使用的 JDBC 驱动程序的类名。 此类必须位于类路径中。 在大多数情况下无需指定此选项,因为相应的驱动程序类名会由 JDBC URL 的子协议自动确定。

先前支持的 jdbc_driver 变体已弃用,在将来的版本中会被忽略。 请改用“混合大小写”名称。
tempDir 无默认值 一个 abfss URI。 建议将专用 Blob 存储容器用于 Azure Synapse。

先前支持的 tempdir 变体已弃用,在将来的版本中会被忽略。 请改用“混合大小写”名称。

不能将 Unity Catalog中定义的外部位置用作 tempDir 位置。
tempCompression SNAPPY 由 Spark 和 Azure Synapse 用来进行临时编码/解码的压缩算法。 目前支持的值为 UNCOMPRESSEDSNAPPYGZIP
forwardSparkAzureStorageCredentials false 如果此项为 true,库会自动发现 Spark 用来连接到 Blob 存储容器的存储帐户访问密钥凭据,并会通过 JDBC 将这些凭据转发到 Azure Synapse。 这些凭据作为 JDBC 查询的一部分发送。 因此,在使用此选项时,强烈建议你启用对 JDBC 连接进行 SSL 加密的功能。

配置存储身份验证时,必须将 useAzureMSIforwardSparkAzureStorageCredentials 中的一个(且只能是一个)设置为 true。 或者,可以将 enableServicePrincipalAuth 设置为 true,并将服务主体同时用于 JDBC 和存储身份验证。 forwardSparkAzureStorageCredentials 选项不支持使用托管服务标识或服务主体对存储进行身份验证。 仅支持存储帐户访问密钥。

先前支持的 forward_spark_azure_storage_credentials 变体已弃用,在将来的版本中会被忽略。 请改用“混合大小写”名称。
useAzureMSI false 如果此项为 true,则库会为它创建的数据库范围凭据指定 IDENTITY = 'Managed Service Identity' 并且不指定 SECRET

配置存储身份验证时,必须将 useAzureMSIforwardSparkAzureStorageCredentials 中的一个(且只能是一个)设置为 true。 或者,可以将 enableServicePrincipalAuth 设置为 true,并将服务主体同时用于 JDBC 和存储身份验证。
enableServicePrincipalAuth false 如果为 true,则库将使用提供的服务主体凭据通过 JDBC 连接到 Azure 存储帐户和 Azure Synapse Analytics。

如果 forward_spark_azure_storage_credentialsuseAzureMSI 设置为 true,则该选项将优先于存储身份验证中的服务主体。
tableOptions CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN 一个用于指定表选项的字符串。创建通过 dbTable 设置的 Azure Synapse 表时需要使用这些选项。 此字符串会以文本形式传递到针对 Azure Synapse 发出的 CREATE TABLE SQL 语句的 WITH 子句。

先前支持的 table_options 变体已弃用,在将来的版本中会被忽略。 请改用“混合大小写”名称。
preActions 无默认值(空字符串) 在将数据写入 Azure Synapse 实例之前要在 Azure Synapse 中执行的 SQL 命令的列表,其中各命令之间以 ; 分隔。 这些 SQL 命令必须是 Azure Synapse 接受的有效命令。

如果这些命令中的任何一个失败,系统会将其视为错误,并且不会执行写入操作。
postActions 无默认值(空字符串) 在连接器成功将数据写入 Azure Synapse 实例后要在 Azure Synapse 中执行的 SQL 命令的列表,其中各命令之间以 ; 分隔。 这些 SQL 命令必须是 Azure Synapse 接受的有效命令。

如果这些命令中的任何一个失败,系统会将其视为错误,并且,当数据成功写入 Azure Synapse 实例后,会出现异常。
maxStrLength 256 Spark 中的 StringType 会映射到 Azure Synapse 中的 NVARCHAR(maxStrLength) 类型。 你可以使用 maxStrLength 为所有 NVARCHAR(maxStrLength) 类型列设置字符串长度,这些列位于 Azure Synapse 内名为
dbTable 的表中。

先前支持的 maxstrlength 变体已弃用,在将来的版本中会被忽略。 请改用“混合大小写”名称。
applicationName Databricks-User-Query 每个查询的连接的标记。 如果未指定此项,或者值为空字符串,则会将标记的默认值添加到 JDBC URL。 默认值可防止 Azure DB 监视工具针对查询引发虚假 SQL 注入警报。
maxbinlength 无默认值 控制 BinaryType 列的列长度。 此参数会转换为 VARBINARY(maxbinlength)
identityInsert false 设置为 true 将启用 IDENTITY_INSERT 模式,该模式将数据帧提供的值插入 Azure Synapse 表的标识列中。

请参阅将值显式插入到 IDENTITY 列
externalDataSource 无默认值 预先预配的外部数据源,用于从 Azure Synapse 读取数据。 外部数据源只能与 PolyBase 一起使用,并删除 CONTROL 权限要求,因为连接器不需要创建作用域凭据和外部数据源来加载数据。

有关使用外部数据源时所需的权限的示例用法和列表,请参阅具有外部数据源选项的 PolyBase 所需的 Azure Synapse 权限
maxErrors 0 在取消加载操作之前,在读取和写入期间可拒绝的最大行数。 将忽略拒绝的行。 例如,如果 10 条记录中有 2 条出错,则只处理 8 条记录。

请参阅 CREATE EXTERNAL TABLE 中的 REJECT_VALUE 文档COPY 中的 MAXERRORS 文档
inferTimestampNTZType false 如果为 true,则 Azure Synapse TIMESTAMP 类型的值在读取期间将被解释为 TimestampNTZType(不含时区的时间戳)。 否则,所有时间戳都将被解释为 TimestampType,而不考虑基础 Azure Synapse 表中的类型。

注意

  • 仅当将数据从 Azure Databricks 写入 Azure Synapse 中的新表时,tableOptionspreActionspostActionsmaxStrLength 才适用。
  • 即使所有数据源选项名称不区分大小写,也建议你为了清楚起见,以“混合大小写”方式指定这些名称。

将查询下推到 Azure Synapse 中

Azure Synapse 连接器实施了一组将以下运算符下推到 Azure Synapse 中的优化规则:

  • Filter
  • Project
  • Limit

ProjectFilter 运算符支持以下表达式:

  • 大多数布尔逻辑运算符
  • 比较
  • 基本算术运算
  • 数值和字符串强制转换

对于 Limit 运算符,仅在未指定排序的情况下才支持下推。 例如: 。

SELECT TOP(10) * FROM table(而不是 SELECT TOP(10) * FROM table ORDER BY col)。

注意

Azure Synapse 连接器不下推针对字符串、日期或时间戳进行运算的表达式。

默认启用通过 Azure Synapse 连接器构建的查询下推。 可以通过将 spark.databricks.sqldw.pushdown 设置为 false 来禁用它。

临时数据管理

Azure Synapse 连接器不会删除它在 Azure 存储容器中创建的临时文件。 Databricks 建议你定期删除用户提供的 tempDir 位置下的临时文件。

为了便于进行数据清理,Azure Synapse 连接器不会直接在 tempDir 下存储数据文件,而是创建如下格式的子目录:<tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/。 可以通过设置定期作业(使用 Azure Databricks 的作业功能或其他功能进行设置),以递归方式删除其创建时间早于给定阈值(例如 2 天)的任何子目录(假设 Spark 作业的运行时间不能超过该阈值)。

一个更简单的替代方法是,定期删除整个容器,然后使用同一名称创建一个新容器。 这要求你将专用容器用于 Azure Synapse 连接器生成的临时数据,并且你可以找到一个时间窗口,在该窗口中,你可以保证任何涉及连接器的查询均未在运行。

临时对象管理

Azure Synapse 连接器在 Azure Databricks 群集和 Azure Synapse 实例之间自动进行数据传输。 为了从 Azure Synapse 表或查询中读取数据或将数据写入 Azure Synapse 表,Azure Synapse 连接器会创建临时对象,其中包括幕后的 DATABASE SCOPED CREDENTIALEXTERNAL DATA SOURCEEXTERNAL FILE FORMATEXTERNAL TABLE。 这些对象只在相应 Spark 作业的整个持续时间内生存,并且会自动删除。

当群集使用 Azure Synapse 连接器运行查询时,如果 Spark 驱动程序进程崩溃或被强制重启,或者群集被强制终止或重启,则可能不会删除临时对象。 为了便于识别并手动删除这些对象,Azure Synapse 连接器会使用以下格式的标记为在 Azure Synapse 实例中创建的所有中间临时对象的名称加上前缀:tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>

建议使用如下所示的查询定期查找泄漏的对象:

  • SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
  • SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'