Connecting Azure Databricks and Azure Synapse with PolyBase (legacy)
Important
This documentation has been retired and might not be updated. The products, services, or technologies mentioned in this content are no longer supported. See Query data in Azure Synapse Analytics.
Databricks recommends using the default COPY functionality with Azure Data Lake Storage Gen2 for connections to Azure Synapse. This article includes legacy documentation around PolyBase and blob storage.
Azure Synapse Analytics (formerly SQL Data Warehouse) is a cloud-based enterprise data warehouse that leverages massively parallel processing (MPP) to quickly run complex queries across petabytes of data. Use Azure Synapse as a key component of a big data solution: import big data into Azure Synapse with simple PolyBase T-SQL queries or the COPY statement, and then use the power of MPP to run high-performance analytics. As you integrate and analyze, the data warehouse becomes the single version of truth your business can count on for insights.
You can access Azure Synapse from Azure Databricks using the Azure Synapse connector, a data source implementation for Apache Spark that uses Azure Blob storage, and PolyBase or the COPY statement in Azure Synapse, to transfer large volumes of data efficiently between an Azure Databricks cluster and an Azure Synapse instance.
Both the Azure Databricks cluster and the Azure Synapse instance access a common Blob storage container to exchange data between these two systems. In Azure Databricks, Apache Spark jobs are triggered by the Azure Synapse connector to read data from and write data to the Blob storage container. On the Azure Synapse side, data loading and unloading operations performed by PolyBase are triggered by the Azure Synapse connector through JDBC. In Databricks Runtime 7.0 and above, COPY is used by default to load data into Azure Synapse by the Azure Synapse connector through JDBC.
Note
COPY is available only on Azure Synapse Gen2 instances, which provide better performance. If your database still uses Gen1 instances, we recommend that you migrate the database to Gen2.
The Azure Synapse connector is more suited to ETL than to interactive queries, because each query execution can extract large amounts of data to Blob storage. If you plan to perform several queries against the same Azure Synapse table, we recommend that you save the extracted data in a format such as Parquet.
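For example, the following sketch (all values in angle brackets and the cache path are placeholders) extracts an Azure Synapse table once, persists it as Parquet, and serves subsequent interactive queries from the Parquet copy instead of re-extracting from Azure Synapse:
Python
# Extract the table once through the Azure Synapse connector.
df = spark.read \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-table-name>") \
  .load()

# Persist the extracted data as Parquet in a location of your choice (hypothetical path).
cache_path = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/cache/<your-table-name>"
df.write.mode("overwrite").parquet(cache_path)

# Run repeated interactive queries against the Parquet copy instead of Azure Synapse.
cached = spark.read.parquet(cache_path)
cached.createOrReplaceTempView("cached_table")
spark.sql("SELECT count(*) FROM cached_table").show()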
Requirements
An Azure Synapse database master key.
Authentication
The Azure Synapse connector uses three types of network connections:
- Spark driver to Azure Synapse
- Spark driver and executors to Azure storage account
- Azure Synapse to Azure storage account
                                 ┌─────────┐
        ┌───────────────────────>│ STORAGE │<───────────────────────┐
        │  Storage acc key /     │ ACCOUNT │     Storage acc key /   │
        │  Managed Service ID /  └─────────┘     OAuth 2.0 /         │
        │                             │                              │
        │                             │     Storage acc key /        │
        │                             │     OAuth 2.0 /              │
        │                             │                              │
        v                             v                              v
  ┌──────────┐                  ┌──────────┐                  ┌───────────┐
  │ Synapse  │                  │  Spark   │                  │   Spark   │
  │ Analytics│<────────────────>│  Driver  │<────────────────>│ Executors │
  └──────────┘    JDBC with     └──────────┘    Configured    └───────────┘
                  username &                    in Spark
                  password /
                  OAuth 2.0
The following sections describe each connection's authentication configuration options.
Spark driver to Azure Synapse
The Spark driver can connect to Azure Synapse using JDBC with a username and password or OAuth 2.0 with a service principal for authentication.
Username and password
We recommend that you use the connection strings provided by the Azure portal for both authentication types, which enable Secure Sockets Layer (SSL) encryption for all data sent between the Spark driver and the Azure Synapse instance through the JDBC connection. To verify that SSL encryption is enabled, you can search for encrypt=true in the connection string.
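For reference, a SQL Server JDBC connection string typically looks like the following sketch; copy the actual string from the Azure portal rather than building it by hand, since it may contain additional properties, and every value in angle brackets here is a placeholder:
Python
# A hypothetical JDBC URL for Azure Synapse with SSL encryption enabled (note encrypt=true).
jdbc_url = (
  "jdbc:sqlserver://<your-server-name>.database.chinacloudapi.cn:1433;"
  "database=<your-database-name>;"
  "user=<your-user-name>;"
  "password=<your-password>;"
  "encrypt=true;"
  "trustServerCertificate=false;"
  "loginTimeout=30;"
)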
To allow the Spark driver to reach Azure Synapse, we recommend that you set Allow Azure services and resources to access this workspace to ON on the Networking pane under Security of the Azure Synapse workspace in the Azure portal. This setting allows communication from all Azure IP addresses and all Azure subnets, so that Spark drivers can reach the Azure Synapse instance.
OAuth 2.0 with a service principal
You can authenticate to Azure Synapse Analytics using a service principal with access to the underlying storage account. For more information on using service principal credentials to access an Azure storage account, see Connect to Azure Data Lake Storage Gen2 and Blob Storage. You must set the enableServicePrincipalAuth option to true in the connection configuration Parameters to enable the connector to authenticate with a service principal.
You can optionally use a different service principal for the Azure Synapse Analytics connection. The following example configures service principal credentials for the storage account and optional service principal credentials for Synapse:
ini
; Defining the Service Principal credentials for the Azure storage account
fs.azure.account.auth.type OAuth
fs.azure.account.oauth.provider.type org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id <application-id>
fs.azure.account.oauth2.client.secret <service-credential>
fs.azure.account.oauth2.client.endpoint https://login.chinacloudapi.cn/<directory-id>/oauth2/token
; Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.databricks.sqldw.jdbc.service.principal.client.id <application-id>
spark.databricks.sqldw.jdbc.service.principal.client.secret <service-credential>
Scala
// Defining the Service Principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.chinacloudapi.cn/<directory-id>/oauth2/token")
// Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Python
# Defining the service principal credentials for the Azure storage account
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<service-credential>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.chinacloudapi.cn/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
R
# Load SparkR
library(SparkR)
conf <- sparkR.callJMethod(sparkR.session(), "conf")
# Defining the service principal credentials for the Azure storage account
sparkR.callJMethod(conf, "set", "fs.azure.account.auth.type", "OAuth")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.secret", "<service-credential>")
sparkR.callJMethod(conf, "set", "fs.azure.account.oauth2.client.endpoint", "https://login.chinacloudapi.cn/<directory-id>/oauth2/token")
# Defining a separate set of service principal credentials for Azure Synapse Analytics (If not defined, the connector will use the Azure storage account credentials)
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.id", "<application-id>")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.jdbc.service.principal.client.secret", "<service-credential>")
Spark driver and executors to Azure storage account
The Azure storage container acts as an intermediary to store bulk data when reading from or writing to Azure Synapse. Spark connects to ADLS Gen2 or Blob Storage using the abfss driver.
The following authentication options are available:
- Storage account access key and secret
- OAuth 2.0 authentication. For more information about OAuth 2.0 and service principals, see Access storage using a service principal & Microsoft Entra ID (Azure Active Directory).
The examples below illustrate the storage account access key approach; the same pattern applies to the OAuth 2.0 configuration.
Notebook session configuration (preferred)
Using this approach, the account access key is set in the session configuration associated with the notebook that runs the command. This configuration does not affect other notebooks attached to the same cluster. spark is the SparkSession object provided in the notebook.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
Global Hadoop configuration
This approach updates the global Hadoop configuration associated with the SparkContext object shared by all notebooks.
Scala
sc.hadoopConfiguration.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
Python
hadoopConfiguration is not exposed in all versions of PySpark. Although the following command relies on some Spark internals, it should work with all PySpark versions and is unlikely to break or change in the future:
sc._jsc.hadoopConfiguration().set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
Azure Synapse to Azure storage account
Azure Synapse also connects to a storage account during loading and unloading of temporary data.
If you have set up an account key and secret for the storage account, you can set forwardSparkAzureStorageCredentials to true, in which case the Azure Synapse connector automatically discovers the account access key set in the notebook session configuration or the global Hadoop configuration and forwards the storage account access key to the connected Azure Synapse instance by creating a temporary Azure database scoped credential.
Alternatively, if you use ADLS Gen2 with OAuth 2.0 authentication or your Azure Synapse instance is configured to have a Managed Service Identity (typically in conjunction with a VNet + Service Endpoints setup), you must set useAzureMSI to true. In this case the connector will specify IDENTITY = 'Managed Service Identity' and no SECRET for the database scoped credential.
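A minimal write sketch using useAzureMSI, assuming the storage account itself is accessed with OAuth 2.0 configured as shown earlier; all values in angle brackets are placeholders:
Python
# Use the Azure Synapse instance's managed identity for the database scoped credential
# instead of forwarding a storage account key.
df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("useAzureMSI", "true") \
  .option("dbTable", "<your-table-name>") \
  .save()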
Streaming support
The Azure Synapse connector offers efficient and scalable Structured Streaming write support for Azure Synapse that provides a consistent user experience with batch writes and uses PolyBase or COPY for large data transfers between an Azure Databricks cluster and an Azure Synapse instance. Similar to batch writes, streaming is designed largely for ETL and therefore has higher latency, which may not be suitable for real-time data processing in some cases.
Fault tolerance semantics
By default, Azure Synapse Streaming offers an end-to-end exactly-once guarantee for writing data into an Azure Synapse table by reliably tracking the progress of the query using a combination of a checkpoint location in DBFS, a checkpoint table in Azure Synapse, and a locking mechanism, to ensure that streaming can handle any type of failure, retry, or query restart.
Optionally, you can select less restrictive at-least-once semantics for Azure Synapse Streaming by setting the spark.databricks.sqldw.streaming.exactlyOnce.enabled option to false, in which case data duplication could occur in the event of intermittent connection failures to Azure Synapse or unexpected query termination.
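For example, the following notebook session configuration selects at-least-once semantics for subsequent streaming writes:
Python
# Relax the default exactly-once guarantee to at-least-once for streaming writes.
spark.conf.set("spark.databricks.sqldw.streaming.exactlyOnce.enabled", "false")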
Usage (Batch)
You can use this connector via the data source API in Scala, Python, SQL, and R notebooks.
Scala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.load()
// Load data from an Azure Synapse query.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("query", "select x, count(*) as cnt from table group by x")
.load()
// Apply some transformations to the data, then use the
// Data Source API to write the data back to another table in Azure Synapse.
df.write
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
.save()
Python
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.load()
# Load data from an Azure Synapse query.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("query", "select x, count(*) as cnt from table group by x") \
.load()
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
df.write \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
.save()
SQL
-- Set up the Blob storage account access key in the notebook session conf.
SET fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn=<your-storage-account-access-key>;
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>'
);
-- Write data using SQL.
-- Create a new table, throwing an error if a table with the same name already exists:
CREATE TABLE example_table_in_spark_write
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>'
)
AS SELECT * FROM table_to_save_in_spark;
R
# Load SparkR
library(SparkR)
# Set up the Blob storage account access key in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn", "<your-storage-account-access-key>")
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
# Load data from an Azure Synapse query.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
query = "select x, count(*) as cnt from table group by x",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
# Apply some transformations to the data, then use the
# Data Source API to write the data back to another table in Azure Synapse.
write.df(
df,
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
Usage (Streaming)
You can write data using Structured Streaming in Scala and Python notebooks.
Scala
// Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
// Prepare streaming source; this could be Kafka or a simple rate stream.
val df: DataFrame = spark.readStream
.format("rate")
.option("rowsPerSecond", "100000")
.option("numPartitions", "16")
.load()
// Apply some transformations to the data then use
// Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
.option("forwardSparkAzureStorageCredentials", "true")
.option("dbTable", "<your-table-name>")
.option("checkpointLocation", "/tmp_checkpoint_location")
.start()
Python
# Set up the Blob storage account access key in the notebook session conf.
spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.dfs.core.chinacloudapi.cn",
"<your-storage-account-access-key>")
# Prepare streaming source; this could be Kafka or a simple rate stream.
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", "100000") \
.option("numPartitions", "16") \
.load()
# Apply some transformations to the data then use
# Structured Streaming API to continuously write the data to a table in Azure Synapse.
df.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "<your-table-name>") \
.option("checkpointLocation", "/tmp_checkpoint_location") \
.start()
Configuration
This section describes how to configure write semantics for the connector, required permissions, and miscellaneous configuration parameters.
In this section:
- Supported save modes for batch writes
- Supported output modes for streaming writes
- Write semantics
- Required Azure Synapse permissions for PolyBase
- Required Azure Synapse permissions for the COPY statement
- Parameters
- Query pushdown into Azure Synapse
- Temporary data management
- Temporary object management
- Streaming checkpoint table management
Supported save modes for batch writes
The Azure Synapse connector supports ErrorIfExists, Ignore, Append, and Overwrite save modes, with the default mode being ErrorIfExists. For more information on supported save modes in Apache Spark, see the Spark SQL documentation on Save Modes.
Supported output modes for streaming writes
The Azure Synapse connector supports Append and Complete output modes for record appends and aggregations. For more details on output modes and the compatibility matrix, see the Structured Streaming guide.
Write semantics
Note
COPY is available in Databricks Runtime 7.0 and above.
In addition to PolyBase, the Azure Synapse connector supports the COPY statement. The COPY statement offers a more convenient way of loading data into Azure Synapse without the need to create an external table, requires fewer permissions to load data, and improves the performance of data ingestion into Azure Synapse.
By default, the connector automatically discovers the best write semantics (COPY when targeting an Azure Synapse Gen2 instance, PolyBase otherwise). You can also specify the write semantics with the following configuration:
Scala
// Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
Python
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
spark.conf.set("spark.databricks.sqldw.writeSemantics", "<write-semantics>")
SQL
-- Configure the write semantics for Azure Synapse connector in the notebook session conf.
SET spark.databricks.sqldw.writeSemantics=<write-semantics>;
R
# Load SparkR
library(SparkR)
# Configure the write semantics for Azure Synapse connector in the notebook session conf.
conf <- sparkR.callJMethod(sparkR.session(), "conf")
sparkR.callJMethod(conf, "set", "spark.databricks.sqldw.writeSemantics", "<write-semantics>")
where <write-semantics> is either polybase to use PolyBase, or copy to use the COPY statement.
Required Azure Synapse permissions for PolyBase
When you use PolyBase, the Azure Synapse connector requires the JDBC connection user to have permission to run the following commands in the connected Azure Synapse instance:
- CREATE DATABASE SCOPED CREDENTIAL
- CREATE EXTERNAL DATA SOURCE
- CREATE EXTERNAL FILE FORMAT
- CREATE EXTERNAL TABLE
As a prerequisite for the first command, the connector expects that a database master key already exists for the specified Azure Synapse instance. If not, you can create a key using the CREATE MASTER KEY command.
Additionally, to read the Azure Synapse table set through dbTable or tables referred in query, the JDBC user must have permission to access the needed Azure Synapse tables. To write data back to an Azure Synapse table set through dbTable, the JDBC user must have permission to write to this Azure Synapse table.
The following table summarizes the required permissions for all operations with PolyBase:
Operation | Permissions | Permissions when using external data source |
---|---|---|
Batch write | CONTROL | See Batch write |
Streaming write | CONTROL | See Streaming write |
Read | CONTROL | See Read |
Required Azure Synapse permissions for PolyBase with the external data source option
You can use PolyBase with a pre-provisioned external data source. See the externalDataSource parameter in Parameters for more information.
To use PolyBase with a pre-provisioned external data source, the Azure Synapse connector requires the JDBC connection user to have permission to run the commands summarized in the tables below in the connected Azure Synapse instance.
Before you can create an external data source, you must first create a database scoped credential. See the Azure Synapse documentation for how to create a database scoped credential for a service principal and an external data source for an ABFS location.
Note
The external data source location must point to a container. The connector will not work if the location is a directory in a container.
The following table summarizes the permissions for PolyBase write operations with the external data source option:
Operation | Permissions (insert into an existing table) | Permissions (insert into a new table) |
---|---|---|
Batch write | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT |
Streaming write | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT |
The following table summarizes the permissions for PolyBase read operations with the external data source option:
Operation | Permissions |
---|---|
Read | CREATE TABLE, ALTER ANY SCHEMA, ALTER ANY EXTERNAL DATA SOURCE, ALTER ANY EXTERNAL FILE FORMAT |
You can use this connector to read via the data source API in Scala, Python, SQL, and R notebooks.
Scala
// Get some data from an Azure Synapse table.
val df: DataFrame = spark.read
.format("com.databricks.spark.sqldw")
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>")
.option("externalDataSource", "<your-pre-provisioned-data-source>")
.option("dbTable", "<your-table-name>")
.load()
Python
# Get some data from an Azure Synapse table.
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
.option("externalDataSource", "<your-pre-provisioned-data-source>") \
.option("dbTable", "<your-table-name>") \
.load()
SQL
-- Read data using SQL.
CREATE TABLE example_table_in_spark_read
USING com.databricks.spark.sqldw
OPTIONS (
url 'jdbc:sqlserver://<the-rest-of-the-connection-string>',
forwardSparkAzureStorageCredentials 'true',
dbTable '<your-table-name>',
tempDir 'abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>',
externalDataSource '<your-pre-provisioned-data-source>'
);
R
# Get some data from an Azure Synapse table.
df <- read.df(
source = "com.databricks.spark.sqldw",
url = "jdbc:sqlserver://<the-rest-of-the-connection-string>",
forward_spark_azure_storage_credentials = "true",
dbTable = "<your-table-name>",
tempDir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>"
externalDataSource = "<your-pre-provisioned-data-source>")
Required Azure Synapse permissions for the COPY statement
Note
Available in Databricks Runtime 7.0 and above.
When you use the COPY statement, the Azure Synapse connector requires the JDBC connection user to have permission to run the commands summarized in the table below in the connected Azure Synapse instance. If the destination table does not exist in Azure Synapse, permission to create it is required in addition to the permissions for writing to an existing table.
The following table summarizes the permissions for batch and streaming writes with COPY:
Operation | Permissions (insert into an existing table) | Permissions (insert into a new table) |
---|---|---|
Batch write | ADMINISTER DATABASE BULK OPERATIONS, INSERT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ON SCHEMA :: dbo |
Streaming write | ADMINISTER DATABASE BULK OPERATIONS, INSERT | ADMINISTER DATABASE BULK OPERATIONS, INSERT, CREATE TABLE, ALTER ON SCHEMA :: dbo |
Parameters
The parameter map or OPTIONS provided in Spark SQL support the following settings:
Parameter | Required | Default | Notes |
---|---|---|---|
dbTable | Yes, unless query is specified | No default | The table to create or read from in Azure Synapse. This parameter is required when saving data back to Azure Synapse. You can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. If the schema name is not provided, the default schema associated with the JDBC user is used. The previously supported dbtable variant is deprecated and will be ignored in future releases. Use the "camel case" name instead. |
query | Yes, unless dbTable is specified | No default | The query to read from in Azure Synapse. For tables referred in the query, you can also use {SCHEMA NAME}.{TABLE NAME} to access a table in a given schema. If the schema name is not provided, the default schema associated with the JDBC user is used. |
user | No | No default | The Azure Synapse username. Must be used in tandem with the password option. Can only be used if the user and password are not passed in the URL. Passing both will result in an error. |
password | No | No default | The Azure Synapse password. Must be used in tandem with the user option. Can only be used if the user and password are not passed in the URL. Passing both will result in an error. |
url | Yes | No default | A JDBC URL with sqlserver set as the subprotocol. It is recommended to use the connection string provided by the Azure portal. Setting encrypt=true is strongly recommended, because it enables SSL encryption of the JDBC connection. If user and password are set separately, you do not need to include them in the URL. |
jdbcDriver | No | Determined by the JDBC URL's subprotocol | The class name of the JDBC driver to use. This class must be on the classpath. In most cases, it should not be necessary to specify this option, as the appropriate driver classname should automatically be determined by the JDBC URL's subprotocol. The previously supported jdbc_driver variant is deprecated and will be ignored in future releases. Use the "camel case" name instead. |
tempDir | Yes | No default | An abfss URI. We recommend you use a dedicated Blob storage container for the Azure Synapse connector. The previously supported tempdir variant is deprecated and will be ignored in future releases. Use the "camel case" name instead. |
tempFormat | No | PARQUET | The format in which to save temporary files to the blob store when writing to Azure Synapse. Defaults to PARQUET; no other values are allowed right now. |
tempCompression | No | SNAPPY | The compression algorithm to be used to encode/decode temporary data by both Spark and Azure Synapse. Currently supported values are: UNCOMPRESSED, SNAPPY and GZIP. |
forwardSparkAzureStorageCredentials | No | false | If true, the library automatically discovers the credentials that Spark is using to connect to the Blob storage container and forwards those credentials to Azure Synapse over JDBC. These credentials are sent as part of the JDBC query. Therefore it is strongly recommended that you enable SSL encryption of the JDBC connection when you use this option. The current version of the Azure Synapse connector requires (exactly) one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI to be explicitly set to true. The previously supported forward_spark_azure_storage_credentials variant is deprecated and will be ignored in future releases. Use the "camel case" name instead. |
useAzureMSI | No | false | If true, the library will specify IDENTITY = 'Managed Service Identity' and no SECRET for the database scoped credentials it creates. The current version of the Azure Synapse connector requires (exactly) one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI to be explicitly set to true. |
enableServicePrincipalAuth | No | false | If true, the library will use the provided service principal credentials to connect to the Azure storage account and Azure Synapse Analytics over JDBC. The current version of the Azure Synapse connector requires (exactly) one of forwardSparkAzureStorageCredentials, enableServicePrincipalAuth, or useAzureMSI to be explicitly set to true. |
tableOptions | No | CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN | A string used to specify table options when creating the Azure Synapse table set through dbTable. This string is passed literally to the WITH clause of the CREATE TABLE SQL statement that is issued against Azure Synapse. The previously supported table_options variant is deprecated and will be ignored in future releases. Use the "camel case" name instead. |
preActions | No | No default (empty string) | A ;-separated list of SQL commands to be executed in Azure Synapse before writing data to the Azure Synapse instance. These SQL commands are required to be valid commands accepted by Azure Synapse. If any of these commands fail, it is treated as an error and the write operation is not executed. |
postActions | No | No default (empty string) | A ;-separated list of SQL commands to be executed in Azure Synapse after the connector successfully writes data to the Azure Synapse instance. These SQL commands are required to be valid commands accepted by Azure Synapse. If any of these commands fail, it is treated as an error and you'll get an exception after the data is successfully written to the Azure Synapse instance. |
maxStrLength | No | 256 | StringType in Spark is mapped to the NVARCHAR(maxStrLength) type in Azure Synapse. You can use maxStrLength to set the string length for all NVARCHAR(maxStrLength) type columns that are in the table with name dbTable in Azure Synapse. The previously supported maxstrlength variant is deprecated and will be ignored in future releases. Use the "camel case" name instead. |
checkpointLocation | Yes | No default | Location on DBFS that will be used by Structured Streaming to write metadata and checkpoint information. See Recovering from Failures with Checkpointing in the Structured Streaming programming guide. |
numStreamingTempDirsToKeep | No | 0 | Indicates how many (latest) temporary directories to keep for periodic cleanup of micro batches in streaming. When set to 0, directory deletion is triggered immediately after a micro batch is committed; otherwise the provided number of latest micro batches is kept and the rest of the directories are removed. Use -1 to disable periodic cleanup. |
applicationName | No | Databricks-User-Query | The tag of the connection for each query. If not specified or the value is an empty string, the default value of the tag is added to the JDBC URL. The default value prevents the Azure DB Monitoring tool from raising spurious SQL injection alerts against queries. |
maxbinlength | No | No default | Controls the column length of BinaryType columns. This parameter is translated as VARBINARY(maxbinlength). |
identityInsert | No | false | Setting to true enables IDENTITY_INSERT mode, which inserts a DataFrame provided value in the identity column of the Azure Synapse table. See Explicitly inserting values into an IDENTITY column. |
externalDataSource | No | No default | A pre-provisioned external data source to read data from Azure Synapse. An external data source can only be used with PolyBase and removes the CONTROL permission requirement, because the connector does not need to create a scoped credential and an external data source to load data. For example usage and the list of permissions required when using an external data source, see Required Azure Synapse permissions for PolyBase with the external data source option. |
maxErrors | No | 0 | The maximum number of rows that can be rejected during reads and writes before the loading operation (either PolyBase or COPY) is cancelled. The rejected rows will be ignored. For example, if two out of ten records have errors, only eight records will be processed. See the REJECT_VALUE documentation in CREATE EXTERNAL TABLE and the MAXERRORS documentation in COPY. |
Note
- tableOptions, preActions, postActions, and maxStrLength are relevant only when writing data from Azure Databricks to a new table in Azure Synapse.
- externalDataSource is relevant only when reading data from Azure Synapse and writing data from Azure Databricks to a new table in Azure Synapse with PolyBase semantics. You should not specify other storage authentication types, such as forwardSparkAzureStorageCredentials or useAzureMSI, while using externalDataSource.
- checkpointLocation and numStreamingTempDirsToKeep are relevant only for streaming writes from Azure Databricks to a new table in Azure Synapse.
- Even though all data source option names are case-insensitive, we recommend that you specify them in "camel case" for clarity.
Query pushdown into Azure Synapse
The Azure Synapse connector implements a set of optimization rules to push the following operators down into Azure Synapse:
- Filter
- Project
- Limit
The Project and Filter operators support the following expressions:
- Most boolean logic operators
- Comparisons
- Basic arithmetic operations
- Numeric and string casts
For the Limit operator, pushdown is supported only when there is no ordering specified. For example: SELECT TOP(10) * FROM table, but not SELECT TOP(10) * FROM table ORDER BY col.
Note
The Azure Synapse connector does not push down expressions operating on strings, dates, or timestamps.
Query pushdown built with the Azure Synapse connector is enabled by default. You can disable it by setting spark.databricks.sqldw.pushdown to false.
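For example, in a Python notebook:
Python
# Disable operator pushdown into Azure Synapse for this session.
spark.conf.set("spark.databricks.sqldw.pushdown", "false")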
Temporary data management
The Azure Synapse connector does not delete the temporary files that it creates in the Blob storage container. Therefore we recommend that you periodically delete temporary files under the user-supplied tempDir location.
To facilitate data cleanup, the Azure Synapse connector does not store data files directly under tempDir, but instead creates a subdirectory of the form: <tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/.
You can set up periodic jobs (using the Azure Databricks jobs feature or otherwise) to recursively delete any subdirectories that are older than a given threshold (for example, 2 days), with the assumption that there cannot be Spark jobs running longer than that threshold.
A simpler alternative is to periodically drop the whole container and create a new one with the same name. This requires that you use a dedicated container for the temporary data produced by the Azure Synapse connector and that you can find a time window in which you can guarantee that no queries involving the connector are running.
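As an illustration, a scheduled notebook could prune date-named subdirectories older than two days with a sketch like the following; the tempDir value is a placeholder, and the script assumes no connector jobs run longer than the threshold:
Python
from datetime import datetime, timedelta

# Hypothetical tempDir; substitute the location you pass to the connector.
temp_dir = "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>"
cutoff = datetime.utcnow() - timedelta(days=2)

for entry in dbutils.fs.ls(temp_dir):
    # Top-level subdirectories are named <yyyy-MM-dd>; skip anything that does not parse.
    try:
        day = datetime.strptime(entry.name.strip("/"), "%Y-%m-%d")
    except ValueError:
        continue
    if day < cutoff:
        # Recursively delete that day's temporary data.
        dbutils.fs.rm(entry.path, True)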
Temporary object management
The Azure Synapse connector automates data transfer between an Azure Databricks cluster and an Azure Synapse instance.
For reading data from an Azure Synapse table or query or writing data to an Azure Synapse table, the Azure Synapse connector creates temporary objects, including DATABASE SCOPED CREDENTIAL, EXTERNAL DATA SOURCE, EXTERNAL FILE FORMAT, and EXTERNAL TABLE behind the scenes. These objects live only throughout the duration of the corresponding Spark job and should automatically be dropped thereafter.
When a cluster is running a query using the Azure Synapse connector, if the Spark driver process crashes or is forcefully restarted, or if the cluster is forcefully terminated or restarted, temporary objects might not be dropped.
To facilitate identification and manual deletion of these objects, the Azure Synapse connector prefixes the names of all intermediate temporary objects created in the Azure Synapse instance with a tag of the form: tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>.
We recommend that you periodically look for leaked objects using queries such as the following:
SELECT * FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_data_sources WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_file_formats WHERE name LIKE 'tmp_databricks_%'
SELECT * FROM sys.external_tables WHERE name LIKE 'tmp_databricks_%'
Streaming checkpoint table management
The Azure Synapse connector does not delete the streaming checkpoint table that is created when a new streaming query is started. This behavior is consistent with the checkpointLocation on DBFS. Therefore we recommend that you periodically delete checkpoint tables at the same time as removing checkpoint locations on DBFS, for queries that are not going to be run in the future or that have already had their checkpoint location removed.
By default, all checkpoint tables have the name <prefix>_<query-id>, where <prefix> is a configurable prefix with default value databricks_streaming_checkpoint and query_id is a streaming query ID with _ characters removed. To find all checkpoint tables for stale or deleted streaming queries, run the query:
SELECT * FROM sys.tables WHERE name LIKE 'databricks_streaming_checkpoint%'
You can configure the prefix with the Spark SQL configuration option spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix.
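For example, the following sketch sets a hypothetical custom prefix in the notebook session configuration:
Python
# Checkpoint tables created afterwards will be named <prefix>_<query-id>.
spark.conf.set(
  "spark.databricks.sqldw.streaming.exactlyOnce.checkpointTableNamePrefix",
  "my_app_streaming_checkpoint")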
Frequently asked questions (FAQ)
I received an error while using the Azure Synapse connector. How can I tell if this error is from Azure Synapse or Azure Databricks?
To help you debug errors, any exception thrown by code that is specific to the Azure Synapse connector is wrapped in an exception extending the SqlDWException trait. Exceptions also make the following distinction:
- SqlDWConnectorException represents an error thrown by the Azure Synapse connector
- SqlDWSideException represents an error thrown by the connected Azure Synapse instance
What should I do if my query failed with the error "No access key found in the session conf or the global Hadoop conf"?
This error means that the Azure Synapse connector could not find the storage account access key in the notebook session configuration or global Hadoop configuration for the storage account specified in tempDir. See Usage (Batch) for examples of how to configure storage account access properly. If a Spark table is created using the Azure Synapse connector, you must still provide the storage account access credentials in order to read or write to the Spark table.
Can I use a Shared Access Signature (SAS) to access the Blob storage container specified by tempDir?
Azure Synapse does not support using SAS to access Blob storage. Therefore the Azure Synapse connector does not support SAS to access the Blob storage container specified by tempDir.
I created a Spark table using the Azure Synapse connector with the dbTable option, wrote some data to this Spark table, and then dropped this Spark table. Will the table created on the Azure Synapse side be dropped?
No. Azure Synapse is considered an external data source. The Azure Synapse table with the name set through dbTable is not dropped when the Spark table is dropped.
When writing a DataFrame to Azure Synapse, why do I need to say .option("dbTable", tableName).save() instead of just .saveAsTable(tableName)?
That is because we want to make the following distinction clear: .option("dbTable", tableName) refers to the database (that is, Azure Synapse) table, whereas .saveAsTable(tableName) refers to the Spark table. In fact, you could even combine the two: df.write. ... .option("dbTable", tableNameDW).saveAsTable(tableNameSpark), which creates a table in Azure Synapse called tableNameDW and an external table in Spark called tableNameSpark that is backed by the Azure Synapse table.
Warning
Beware of the following difference between .save() and .saveAsTable():
- For df.write. ... .option("dbTable", tableNameDW).mode(writeMode).save(), writeMode acts on the Azure Synapse table, as expected.
- For df.write. ... .option("dbTable", tableNameDW).mode(writeMode).saveAsTable(tableNameSpark), writeMode acts on the Spark table, whereas tableNameDW is silently overwritten if it already exists in Azure Synapse.
This behavior is no different from writing to any other data source. It is just a caveat of the Spark DataFrameWriter API.
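The following Python sketch shows the two write paths side by side; all names in angle brackets are placeholders:
Python
# 1) save(): writeMode ("overwrite" here) acts on the Azure Synapse table.
df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-synapse-table-name>") \
  .mode("overwrite") \
  .save()

# 2) saveAsTable(): writeMode acts on the Spark table, while the Azure Synapse
#    table is silently overwritten if it already exists.
df.write \
  .format("com.databricks.spark.sqldw") \
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
  .option("tempDir", "abfss://<your-container-name>@<your-storage-account-name>.dfs.core.chinacloudapi.cn/<your-directory-name>") \
  .option("forwardSparkAzureStorageCredentials", "true") \
  .option("dbTable", "<your-synapse-table-name>") \
  .mode("overwrite") \
  .saveAsTable("<your-spark-table-name>")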