Databricks Connect

Databricks Connect allows you to connect your favorite IDE (IntelliJ, Eclipse, PyCharm, RStudio, Visual Studio), notebook server (Zeppelin, Jupyter), and other custom applications to Azure Databricks clusters and run Apache Spark code.

This article explains how Databricks Connect works, walks you through the steps to get started with Databricks Connect, explains how to troubleshoot issues that may arise when using Databricks Connect, and describes the differences between running with Databricks Connect and running in an Azure Databricks notebook.

Overview

Databricks Connect is a client library for Apache Spark. It allows you to write jobs using Spark native APIs and have them execute remotely on an Azure Databricks cluster instead of in the local Spark session.

For example, when you run the DataFrame command spark.read.parquet(...).groupBy(...).agg(...).show() using Databricks Connect, the parsing and planning of the job runs on your local machine. The logical representation of the job is then sent to the Spark server running in Azure Databricks for execution on the cluster.
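
As a minimal sketch (the path and column names here are hypothetical), a client-side script using Databricks Connect looks like ordinary PySpark code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# With databricks-connect installed and configured, getOrCreate() returns a
# session that plans locally and executes on the remote cluster.
spark = SparkSession.builder.getOrCreate()

(spark.read.parquet("dbfs:/tmp/events")
      .groupBy("category")
      .agg(F.count("*").alias("n"))
      .show())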

With Databricks Connect, you can:

  • Run large-scale Spark jobs from any Python, Java, Scala, or R application. Anywhere you can import pyspark, import org.apache.spark, or require(SparkR), you can now run Spark jobs directly from your application, without needing to install any IDE plugins or use Spark submission scripts.
  • Step through and debug code in your IDE even when working with a remote cluster.
  • Iterate quickly when developing libraries. You do not need to restart the cluster after changing Python or Java library dependencies in Databricks Connect, because each client session is isolated from the others in the cluster.
  • Shut down idle clusters without losing work. Because the client application is decoupled from the cluster, it is unaffected by cluster restarts or upgrades, which would normally cause you to lose all the variables, RDDs, and DataFrame objects defined in a notebook.

Requirements

Note

In Databricks Runtime 7.1, Databricks recommends that you always use the most recent version of Databricks Connect.

  • The minor version of your client Python installation must be the same as the minor Python version of your Azure Databricks cluster (3.5, 3.6, or 3.7). Databricks Runtime 5.5 LTS has Python 3.5, Databricks Runtime 5.5 LTS ML has Python 3.6, and Databricks Runtime 6.1 and above and Databricks Runtime 6.1 ML and above have Python 3.7.

    For example, if you're using Conda in your local development environment and your cluster is running Python 3.5, you must create an environment with that version, for example:

    conda create --name dbconnect python=3.5
    
  • Java 8. The client does not support Java 11.

Set up the client

Step 1: Install the client

  1. Uninstall PySpark.

    pip uninstall pyspark
    
  2. Install the Databricks Connect client.

    pip install -U databricks-connect==5.5.*  # or 6.*.* or 7.1.* to match your cluster version. 6.1-6.6 and 7.1 are supported
    

Step 2: Configure connection properties

  1. Collect the following configuration properties:

    • URL: A workspace URL.

    • User token: A personal access token.

    • Cluster ID: The ID of the cluster you created. You can obtain the cluster ID from the cluster URL: it is the segment following /clusters/, for example 1108-201635-xxxxxxxx.


    • Organization ID. Every workspace has a unique organization ID. See Get workspace, cluster, notebook, model, and job identifiers.

    • Port: The port that Databricks Connect connects to. The default port is 15001. If your cluster is configured to use a different port, such as 8787 (given in previous instructions for Azure Databricks), use the configured port number.

  2. Configure the connection. You can use the CLI, SQL configs, or environment variables. The precedence of configuration methods from highest to lowest is: SQL config keys, CLI, and environment variables.

    • CLI

      1. Run databricks-connect.

        databricks-connect configure
        

        The license displays:

        Copyright (2018) Databricks, Inc.
        
        This library (the "Software") may not be used except in connection with the
        Licensee's use of the Databricks Platform Services pursuant to an Agreement
          ...
        
      2. Accept the license and supply configuration values.

        Do you accept the above agreement? [y/N] y
        Set new config values (leave input empty to accept default):
        Databricks Host [no current value, must start with https://]: <databricks-url>
        Databricks Token [no current value]: <databricks-token>
        Cluster ID (e.g., 0921-001415-jelly628) [no current value]: <cluster-id>
        Org ID (Azure-only, see ?o=orgId in URL) [0]: <org-id>
        Port [15001]: <port>
        
    • SQL configs or environment variables. Set SQL config keys (for example, sql("set config=value")) and environment variables as follows. A minimal Python sketch of the environment-variable approach appears after the table:

      Parameter          SQL config key                        Environment variable name
      Databricks host    spark.databricks.service.address      DATABRICKS_ADDRESS
      Databricks token   spark.databricks.service.token        DATABRICKS_API_TOKEN
      Cluster ID         spark.databricks.service.clusterId    DATABRICKS_CLUSTER_ID
      Org ID             spark.databricks.service.orgId        DATABRICKS_ORG_ID
      Port               spark.databricks.service.port         DATABRICKS_PORT (Databricks Runtime > 5.4 only)

      Important

      We do not recommend putting tokens in SQL configurations.
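
      A minimal sketch using the environment variable names from the table (an assumption here is that they can also be set from Python via os.environ before the first SparkSession is created; all values are placeholders):

      import os
      from pyspark.sql import SparkSession

      # Placeholder values; set these before the first SparkSession is created.
      os.environ["DATABRICKS_ADDRESS"] = "https://<workspace-url>"
      os.environ["DATABRICKS_API_TOKEN"] = "<personal-access-token>"
      os.environ["DATABRICKS_CLUSTER_ID"] = "<cluster-id>"
      os.environ["DATABRICKS_ORG_ID"] = "<org-id>"
      os.environ["DATABRICKS_PORT"] = "15001"

      spark = SparkSession.builder.getOrCreate()

      # SQL config keys take precedence over the CLI and environment variables, e.g.:
      # spark.sql("set spark.databricks.service.clusterId=<cluster-id>")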

  3. Test connectivity to Azure Databricks.

    databricks-connect test
    

    If the cluster you configured is not running, the test starts the cluster, which remains running until its configured autotermination time. The output should look similar to the following:

    * PySpark is installed at /.../3.5.6/lib/python3.5/site-packages/pyspark
    * Checking java version
    java version "1.8.0_152"
    Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
    Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
    * Testing scala command
    18/12/10 16:38:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/12/10 16:38:50 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    18/12/10 16:39:53 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
    18/12/10 16:39:59 WARN SparkServiceRPCClient: Syncing 129 files (176036 bytes) took 3003 ms
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
          /_/
    
    Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_152)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    scala> spark.range(100).reduce(_ + _)
    Spark context Web UI available at https://10.8.5.214:4040
    Spark context available as 'sc' (master = local[*], app id = local-1544488730553).
    Spark session available as 'spark'.
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    View job details at <databricks-url>?o=0#/setting/clusters/<cluster-id>/sparkUi
    res0: Long = 4950
    
    scala> :quit
    
    * Testing python command
    18/12/10 16:40:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    18/12/10 16:40:17 WARN MetricsSystem: Using default name SparkStatusTracker for source because neither spark.metrics.namespace nor spark.app.id is set.
    18/12/10 16:40:28 WARN SparkServiceRPCClient: Now tracking server state for 5abb7c7e-df8e-4290-947c-c9a38601024e, invalidating prev state
    View job details at <databricks-url>/?o=0#/setting/clusters/<cluster-id>/sparkUi
    

Set up your IDE or notebook server

This section describes how to configure your preferred IDE or notebook server to use the Databricks Connect client.


Jupyter

The Databricks Connect configuration script automatically adds the package to your project configuration. To get started in a Python kernel, run:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

To enable the %sql shorthand for running and visualizing SQL queries, use the following snippet:

from IPython.core.magic import line_magic, line_cell_magic, Magics, magics_class

@magics_class
class DatabricksConnectMagics(Magics):

   @line_cell_magic
   def sql(self, line, cell=None):
       if cell and line:
           raise ValueError("Line must be empty for cell magic", line)
       try:
           from autovizwidget.widget.utils import display_dataframe
       except ImportError:
           print("Please run `pip install autovizwidget` to enable the visualization widget.")
           display_dataframe = lambda x: x
       return display_dataframe(self.get_spark().sql(cell or line).toPandas())

   def get_spark(self):
       user_ns = get_ipython().user_ns
       if "spark" in user_ns:
           return user_ns["spark"]
       else:
           from pyspark.sql import SparkSession
           user_ns["spark"] = SparkSession.builder.getOrCreate()
           return user_ns["spark"]

ip = get_ipython()
ip.register_magics(DatabricksConnectMagics)
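
With the magic registered, a notebook cell can run and visualize SQL directly; for example (the table name is a placeholder):

%sql SELECT * FROM <table-name> LIMIT 10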

PyCharm

The Databricks Connect configuration script automatically adds the package to your project configuration.

Python 3 clusters

  1. Go to Run > Edit Configurations.

  2. Add PYSPARK_PYTHON=python3 as an environment variable.


SparkR and RStudio Desktop

  1. Download and unpack open source Spark onto your local machine. Choose the same version as in your Azure Databricks cluster (Hadoop 2.7).

  2. Run databricks-connect get-jar-dir. This command returns a path like /usr/local/lib/python3.5/dist-packages/pyspark/jars. Copy the file path of the directory one level above the JAR directory, for example /usr/local/lib/python3.5/dist-packages/pyspark, which is the SPARK_HOME directory.

  3. Configure the Spark lib path and Spark home by adding them to the top of your R script. Set <spark-lib-path> to the directory where you unpacked the open source Spark package in step 1. Set <spark-home-path> to the Databricks Connect directory from step 2.

    # Point to the OSS package path, e.g., /path/to/.../spark-2.4.0-bin-hadoop2.7
    library(SparkR, lib.loc = .libPaths(c(file.path('<spark-lib-path>', 'R', 'lib'), .libPaths())))
    
    # Point to the Databricks Connect PySpark installation, e.g., /path/to/.../pyspark
    Sys.setenv(SPARK_HOME = "<spark-home-path>")
    
  4. Initiate a Spark session and start running SparkR commands.

    sparkR.session()
    
    df <- as.DataFrame(faithful)
    head(df)
    
    df1 <- dapply(df, function(x) { x }, schema(df))
    collect(df1)
    

sparklyr and RStudio Desktop

Important

This feature is in Public Preview.

Note

You can copy sparklyr-dependent code that you've developed locally using Databricks Connect and run it in an Azure Databricks notebook or a hosted RStudio Server in your Azure Databricks workspace with minimal or no code changes.


Requirements

  • sparklyr 1.2 or above.
  • Databricks Runtime 6.4 with Databricks Connect 6.4.1.

Install, configure, and use sparklyr

  1. In RStudio Desktop, install sparklyr 1.2 or above from CRAN, or install the latest master version from GitHub.

    # Install from CRAN
    install.packages("sparklyr")
    
    # Or install the latest master version from GitHub
    install.packages("devtools")
    devtools::install_github("sparklyr/sparklyr")
    
  2. Activate the Python environment with databricks-connect==6.4.1 installed and run the following command in the terminal to get the <spark-home-path>:

    databricks-connect get-spark-home
    
  3. Initiate a Spark session and start running sparklyr commands.

    library(sparklyr)
    sc <- spark_connect(method = "databricks", spark_home = "<spark-home-path>")
    
    iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
    
    library(dplyr)
    src_tbls(sc)
    
    iris_tbl %>% count
    
  4. Close the connection.

    spark_disconnect(sc)
    

Resources

For more information, see the sparklyr GitHub README.

For code examples, see sparklyr.

sparklyr and RStudio Desktop limitations

The following features are unsupported:

  • sparklyr streaming APIs
  • sparklyr ML APIs
  • broom APIs
  • csv_file serialization mode
  • spark-submit

IntelliJ (Scala or Java)

  1. Run databricks-connect get-jar-dir.

  2. Point the dependencies to the directory returned from the command. Go to File > Project Structure > Modules > Dependencies > '+' sign > JARs or Directories.


    To avoid conflicts, we strongly recommend removing any other Spark installations from your classpath. If this is not possible, make sure that the JARs you add are at the front of the classpath. In particular, they must be ahead of any other installed version of Spark (otherwise you will either use one of those other Spark versions and run locally, or throw a ClassDefNotFoundError).

  3. Check the setting of the breakout option in IntelliJ. The default is All, which causes network timeouts if you set breakpoints for debugging. Set it to Thread to avoid stopping the background network threads.


Eclipse

  1. Run databricks-connect get-jar-dir.

  2. Point the external JARs configuration to the directory returned from the command. Go to Project menu > Properties > Java Build Path > Libraries > Add External Jars.


    To avoid conflicts, we strongly recommend removing any other Spark installations from your classpath. If this is not possible, make sure that the JARs you add are at the front of the classpath. In particular, they must be ahead of any other installed version of Spark (otherwise you will either use one of those other Spark versions and run locally, or throw a ClassDefNotFoundError).


Visual Studio Code

  1. Verify that the Python extension is installed.

  2. Open the Command Palette (Command+Shift+P on macOS, Ctrl+Shift+P on Windows/Linux).

  3. Select a Python interpreter. Go to Code > Preferences > Settings, and choose Python settings.

  4. Run databricks-connect get-jar-dir.

  5. Add the directory returned from the command to the User Settings JSON under python.venvPath. This should be added to the Python configuration.

  6. Disable the linter. Click the ... on the right side, and then edit the JSON settings. The modified settings are as follows:


  7. If running with a virtual environment, which is the recommended way to develop for Python in VS Code, in the Command Palette type select python interpreter and point to the environment that matches your cluster's Python version.


    For example, if your cluster runs Python 3.5, your local environment should be Python 3.5.


SBT

To use SBT, you must configure your build.sbt file to link against the Databricks Connect JARs instead of the usual Spark library dependency. You do this with the unmanagedBase directive in the following example build file, which assumes a Scala app that has a com.example.Test main object:

build.sbt

name := "hello-world"
version := "1.0"
scalaVersion := "2.11.6"
// this should be set to the path returned by ``databricks-connect get-jar-dir``
unmanagedBase := new java.io.File("/usr/local/lib/python2.7/dist-packages/pyspark/jars")
mainClass := Some("com.example.Test")

Run examples from your IDE

Java

import org.apache.spark.sql.SparkSession;

public class HelloWorld {

  public static void main(String[] args) {
    System.out.println("HelloWorld");
    SparkSession spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate();

    System.out.println(spark.range(100).count());
    // The Spark code will execute on the Azure Databricks cluster.
  }
}

Python

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .getOrCreate()

print("Testing simple count")

# The Spark code will execute on the Azure Databricks cluster.
print(spark.range(100).count())

Scala

import org.apache.spark.sql.SparkSession

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local")
      .getOrCreate()
    println(spark.range(100).count())
    // The Spark code will execute on the Azure Databricks cluster.
  }
}

Work with dependencies

Typically your main class or Python file will have other dependency JARs and files. You can add such dependency JARs and files by calling sparkContext.addJar("path-to-the-jar") or sparkContext.addPyFile("path-to-the-file"). You can also add Egg files and zip files with the addPyFile() interface. Every time you run the code in your IDE, the dependency JARs and files are installed on the cluster.

Python

from lib import Foo
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext
#sc.setLogLevel("INFO")

print("Testing simple count")
print(spark.range(100).count())

print("Testing addPyFile isolation")
sc.addPyFile("lib.py")
print(sc.parallelize(range(10)).map(lambda i: Foo(2)).collect())

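# Contents of lib.py, which is shipped to the cluster by sc.addPyFile("lib.py") above: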
class Foo(object):
  def __init__(self, x):
    self.x = x

Python + Java UDFs

from pyspark.sql import SparkSession
from pyspark.sql.column import _to_java_column, _to_seq, Column

## In this example, udf.jar contains compiled Java / Scala UDFs:
#package com.example
#
#import org.apache.spark.sql._
#import org.apache.spark.sql.expressions._
#import org.apache.spark.sql.functions.udf
#
#object Test {
#  val plusOne: UserDefinedFunction = udf((i: Long) => i + 1)
#}

spark = SparkSession.builder \
  .config("spark.jars", "/path/to/udf.jar") \
  .getOrCreate()
sc = spark.sparkContext

def plus_one_udf(col):
  f = sc._jvm.com.example.Test.plusOne()
  return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

sc._jsc.addJar("/path/to/udf.jar")
spark.range(100).withColumn("plusOne", plus_one_udf("id")).show()

Scala

package com.example

import org.apache.spark.sql.SparkSession

case class Foo(x: String)

object Test {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      ...
      .getOrCreate();
    spark.sparkContext.setLogLevel("INFO")

    println("Running simple show query...")
    spark.read.parquet("/tmp/x").show()

    println("Running simple UDF query...")
    spark.sparkContext.addJar("./target/scala-2.11/hello-world_2.11-1.0.jar")
    spark.udf.register("f", (x: Int) => x + 1)
    spark.range(10).selectExpr("f(id)").show()

    println("Running custom objects query...")
    val objs = spark.sparkContext.parallelize(Seq(Foo("bye"), Foo("hi"))).collect()
    println(objs.toSeq)
  }
}

Access DBUtils

To access dbutils.fs and dbutils.secrets, use the Databricks Utilities module.

Python

# Prerequisite: pip install six
from pyspark.dbutils import DBUtils

dbutils = DBUtils(spark.sparkContext)
print(dbutils.fs.ls("dbfs:/"))
print(dbutils.secrets.listScopes())

To access the DBUtils module in a way that works both locally and in Azure Databricks clusters, use the following get_dbutils() in Python:

def get_dbutils(spark):
  if spark.conf.get("spark.databricks.service.client.enabled") == "true":
    from pyspark.dbutils import DBUtils
    return DBUtils(spark)
  else:
    import IPython
    return IPython.get_ipython().user_ns["dbutils"]
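
The helper can then be used the same way in both environments; for example:

dbutils = get_dbutils(spark)
print(dbutils.fs.ls("dbfs:/"))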

Scala

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

Enabling dbutils.secrets.get

Due to security restrictions, calling dbutils.secrets.get requires obtaining a privileged authorization token from your workspace. This token is different from your REST API token and starts with dkea.... The first time you run dbutils.secrets.get, you are prompted with instructions on how to obtain a privileged token. You set the token with dbutils.secrets.setToken(token), and it remains valid for 48 hours.
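
A minimal Python sketch of that flow (the scope and key names are placeholders):

# dbutils as obtained above, for example via get_dbutils(spark).
# Paste the privileged token from the workspace prompt (it starts with "dkea...").
dbutils.secrets.setToken("<privileged-token>")

# The token stays valid for 48 hours.
print(dbutils.secrets.get(scope="<scope-name>", key="<key-name>"))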

Access the Hadoop filesystem

You can also access DBFS directly using the standard Hadoop filesystem interface:

> import org.apache.hadoop.fs._

// get new DBFS connection
> val dbfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
dbfs: org.apache.hadoop.fs.FileSystem = com.databricks.backend.daemon.data.client.DBFS@2d036335

// list files
> dbfs.listStatus(new Path("dbfs:/"))
res1: Array[org.apache.hadoop.fs.FileStatus] = Array(FileStatus{path=dbfs:/$; isDirectory=true; ...})

// open file
> val stream = dbfs.open(new Path("dbfs:/path/to/your_file"))
stream: org.apache.hadoop.fs.FSDataInputStream = org.apache.hadoop.fs.FSDataInputStream@7aa4ef24

// get file contents as string
> import org.apache.commons.io._
> println(new String(IOUtils.toByteArray(stream)))

Set Hadoop configurations

On the client, you can set Hadoop configurations using the spark.conf.set API, which applies to SQL and DataFrame operations. Hadoop configurations set on the sparkContext must be set in the cluster configuration or using a notebook, because configurations set on the sparkContext are not tied to user sessions but apply to the entire cluster.
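
A minimal Python sketch of the client-side pattern; the configuration key and value are placeholders for whatever Hadoop property your storage layer requires:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Applies to SQL and DataFrame operations issued from this client session.
spark.conf.set("<hadoop-configuration-key>", "<value>")

df = spark.read.parquet("<path-to-data>")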

Troubleshooting

Run databricks-connect test to check for connectivity issues. This section describes some common issues you may encounter and how to resolve them.

Python version mismatch

Check that the Python version you are using locally has at least the same minor release as the version on the cluster (for example, 3.5.1 versus 3.5.2 is OK, 3.5 versus 3.6 is not).

If you have multiple Python versions installed locally, ensure that Databricks Connect is using the right one by setting the PYSPARK_PYTHON environment variable (for example, PYSPARK_PYTHON=python3).

Server not enabled

Ensure the cluster has the Spark server enabled with spark.databricks.service.server.enabled true. If it is, you should see the following lines in the driver log:

18/10/25 21:39:18 INFO SparkConfUtils$: Set spark config:
spark.databricks.service.server.enabled -> true
...
18/10/25 21:39:21 INFO SparkContext: Loading Spark Service RPC Server
18/10/25 21:39:21 INFO SparkServiceRPCServer:
Starting Spark Service RPC Server
18/10/25 21:39:21 INFO Server: jetty-9.3.20.v20170531
18/10/25 21:39:21 INFO AbstractConnector: Started ServerConnector@6a6c7f42
{HTTP/1.1,[http/1.1]}{0.0.0.0:15001}
18/10/25 21:39:21 INFO Server: Started @5879ms

Conflicting PySpark installations

The databricks-connect package conflicts with PySpark. Having both installed causes errors when initializing the Spark context in Python. This can manifest in several ways, including "stream corrupted" or "class not found" errors. If you have PySpark installed in your Python environment, ensure it is uninstalled before installing databricks-connect. After uninstalling PySpark, make sure to fully reinstall the Databricks Connect package:

pip uninstall pyspark
pip uninstall databricks-connect
pip install -U databricks-connect==5.5.*  # or 6.*.* or 7.1.* to match your cluster version. 6.1-6.6 and 7.1 are supported

Conflicting SPARK_HOME

If you have previously used Spark on your machine, your IDE may be configured to use one of those other versions of Spark rather than the Databricks Connect Spark. This can manifest in several ways, including "stream corrupted" or "class not found" errors. You can see which version of Spark is being used by checking the value of the SPARK_HOME environment variable:

Java

System.out.println(System.getenv("SPARK_HOME"));

Python

import os
print(os.environ['SPARK_HOME'])

Scala

println(sys.env.get("SPARK_HOME"))

Resolution

If SPARK_HOME is set to a version of Spark other than the one in the client, you should unset the SPARK_HOME variable and try again.

Check your IDE environment variable settings, your .bashrc, .zshrc, or .bash_profile file, and anywhere else environment variables might be set. You will most likely have to quit and restart your IDE to purge the old state, and you may even need to create a new project if the problem persists.

You should not need to set SPARK_HOME to a new value; unsetting it should be sufficient.

Conflicting or missing PATH entry for binaries

It is possible your PATH is configured so that commands like spark-shell run some other previously installed binary instead of the one provided with Databricks Connect. This can cause databricks-connect test to fail. Make sure either that the Databricks Connect binaries take precedence, or remove the previously installed ones.

If you can't run commands like spark-shell, it is also possible that your PATH was not automatically set up by pip install and you need to add the installation bin directory to your PATH manually. You can use Databricks Connect with IDEs even if this isn't set up; however, the databricks-connect test command will not work.

Conflicting serialization settings on the cluster

If you see "stream corrupted" errors when running databricks-connect test, this may be due to incompatible cluster serialization configs. For example, setting the spark.io.compression.codec config can cause this issue. To resolve it, consider removing these configs from the cluster settings, or setting the same configuration in the Databricks Connect client.
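
For example, a minimal sketch of mirroring such a config in the Databricks Connect client (the codec value is illustrative):

from pyspark.sql import SparkSession

# Match the codec configured on the cluster so both sides agree on serialization.
spark = SparkSession.builder \
    .config("spark.io.compression.codec", "snappy") \
    .getOrCreate()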

Cannot find winutils.exe on Windows

If you are using Databricks Connect on Windows and see:

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Follow the instructions to configure the Hadoop path on Windows.

The filename, directory name, or volume label syntax is incorrect on Windows

If you are using Databricks Connect on Windows and see:

The filename, directory name, or volume label syntax is incorrect.

Either Java or Databricks Connect was installed into a directory with a space in your path. You can work around this by installing into a directory path without spaces, or by configuring your path using the short-name form.

Limitations

The following Azure Databricks features and third-party platforms are unsupported:

  • The following Databricks Utilities: library, notebook workflow, and widgets.
  • Structured Streaming.
  • Running arbitrary code that is not part of a Spark job on the remote cluster.
  • Native Scala, Python, and R APIs for Delta table operations (for example, DeltaTable.forPath). However, the SQL API (spark.sql(...)) with Delta Lake operations and the regular Spark API (for example, spark.read.load) on Delta tables are both supported.
  • Apache Zeppelin 0.7.x and below.