Use an HDInsight Spark cluster to read and write data to Azure SQL Database

Learn how to connect an Apache Spark cluster in Azure HDInsight with Azure SQL Database. Then read, write, and stream data into the SQL database. The instructions in this article use a Jupyter Notebook to run the Scala code snippets. However, you can create a standalone application in Scala or Python and do the same tasks.

Prerequisites

  • Azure HDInsight Spark cluster. Follow the instructions at Create an Apache Spark cluster in HDInsight.

  • Azure SQL Database. Follow the instructions at Create a database in Azure SQL Database. Make sure you create a database with the sample AdventureWorksLT schema and data. Also, make sure you create a server-level firewall rule to allow your client's IP address to access the SQL database. The instructions to add the firewall rule are available in the same article. Once you've created your SQL database, keep the following values handy. You need them to connect to the database from a Spark cluster.

    • Server name.
    • Database name.
    • Azure SQL Database admin user name / password.
  • SQL Server Management Studio (SSMS). Follow the instructions at Use SSMS to connect and query data.

Create a Jupyter Notebook

Start by creating a Jupyter Notebook associated with the Spark cluster. You use this notebook to run the code snippets in this article.

  1. From the Azure portal, open your cluster.

  2. Select Jupyter notebook underneath Cluster dashboards on the right side. If you don't see Cluster dashboards, select Overview from the left menu. If prompted, enter the admin credentials for the cluster.

    Jupyter notebook on Spark

    Note

    You can also access the Jupyter Notebook on the Spark cluster by opening the following URL in your browser. Replace CLUSTERNAME with the name of your cluster:

    https://CLUSTERNAME.azurehdinsight.cn/jupyter

  3. In the Jupyter Notebook, from the top-right corner, click New, and then click Spark to create a Scala notebook. Jupyter Notebooks on an HDInsight Spark cluster also provide the PySpark kernel for Python2 applications and the PySpark3 kernel for Python3 applications. For this article, we create a Scala notebook.

    Kernels for Jupyter notebook on Spark

    For more information about the kernels, see Use Jupyter notebook kernels with Apache Spark clusters in HDInsight.

    Note

    In this article, we use a Spark (Scala) kernel because streaming data from Spark into SQL Database is currently supported only in Scala and Java. Even though reading from and writing to SQL can be done using Python, for consistency in this article we use Scala for all three operations.

  4. A new notebook opens with a default name, Untitled. Click the notebook name and enter a name of your choice.

    Provide a name for the notebook

You can now start creating your application.

Read data from Azure SQL Database

In this section, you read data from a table (for example, SalesLT.Address) that exists in the AdventureWorks database.

  1. In a new Jupyter Notebook, in a code cell, paste the following snippet and replace the placeholder values with the values for your Azure SQL database.

    // Declare the values for your Azure SQL database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" // typically, this is of the form servername.database.chinacloudapi.cn
    val jdbcPort = 1433
    val jdbcDatabase = "<AZURE SQL DB NAME>"
    

    Press SHIFT + ENTER to run the code cell.

  2. Use the snippet below to build a JDBC URL that you can pass to the Spark dataframe APIs. The code creates a Properties object to hold the parameters. Paste the snippet in a code cell and press SHIFT + ENTER to run.

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.chinacloudapi.cn;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")         
    
  3. Use the snippet below to create a dataframe with the data from a table in your Azure SQL Database. In this snippet, we use the SalesLT.Address table that is available as part of the AdventureWorksLT database. Paste the snippet in a code cell and press SHIFT + ENTER to run.

    val sqlTableDF = spark.read.jdbc(jdbc_url, "SalesLT.Address", connectionProperties)
    
  4. You can now do operations on the dataframe, such as getting the data schema:

    sqlTableDF.printSchema
    

    You see output similar to the following image:

    schema output

  5. You can also do other operations, such as retrieving the top 10 rows.

    sqlTableDF.show(10)
    
  6. Or, retrieve specific columns from the dataset. (To push filtering down to the database itself, see the sketch after this list.)

    sqlTableDF.select("AddressLine1", "City").show(10)
    
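
If you only need a subset of the table, you can push the filtering down to Azure SQL Database instead of reading everything into Spark. The following is a minimal sketch, not part of the original walkthrough: it reuses the jdbc_url and connectionProperties values from the cells above and passes a parenthesized query, aliased as a table, to spark.read.jdbc. The query and alias names here are illustrative.

    // Push a query down to the database; only the matching rows cross the wire.
    // Assumes jdbc_url and connectionProperties are defined as in the cells above.
    val pushdownQuery = "(SELECT AddressID, AddressLine1, City FROM SalesLT.Address WHERE CountryRegion = 'United Kingdom') AS address_subset"
    val addressSubsetDF = spark.read.jdbc(jdbc_url, pushdownQuery, connectionProperties)
    addressSubsetDF.show(10)

Spark's JDBC source also pushes simple filters and column selections down on its own, so the explicit query form is mainly useful for joins or aggregations that you want the database to perform.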

Write data into Azure SQL Database

In this section, we use a sample CSV file available on the cluster to create a table in Azure SQL Database and populate it with data. The sample CSV file (HVAC.csv) is available on all HDInsight clusters at HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv.

  1. In a new Jupyter Notebook, in a code cell, paste the following snippet and replace the placeholder values with the values for your Azure SQL database.

    // Declare the values for your Azure SQL database
    
    val jdbcUsername = "<SQL DB ADMIN USER>"
    val jdbcPassword = "<SQL DB ADMIN PWD>"
    val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" // typically, this is of the form servername.database.chinacloudapi.cn
    val jdbcPort = 1433
    val jdbcDatabase = "<AZURE SQL DB NAME>"
    

    Press SHIFT + ENTER to run the code cell.

  2. The following snippet builds a JDBC URL that you can pass to the Spark dataframe APIs. The code creates a Properties object to hold the parameters. Paste the snippet in a code cell and press SHIFT + ENTER to run.

    import java.util.Properties
    
    val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.chinacloudapi.cn;loginTimeout=60;"
    val connectionProperties = new Properties()
    connectionProperties.put("user", s"${jdbcUsername}")
    connectionProperties.put("password", s"${jdbcPassword}")
    
  3. Use the following snippet to extract the schema of the data in HVAC.csv and use the schema to load the data from the CSV into a dataframe, readDf. Paste the snippet in a code cell and press SHIFT + ENTER to run.

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readDf = spark.read.format("csv").schema(userSchema).load("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
  4. Use the readDf dataframe to create a temporary table, temphvactable. Then use the temporary table to create a Hive table, hvactable_hive.

    readDf.createOrReplaceTempView("temphvactable")
    spark.sql("create table hvactable_hive as select * from temphvactable")
    
  5. Finally, use the Hive table to create a table in Azure SQL Database. The following snippet creates hvactable in Azure SQL Database. (If you rerun this step against an existing table, see the append-mode sketch after this list.)

    spark.table("hvactable_hive").write.jdbc(jdbc_url, "hvactable", connectionProperties)
    
  6. Connect to the Azure SQL Database using SSMS and verify that you see a dbo.hvactable there.

    a. Start SSMS and connect to the Azure SQL Database by providing connection details as shown in the screenshot below.

    Connect to SQL database using SSMS1

    b. From Object Explorer, expand the Azure SQL Database and the Table node to see the dbo.hvactable created.

    Connect to SQL database using SSMS2

  7. Run a query in SSMS to see the columns in the table.

    SELECT * from hvactable
    
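
One note on the write in step 5: by default it creates hvactable and fails if a table with that name already exists. If you rerun the pipeline and want to append rows to the existing table instead, you can set a save mode. This is a minimal sketch using Spark's standard SaveMode API with the same jdbc_url and connectionProperties as above:

    import org.apache.spark.sql.SaveMode

    // Append to the existing table rather than failing because it already exists.
    spark.table("hvactable_hive")
      .write
      .mode(SaveMode.Append)
      .jdbc(jdbc_url, "hvactable", connectionProperties)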

Stream data into Azure SQL Database

In this section, we stream data into the hvactable that you created in the previous section.

  1. As a first step, make sure there are no records in the hvactable. Using SSMS, run the following query on the table.

    TRUNCATE TABLE [dbo].[hvactable]
    
  2. Create a new Jupyter Notebook on the HDInsight Spark cluster. In a code cell, paste the following snippet and then press SHIFT + ENTER:

    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming._
    import java.sql.{Connection,DriverManager,ResultSet}
    
  3. We stream data from HVAC.csv into the hvactable. The HVAC.csv file is available on the cluster at /HdiSamples/HdiSamples/SensorSampleData/hvac/. In the following snippet, we first get the schema of the data to be streamed. Then, we create a streaming dataframe using that schema. Paste the snippet in a code cell and press SHIFT + ENTER to run.

    val userSchema = spark.read.option("header", "true").csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv").schema
    val readStreamDf = spark.readStream.schema(userSchema).csv("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/") 
    readStreamDf.printSchema
    
  4. The output shows the schema of HVAC.csv. The hvactable has the same schema; the output lists the columns in the table.

    HDInsight Apache Spark schema table

  5. Finally, use the following snippet to read data from HVAC.csv and stream it into the hvactable in Azure SQL Database. Paste the snippet in a code cell, replace the placeholder values with the values for your Azure SQL Database, and then press SHIFT + ENTER to run. (A sketch for monitoring and stopping the streaming query appears after this list.)

    val WriteToSQLQuery = readStreamDf.writeStream.foreach(new ForeachWriter[Row] {
      var connection: java.sql.Connection = _
      var statement: java.sql.Statement = _

      val jdbcUsername = "<SQL DB ADMIN USER>"
      val jdbcPassword = "<SQL DB ADMIN PWD>"
      val jdbcHostname = "<SQL SERVER NAME HOSTING SQL DB>" // typically, this is of the form servername.database.chinacloudapi.cn
      val jdbcPort = 1433
      val jdbcDatabase = "<AZURE SQL DB NAME>"
      val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      val jdbc_url = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.chinacloudapi.cn;loginTimeout=30;"

      // Open a JDBC connection once per partition.
      def open(partitionId: Long, version: Long): Boolean = {
        Class.forName(driver)
        connection = DriverManager.getConnection(jdbc_url, jdbcUsername, jdbcPassword)
        statement = connection.createStatement
        true
      }

      // Insert each row of the streaming dataframe into dbo.hvactable.
      def process(value: Row): Unit = {
        val Date = value(0)
        val Time = value(1)
        val TargetTemp = value(2)
        val ActualTemp = value(3)
        val System = value(4)
        val SystemAge = value(5)
        val BuildingID = value(6)

        val valueStr = "'" + Date + "'," + "'" + Time + "'," + "'" + TargetTemp + "'," + "'" + ActualTemp + "'," + "'" + System + "'," + "'" + SystemAge + "'," + "'" + BuildingID + "'"
        statement.execute("INSERT INTO " + "dbo.hvactable" + " VALUES (" + valueStr + ")")
      }

      // Close the connection when the partition is done.
      def close(errorOrNull: Throwable): Unit = {
        connection.close
      }
    })

    var streamingQuery = WriteToSQLQuery.start()
    
  6. Verify that the data is being streamed into the hvactable by running the following query in SQL Server Management Studio (SSMS). Every time you run the query, it shows the number of rows in the table increasing.

    SELECT COUNT(*) FROM hvactable
    
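
Once you've confirmed that rows are arriving, you can monitor and stop the streaming query from the notebook. This is a minimal sketch using the standard Structured Streaming query handle (the streamingQuery value from step 5); nothing here is specific to SQL Database:

    // Inspect the query: whether a trigger is active and recent progress metrics.
    println(streamingQuery.status)
    println(streamingQuery.lastProgress)

    // Stop the query when you're done; otherwise it keeps running on the cluster.
    streamingQuery.stop()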

Next steps