Accelerate real-time big data analytics using the Spark connector

Applies to: Azure SQL Database, Azure SQL Managed Instance

Note

As of Sep 2020, this connector is not actively maintained. However, the Apache Spark Connector for SQL Server and Azure SQL is now available, with support for Python and R bindings, an easier-to-use interface to bulk insert data, and many other improvements. We strongly encourage you to evaluate and use the new connector instead of this one. The information about the old connector (this page) is retained for archival purposes only.

The Spark connector enables databases in Azure SQL Database, Azure SQL Managed Instance, and SQL Server to act as the input data source or output data sink for Spark jobs. It allows you to utilize real-time transactional data in big data analytics and persist results for ad hoc queries or reporting. Compared to the built-in JDBC connector, this connector provides the ability to bulk insert data into your database, and it can outperform row-by-row insertion by 10x to 20x. The Spark connector supports Azure Active Directory (Azure AD) authentication when connecting to Azure SQL Database and Azure SQL Managed Instance. It provides interfaces similar to the built-in JDBC connector, so it is easy to migrate your existing Spark jobs to use this connector.
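For comparison, here is a minimal sketch of an equivalent read using Spark's built-in JDBC data source; the server, database, and table names are the same placeholders used throughout this page:

// Built-in JDBC data source read (for comparison; writes through this
// path are row-by-row, which is what the connector's bulk insert improves on).
val jdbcDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:sqlserver://mysqlserver.database.chinacloudapi.cn;databaseName=MyDatabase")
  .option("dbtable", "dbo.Clients")
  .option("user", "username")
  .option("password", "*********")
  .load()

The connector examples below follow the same read/write pattern, which is what makes migration straightforward.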

Download and build a Spark connector

The GitHub repo for the old connector previously linked to from this page is not actively maintained. Instead, we strongly encourage you to evaluate and use the new connector.

Officially supported versions

Component                              Version
Apache Spark                           2.0.2 or later
Scala                                  2.10 or later
Microsoft JDBC Driver for SQL Server   6.2 or later
Microsoft SQL Server                   SQL Server 2008 or later
Azure SQL Database                     Supported
Azure SQL Managed Instance             Supported

The Spark connector utilizes the Microsoft JDBC Driver for SQL Server to move data between Spark worker nodes and databases.

The dataflow is as follows:

  1. The Spark master node connects to databases in SQL Database or SQL Server and loads data from a specific table or using a specific SQL query.
  2. The Spark master node distributes data to worker nodes for transformation.
  3. Worker nodes connect to databases in SQL Database or SQL Server and write data to the database. Users can choose row-by-row insertion or bulk insert.

The following diagram illustrates the data flow.

Diagram of the described flow: a master node connects directly to the database and to three worker nodes, which also connect to the database.

Build the Spark connector

Currently, the connector project uses Maven. To build the connector without dependencies, you can run:

  • mvn clean package
  • Download the latest versions of the JAR from the releases folder
  • Include the SQL Database Spark JAR
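Once built or downloaded, attach the connector JAR to your Spark cluster or pass it to spark-shell or spark-submit with the --jars flag. The JAR file name below is illustrative; the actual name depends on the version you build or download:

spark-shell --jars azure-sqldb-spark-1.0.2-jar-with-dependencies.jar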

Connect and read data using the Spark connector

You can connect to databases in SQL Database and SQL Server from a Spark job to read or write data. You can also run a DML or DDL query in databases in SQL Database and SQL Server.

Read data from Azure SQL and SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName"   -> "MyDatabase",
  "dbTable"        -> "dbo.Clients",
  "user"           -> "username",
  "password"       -> "*********",
  "connectTimeout" -> "5", //seconds
  "queryTimeout"   -> "5"  //seconds
))

val collection = sqlContext.read.sqlDB(config)
collection.show()
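Because sqlDB returns an ordinary Spark DataFrame, the standard DataFrame and Spark SQL APIs apply to the result. For example (the temp view name clients is arbitrary):

// Register the result for Spark SQL queries (standard Spark 2.x API).
collection.createOrReplaceTempView("clients")
sqlContext.sql("SELECT COUNT(*) AS clientCount FROM clients").show()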

Read data from Azure SQL and SQL Server with a specified SQL query

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"          -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName" -> "MyDatabase",
  "queryCustom"  -> "SELECT TOP 100 * FROM dbo.Clients WHERE PostalCode = 98074", //SQL query
  "user"         -> "username",
  "password"     -> "*********"
))

//Read the data returned by the SQL query
val collection = sqlContext.read.sqlDB(config)
collection.show()

Write data to Azure SQL and SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

// Acquire a DataFrame collection (val collection)

val config = Config(Map(
  "url"          -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName" -> "MyDatabase",
  "dbTable"      -> "dbo.Clients",
  "user"         -> "username",
  "password"     -> "*********"
))

import org.apache.spark.sql.SaveMode
collection.write.mode(SaveMode.Append).sqlDB(config)
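SaveMode.Append appends the DataFrame rows to the existing table. Spark defines other save modes (such as Overwrite and ErrorIfExists); check the connector's documentation for which modes it honors before relying on them.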

Run DML or DDL query in Azure SQL and SQL Server

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
val query = """
              |UPDATE Customers
              |SET ContactName = 'Alfred Schmidt', City = 'Frankfurt'
              |WHERE CustomerID = 1;
            """.stripMargin

val config = Config(Map(
  "url"          -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName" -> "MyDatabase",
  "user"         -> "username",
  "password"     -> "*********",
  "queryCustom"  -> query
))

sqlContext.sqlDBQuery(config)
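sqlDBQuery is intended for statements executed for their side effects (DML or DDL); to read rows back, use sqlDB as shown in the read examples above.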

Connect from Spark using Azure AD authentication

You can connect to Azure SQL Database and SQL Managed Instance using Azure AD authentication. Use Azure AD authentication to centrally manage identities of database users and as an alternative to SQL Server authentication.

Connecting using ActiveDirectoryPassword authentication mode

Setup requirement

If you are using the ActiveDirectoryPassword authentication mode, you need to download azure-activedirectory-library-for-java and its dependencies, and include them in the Java build path.

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"            -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName"   -> "MyDatabase",
  "user"           -> "username",
  "password"       -> "*********",
  "authentication" -> "ActiveDirectoryPassword",
  "encrypt"        -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Connecting using an access token

Setup requirement

If you are using the access token-based authentication mode, you need to download azure-activedirectory-library-for-java and its dependencies, and include them in the Java build path.

See Use Azure Active Directory Authentication to learn how to get an access token to your database in Azure SQL Database or Azure SQL Managed Instance.
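As one hedged illustration, an access token for a service principal can be acquired with azure-activedirectory-library-for-java (adal4j) along the following lines. The tenant ID, client ID, and secret are hypothetical placeholders, and the authority and resource URLs shown match the Azure China cloud endpoints used elsewhere on this page:

import java.util.concurrent.Executors
import com.microsoft.aad.adal4j.{AuthenticationContext, ClientCredential}

// Acquire a token for Azure SQL with a service principal (sketch;
// <tenant-id>, <client-id>, and <client-secret> are placeholders).
val service = Executors.newSingleThreadExecutor()
val context = new AuthenticationContext(
  "https://login.chinacloudapi.cn/<tenant-id>", true, service)
val credential = new ClientCredential("<client-id>", "<client-secret>")
val accessToken = context
  .acquireToken("https://database.chinacloudapi.cn/", credential, null)
  .get()
  .getAccessToken
service.shutdown()

The resulting token string is what you pass as the accessToken value in the config below.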

import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

val config = Config(Map(
  "url"                   -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName"          -> "MyDatabase",
  "accessToken"           -> "access_token",
  "hostNameInCertificate" -> "*.database.chinacloudapi.cn",
  "encrypt"               -> "true"
))

val collection = sqlContext.read.sqlDB(config)
collection.show()

Write data using bulk insert

The traditional JDBC connector writes data into your database using row-by-row insertion. You can use the Spark connector to write data to Azure SQL and SQL Server using bulk insert. Bulk insert significantly improves write performance when loading large data sets or loading data into tables that use a columnstore index.

import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._

/**
  Add column metadata.
  If not specified, metadata is automatically read
  from the destination table, which can hurt performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)

val bulkCopyConfig = Config(Map(
  "url"               -> "mysqlserver.database.chinacloudapi.cn",
  "databaseName"      -> "MyDatabase",
  "user"              -> "username",
  "password"          -> "*********",
  "dbTable"           -> "dbo.Clients",
  "bulkCopyBatchSize" -> "2500",
  "bulkCopyTableLock" -> "true",
  "bulkCopyTimeout"   -> "600"
))

df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
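As a tuning note, bulkCopyBatchSize controls how many rows are sent to the server per batch, and bulkCopyTableLock takes a table-level lock for the duration of the copy, which generally improves bulk load throughput at the cost of blocking concurrent writers. Adjust both against your workload; the values above are examples, not recommendations.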

Next steps

If you haven't already, download the Spark connector from the azure-sqldb-spark GitHub repository and explore the additional resources in the repo.

You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide.