适用于 Python 的 Databricks Connect 的高级用法
注意
本文介绍适用于 Databricks Runtime 14.0 及更高版本的 Databricks Connect。
本文介绍的主题超出了 Databricks Connect 的基本设置。
配置 Spark Connect 连接字符串
除了使用“配置与群集的连接”中所述的选项连接到群集之外,更高级的选项是使用 Spark Connect 连接字符串进行连接。 可在 remote
函数中传递该字符串,也可设置 SPARK_REMOTE
环境变量。
注意
只能使用 Databricks 个人访问令牌身份验证通过 Spark Connect 连接字符串进行连接。
使用 remote
函数设置连接字符串:
# Set the Spark Connect connection string in DatabricksSession.builder.remote.
from databricks.connect import DatabricksSession
workspace_instance_name = retrieve_workspace_instance_name()
token = retrieve_token()
cluster_id = retrieve_cluster_id()
spark = DatabricksSession.builder.remote(
f"sc://{workspace_instance_name}:443/;token={token};x-databricks-cluster-id={cluster_id}"
).getOrCreate()
或者,设置 SPARK_REMOTE
环境变量:
sc://<workspace-instance-name>:443/;token=<access-token-value>;x-databricks-cluster-id=<cluster-id>
然后如下所示初始化 DatabricksSession
类:
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
Pyspark shell
适用于 Python 的 Databricks Connect 附带一个 pyspark
二进制文件,该二级制文件是配置为使用 Databricks Connect 的 PySpark REPL (Spark shell)。
在没有其他参数的情况下启动时,shell 会从环境中获取默认凭据(例如 DATABRICKS_
环境变量或 DEFAULT
配置文件)以连接到 Azure Databricks 群集。 有关配置连接的信息,请参阅 Databricks Connect 的计算配置。
若要启动 Spark shell 并将其连接到正在运行的群集,请从已激活的 Python 虚拟环境运行以下命令之一:
pyspark
此时会显示 Spark shell,例如:
Python 3.10 ... [Clang ...] on darwin Type "help", "copyright", "credits" or "license" for more information. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 13.x.dev0 /_/ Using Python version 3.10 ... Client connected to the Spark Connect server at sc://...:.../;token=...;x-databricks-cluster-id=... SparkSession available as 'spark'. >>>
shell 启动后,
spark
对象可以在 Databricks 群集上运行 Apache Spark 命令。 运行简单的 PySpark 命令,例如spark.range(1,10).show()
。 如果未出现错误,则表示连接成功。有关如何配合使用 Spark shell 和 Python 在计算上运行命令的信息,请参阅使用 Spark Shell 进行交互式分析。
使用内置的
spark
变量表示正在运行的群集上的SparkSession
,例如:>>> df = spark.read.table("samples.nyctaxi.trips") >>> df.show(5) +--------------------+---------------------+-------------+-----------+----------+-----------+ |tpep_pickup_datetime|tpep_dropoff_datetime|trip_distance|fare_amount|pickup_zip|dropoff_zip| +--------------------+---------------------+-------------+-----------+----------+-----------+ | 2016-02-14 16:52:13| 2016-02-14 17:16:04| 4.94| 19.0| 10282| 10171| | 2016-02-04 18:44:19| 2016-02-04 18:46:00| 0.28| 3.5| 10110| 10110| | 2016-02-17 17:13:57| 2016-02-17 17:17:55| 0.7| 5.0| 10103| 10023| | 2016-02-18 10:36:07| 2016-02-18 10:41:45| 0.8| 6.0| 10022| 10017| | 2016-02-22 14:14:41| 2016-02-22 14:31:52| 4.51| 17.0| 10110| 10282| +--------------------+---------------------+-------------+-----------+----------+-----------+ only showing top 5 rows
所有 Python 代码都在本地运行,而涉及 DataFrame 操作的 PySpark 代码在远程 Azure Databricks 工作区中的群集上运行,运行响应发送回给本地调用方。
若要停止 Spark shell,请按
Ctrl + d
或Ctrl + z
,或者运行命令quit()
或exit()
。
其他 HTTP 标头
Databricks Connect 通过 HTTP/2 使用 gRPC 与 Databricks 群集通信。
某些高级用户可能选择在客户端和 Azure Databricks 群集之间安装代理服务,以便更好地控制来自其客户端的请求。
在某些情况下,代理可能需要 HTTP 请求中的自定义标头。
headers()
方法可用于将自定义标头添加到其 HTTP 请求。
spark = DatabricksSession.builder.header('x-custom-header', 'value').getOrCreate()
Certificates
如果群集依赖于自定义 SSL/TLS 证书来解析 Azure Databricks 工作区完全限定的域名 (FQDN),则必须在本地开发计算机上设置环境变量 GRPC_DEFAULT_SSL_ROOTS_FILE_PATH
。 此环境变量必须设置为群集上已安装证书的完整路径。
例如,在 Python 代码中设置此环境变量,如下所示:
import os
os.environ["GRPC_DEFAULT_SSL_ROOTS_FILE_PATH"] = "/etc/ssl/certs/ca-bundle.crt"
有关设置环境变量的其他方式,请参阅操作系统的文档。
日志记录和调试日志
适用于 Python 的 Databricks Connect 使用标准 Python 日志记录生成日志。
日志将发送到标准错误流 (stderr),默认情况下,仅发出 WARN 级别和更高级别的日志。
设置环境变量 SPARK_CONNECT_LOG_LEVEL=debug
将修改此默认值,并打印 DEBUG
级别及更高级别的所有日志消息。