Azure Data Explorer Connector for Apache Spark

Apache Spark is a unified analytics engine for large-scale data processing. Azure Data Explorer is a fast, fully managed data analytics service for real-time analysis on large volumes of data.

The Azure Data Explorer connector for Spark is an open source project that can run on any Spark cluster. It implements a data source and a data sink for moving data between Azure Data Explorer and Spark clusters. Using Azure Data Explorer and Apache Spark, you can build fast and scalable applications targeting data-driven scenarios such as machine learning (ML), Extract-Transform-Load (ETL), and Log Analytics. With the connector, Azure Data Explorer becomes a valid data store for standard Spark source and sink operations, such as write, read, and writeStream.

You can write to Azure Data Explorer in either batch or streaming mode. Reading from Azure Data Explorer supports column pruning and predicate pushdown, which filter the data in Azure Data Explorer and reduce the volume of transferred data.
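For example, with the simplified read syntax described later in this topic, column projections and filters applied to the DataFrame can be pushed down and evaluated inside Azure Data Explorer rather than in Spark. The following is a minimal sketch, assuming cluster, database, and conf (the connector's authentication options) are set up as shown in the sections below:

    import com.microsoft.kusto.spark.sql.extension.SparkExtension._

    // Only the rows matching the filters and the single projected column ColA
    // are transferred from Azure Data Explorer to Spark.
    val df = spark.read.kusto(cluster, database, "ReallyBigTable", conf)
    val dfSmall = df
      .where(df.col("ColA").startsWith("row-2"))
      .filter("ColB > 12")
      .select("ColA")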

This topic describes how to install and configure the Azure Data Explorer Spark connector and how to move data between Azure Data Explorer and Apache Spark clusters.

Note

Although some of the examples below refer to an Azure Databricks Spark cluster, the Azure Data Explorer Spark connector does not take a direct dependency on Databricks or any other Spark distribution.

Prerequisites

Tip

2.3.x versions are also supported, but may require some changes in the pom.xml dependencies.

How to build the Spark connector

Note

This step is optional. If you are using pre-built libraries, go to Spark cluster setup.

Build prerequisites

  1. Install the libraries listed in dependencies, including the following Kusto Java SDK libraries:

  2. Refer to this source for building the Spark connector.

  3. For Scala/Java applications using Maven project definitions, link your application with the following artifact (the latest version may differ):

       <dependency>
         <groupId>com.microsoft.azure</groupId>
         <artifactId>spark-kusto-connector</artifactId>
         <version>1.1.0</version>
       </dependency>
    
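     For sbt-based Scala projects, an equivalent dependency declaration would look like the following (a sketch using the same artifact coordinates; adjust the version to the latest release):

       // build.sbt - same coordinates as the Maven artifact above
       libraryDependencies += "com.microsoft.azure" % "spark-kusto-connector" % "1.1.0"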

Build commands

To build the jar and run all tests:

mvn clean package

To build the jar, run all tests, and install the jar to your local Maven repository:

mvn clean install

For more information, see connector usage.

Spark cluster setup

Note

It's recommended to use the latest Azure Data Explorer Spark connector release when performing the following steps.

  1. Configure the following Spark cluster settings, based on an Azure Databricks cluster using Spark 2.4.4 and Scala 2.11:

    Databricks cluster settings

  2. Install the latest spark-kusto-connector library from Maven:

    Import libraries - Select Spark-Kusto-Connector

  3. Verify that all required libraries are installed:

    Verify that the libraries are installed

  4. For installation using a JAR file, verify that additional dependencies were installed:

    Add dependencies

Authentication

The Azure Data Explorer Spark connector enables you to authenticate with Azure Active Directory (Azure AD) using one of the following methods:

Azure AD application authentication

Azure AD application authentication is the simplest and most common authentication method and is recommended for the Azure Data Explorer Spark connector.

Property                 Option string         Description
KUSTO_AAD_APP_ID         kustoAadAppId         Azure AD application (client) identifier.
KUSTO_AAD_AUTHORITY_ID   kustoAadAuthorityID   Azure AD authentication authority. Azure AD Directory (tenant) ID.
KUSTO_AAD_APP_SECRET     kustoAadAppSecret     Azure AD application key for the client.

Note

Older API versions (less than 2.0.0) use the following names: "kustoAADClientID", "kustoClientAADClientPassword", "kustoAADAuthorityID".
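As a minimal sketch, these properties are passed to the connector as options. The example below assumes appId, appKey, and authorityId hold the Azure AD application credentials (they are defined in the sink example later in this topic); the sink examples use the matching KustoSinkOptions constants.

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions

    // Authentication options shared by the read examples in this topic.
    val conf: Map[String, String] = Map(
      KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
      KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey,
      KustoSourceOptions.KUSTO_AAD_AUTHORITY_ID -> authorityId
    )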

Azure Data Explorer privileges

Grant the following privileges on the Azure Data Explorer cluster:

  • For reading (data source), the Azure AD identity must have viewer privileges on the target database, or admin privileges on the target table.
  • For writing (data sink), the Azure AD identity must have ingestor privileges on the target database. It must also have user privileges on the target database to create new tables. If the target table already exists, admin privileges on the target table must be configured.

For more information on Azure Data Explorer principal roles, see role-based authorization. For managing security roles, see security roles management.

Spark sink: writing to Azure Data Explorer

  1. Set up the sink parameters:

    val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId")
    val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey")
    
    val appId = KustoSparkTestAppId
    val appKey = KustoSparkTestAppKey
    val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com
    val cluster = "Sparktest.chinanorth2"
    val database = "TestDb"
    val table = "StringAndIntTable"
    
  2. Write the Spark DataFrame to the Azure Data Explorer cluster as a batch:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    df.write
      .format("com.microsoft.kusto.spark.datasource")
      .option(KustoSinkOptions.KUSTO_CLUSTER, cluster)
      .option(KustoSinkOptions.KUSTO_DATABASE, database)
      .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark")
      .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId)
      .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey)
      .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId)
      .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist")
      .mode(SaveMode.Append)
      .save()  
    

    Or use the simplified syntax:

         import com.microsoft.kusto.spark.datasink.SparkIngestionProperties
         import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
         val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed
         df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
    
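    Here, conf is a map of connector options. A minimal sketch of its contents, assuming the application credentials from step 1, might look like:

         import com.microsoft.kusto.spark.datasink.KustoSinkOptions

         // Assumed contents of conf: the Azure AD application credentials from step 1.
         // The same map is reused by the streaming example in the next step.
         val conf: Map[String, String] = Map(
           KustoSinkOptions.KUSTO_AAD_APP_ID -> appId,
           KustoSinkOptions.KUSTO_AAD_APP_SECRET -> appKey,
           KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID -> authorityId
         )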
  3. Write streaming data:

    import com.microsoft.kusto.spark.datasink.KustoSinkOptions
    import org.apache.spark.sql.streaming.Trigger
    import java.util.concurrent.TimeUnit
    
    // Set up a checkpoint location for the streaming query.
    spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint")
    
    // Write to a Kusto table from a streaming source
    val kustoQ = df
          .writeStream
          .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider")
          .options(conf) 
          .option(KustoSinkOptions.KUSTO_WRITE_ENABLE_ASYNC, "true") // Optional, better for streaming, harder to handle errors
          .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database
          .start()
    
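    To keep the job alive while the stream is running, you can optionally block on the returned query handle; a sketch (the timeout value is illustrative):

    // Block the driver until the streaming query stops or the timeout elapses.
    kustoQ.awaitTermination(TimeUnit.MINUTES.toMillis(8))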

Spark source: reading from Azure Data Explorer

  1. When reading small amounts of data, define the data query:

    import com.microsoft.kusto.spark.datasource.KustoSourceOptions
    import org.apache.spark.SparkConf
    import org.apache.spark.sql._
    import com.microsoft.azure.kusto.data.ClientRequestProperties
    
    val query = s"$table | where (ColB % 1000 == 0) | distinct ColA"
    val conf: Map[String, String] = Map(
          KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
          KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey
        )
    
    val df = spark.read.format("com.microsoft.kusto.spark.datasource").
      options(conf).
      option(KustoSourceOptions.KUSTO_QUERY, query).
      option(KustoSourceOptions.KUSTO_DATABASE, database).
      option(KustoSourceOptions.KUSTO_CLUSTER, cluster).
      load()
    
    // Simplified syntax flavor
    import com.microsoft.kusto.spark.sql.extension.SparkExtension._
    
    val cpr: Option[ClientRequestProperties] = None // Optional
    val df2 = spark.read.kusto(cluster, database, query, conf, cpr)
    display(df2)
    
  2. Optional: If you provide the transient blob storage (rather than Azure Data Explorer), creating the blobs is the caller's responsibility. This includes provisioning the storage, rotating access keys, and deleting transient artifacts. The KustoBlobStorageUtils module contains helper functions for deleting blobs based on either account and container coordinates and account credentials, or a full SAS URL with write, read, and list permissions. When the corresponding RDD is no longer needed, each transaction stores transient blob artifacts in a separate directory. This directory is captured as part of the read-transaction information logs reported on the Spark driver node.

    // Use either container/account-key/account name, or container SaS
    val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer")
    val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey")
    val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName")
    // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
    

    In the example above, Key Vault isn't accessed using the connector interface; instead, the simpler method of using Databricks secrets is used.

  3. Read from Azure Data Explorer.

    • If you provide the transient blob storage, read from Azure Data Explorer as follows:

      val conf3 = Map(
        KustoSourceOptions.KUSTO_AAD_APP_ID -> appId,
        KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey,
        KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) // requires storageSas from the previous step
      val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3)
      
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      
    • If Azure Data Explorer provides the transient blob storage, read from Azure Data Explorer as follows:

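      // df2 is read as in the previous example, but without the transient blob storage
      // options, so Azure Data Explorer provides the transient storage.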
      val dfFiltered = df2
        .where(df2.col("ColA").startsWith("row-2"))
        .filter("ColB > 12")
        .filter("ColB <= 21")
        .select("ColA")
      
      display(dfFiltered)
      

Next steps