适用于 Python 的 Databricks SQL 连接器

适用于 Python 的 Databricks SQL 连接器是一个 Python 库,让你能够使用 Python 代码在 Azure Databricks 群集和 Databricks SQL 仓库上运行 SQL 命令。 相比类似的 Python 库(如 pyodbc),适用于 Python 的 Databricks SQL 连接器更易于设置和使用。 此库遵循 PEP 249 - Python 数据库 API 规范 v2.0

注意

Python 的 Databricks SQL 连接器还包括适用于 Azure Databricks 的 SQLAlchemy 方言。 请参阅将 SQLAlchemy 与 Azure Databricks 配合使用

要求

  • 运行 Python >=3.8 和<=3.11 的开发计算机。
  • Databricks 建议使用 Python 虚拟环境,例如 python 随附的 venv 提供的环境。 虚拟环境有助于确保同时使用正确版本的 Python 和适用于 Python 的 Databricks SQL 连接器。 设置和使用虚拟环境不在本文的讨论范围之内。 有关详细信息,请参阅创建虚拟环境
  • 现有群集SQL 仓库

开始使用

  • 通过运行 pip install databricks-sql-connectorpython -m pip install databricks-sql-connector,在开发计算机上安装适用于 Python 的 Databricks SQL 连接器库。

  • 收集想要使用的群集或 SQL 仓库的以下信息:

    群集

    SQL 仓库

    • SQL 仓库的服务器主机名。 从 SQL 仓库的“连接详细信息”选项卡的“服务器主机名”值中可以获取此主机名。
    • SQL 仓库的 HTTP 路径。 从 SQL 仓库的“连接详细信息”选项卡的“HTTP 路径”值中可以获取此路径。

身份验证

适用于 Python 的 Databricks SQL 连接器支持以下 Azure Databricks 身份验证类型:

适用于 Python 的 Databricks SQL 连接器尚不支持以下 Azure Databricks 身份验证类型:

Databricks 个人访问令牌身份验证

要将适用于 Python 的 Databricks SQL 连接器与 Azure Databricks 个人访问令牌身份验证配合使用,你必须先创建一个 Azure Databricks 个人访问令牌,如下所示:

  1. 在 Azure Databricks 工作区中,单击顶部栏中 Azure Databricks 用户名,然后从下拉列表中选择“用户设置”。
  2. 单击“开发人员”。
  3. 在“访问令牌”旁边,单击“管理”。
  4. 单击“生成新令牌”。
  5. (可选)输入有助于将来识别此令牌的注释,并将令牌的默认生存期更改为 90 天。 若要创建没有生存期的令牌(不建议),请将“生存期(天)”框留空(保留空白)。
  6. 单击“生成” 。
  7. 将显示的令牌复制到安全位置,然后单击“完成”。

注意

请务必将复制的令牌保存到安全的位置。 请勿与他人共享复制的令牌。 如果丢失了复制的令牌,你将无法重新生成完全相同的令牌, 而必须重复此过程来创建新令牌。 如果丢失了复制的令牌,或者认为令牌已泄露,Databricks 强烈建议通过单击“访问令牌”页上令牌旁边的垃圾桶(撤销)图标立即从工作区中删除该令牌。

如果你无法在工作区中创建或使用令牌,可能是因为工作区管理员已禁用令牌或未授予你创建或使用令牌的权限。 请与工作区管理员联系,或参阅以下内容:

要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。
  • DATABRICKS_TOKEN,设置为 Azure Databricks 个人访问令牌。

若要设置环境变量,请参阅操作系统的文档。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...

OAuth 用户到计算机 (U2M) 身份验证

适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证。 还必须安装用于 Python 0.19.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdkpython -m pip install databricks-sdk)。

要使用 OAuth U2M 身份验证对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 OAuth U2M 身份验证使用实时人工登录和同意对目标 Azure Databricks 用户帐户进行身份验证。 此代码片段假定你已设置以下环境变量:

  • DATABRICKS_SERVER_HOSTNAME,设置为你的群集或 SQL 仓库的服务器主机名值。
  • DATABRICKS_HTTP_PATH,设置为你的群集或 SQL 仓库的 HTTP 路径值。

若要设置环境变量,请参阅操作系统的文档。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type       = "databricks-oauth") as connection:
# ...

示例

以下代码示例演示如何使用用于 Python 的 Databricks SQL 连接器来查询和插入数据、查询元数据、管理游标和连接,以及配置日志记录。

注意

以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要改用其他可用的 Azure Databricks 身份验证类型,请参阅身份验证

这些代码示例从以下环境变量中检索它们的 server_hostnamehttp_pathaccess_token 连接变量值:

  • DATABRICKS_SERVER_HOSTNAME,表示要求中的“服务器主机名”值。
  • DATABRICKS_HTTP_PATH,表示要求中的“HTTP 路径”值。
  • DATABRICKS_TOKEN,表示要求中的访问令牌。

可以使用其他方法来检索这些连接变量值。 使用环境变量只是众多方法中的一种。

查询数据

以下代码示例演示如何调用适用于 Python 的 Databricks SQL 连接器在群集或 SQL 仓库上运行基本 SQL 命令。 此命令返回 samples 目录的 nyctaxi 架构中的trips 表中的前两行。

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
    result = cursor.fetchall()

    for row in result:
      print(row)

插入数据

以下示例演示如何插入少量数据(数千行):

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")

    squares = [(i, i * i) for i in range(100)]
    values = ",".join([f"({x}, {y})" for (x, y) in squares])

    cursor.execute(f"INSERT INTO squares VALUES {values}")

    cursor.execute("SELECT * FROM squares LIMIT 10")

    result = cursor.fetchall()

    for row in result:
      print(row)

要插入大量数据,请先将数据上传到云存储,然后执行 COPY INTO 命令。

查询元数据

可通过一些专用方法检索元数据。 以下示例检索有关示例表中的列的元数据:

from databricks import sql
import os

with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:

  with connection.cursor() as cursor:
    cursor.columns(schema_name="default", table_name="squares")
    print(cursor.fetchall())

管理游标和连接

最好关闭不再使用的任何连接和游标。 这可以释放 Azure Databricks 群集和 Databricks SQL 仓库上的资源。

可以使用上下文管理器(在前面示例中使用的 with 语法)来管理资源,或显式调用 close

from databricks import sql
import os

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())

cursor.close()
connection.close()

配置日志记录

Databricks SQL 连接器使用 Python 的标准日志记录模块。 可如下配置日志记录级别:

from databricks import sql
import os, logging

logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level    = logging.DEBUG)

connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))

cursor = connection.cursor()

cursor.execute("SELECT * from range(10)")

result = cursor.fetchall()

for row in result:
   logging.debug(row)

cursor.close()
connection.close()

API 参考

程序包

databricks-sql-connector

使用情况:pip install databricks-sql-connector

另请参阅 Python 包索引 (PyPI) 中的 databricks-sql-connector

模块

databricks.sql

使用情况:from databricks import sql

方法

connect 方法

创建与数据库的连接。

返回一个 Connection 对象。

参数
server_hostname

类型:str

群集或 SQL 仓库的服务器主机名。 要获取服务器主机名,请参阅本文前面部分的说明。

此参数是必需的。

示例: adb-1234567890123456.7.databricks.azure.cn
http_path

类型:str

群集或 SQL 仓库的 HTTP 路径。 要获取 HTTP 路径,请参阅本文前面部分的说明。

此参数是必需的。

例如:
sql/protocolv1/o/1234567890123456/1234-567890-test123 适用于群集。
SQL 仓库的 /sql/1.0/warehouses/a1b234c567d8e9fa
access_tokenauth_type

类型:str

有关 Azure Databricks 身份验证设置的信息。 有关详细信息,请参阅身份验证
session_configuration

类型:dict[str, Any]

Spark 会话配置参数的字典。 设置一个等效于使用 SET key=val SQL 命令的配置。 运行 SQL 命令 SET -v 可以获取可用配置的完整列表。

默认为 None

此参数是可选的。

示例: {"spark.sql.variable.substitute": True}
http_headers

类型:List[Tuple[str, str]]]

在客户端发出的每个 RPC 请求的 HTTP 标头中设置的其他(键、值)对。 典型用法不会设置任何额外的 HTTP 标头。 默认为 None

此参数是可选的。

从版本 2.0 开始
catalog

类型:str

用于连接的初始目录。 默认为 None(在这种情况下,将使用默认目录,通常为 hive_metastore)。

此参数是可选的。

从版本 2.0 开始
架构

类型:str

用于连接的初始架构。 默认为 None(在这种情况下,将使用默认架构 default)。

此参数是可选的。

从版本 2.0 开始
use_cloud_fetch

类型:bool

True 将提取请求直接发送到云对象存储以下载数据区块。 False 默认将提取请求直接发送到 Azure Databricks。

如果 use_cloud_fetch 设置为 True 但网络访问被阻止,则提取请求将失败。

从版本 2.8 开始

Connection

表示与数据库的连接。

方法
close 方法

关闭与数据库的连接,并释放服务器上所有关联的资源。 对此连接的任何其他调用都将引发 Error

无参数。

没有返回值。

cursor 方法

返回一种机制,该机制允许遍历数据库中的各种记录。

无参数。

返回一个 Cursor 对象。

Cursor

属性
arraysize 属性

fetchmany 方法一起使用,指定内部缓冲区大小,该大小也是一次性从服务器实际提取的行数。 默认值是 10000。 对于窄结果(在结果中每行未包含大量数据),应增大该值以提高性能。

读写访问。

description 属性

包含 tuple 对象的 Python list。 每个 tuple 对象都包含 7 个值,并且每个 tuple 对象的前 2 个项目包含如下所示的描述单个结果列的信息:

  • name:列的名称。
  • type_code:表示列的类型的字符串。 例如,整数列的类型代码为 int

每个 7 项目 tuple 对象的剩余 5 个项目未实现,并且其值未定义。 它们通常返回 4 个 None 值,后跟一个 True 值。

只读访问。

方法
cancel 方法

中断运行游标启动的任何数据库查询或命令。 要释放服务器上关联的资源,请在调用 cancel 方法后,调用 close 方法。

无参数。

没有返回值。

close 方法

关闭游标并释放服务器上关联的资源。 关闭已关闭的游标可能会引发错误。

无参数。

没有返回值。

execute 方法

准备并运行数据库查询或命令。

无返回值。

参数
operation

类型:str

要准备并运行的查询或命令。

此参数是必需的。

不使用 parameters 参数的示例:


cursor.execute(
'SELECT * FROM samples.nyctaxi.trips WHERE pickup_zip="10019" LIMIT 2'
)

不使用 parameters 参数的示例:


cursor.execute(
'SELECT * FROM samples.nyctaxi.trips WHERE zip=%(pickup_zip)s LIMIT 2',
{ 'pickup_zip': '10019' }
)
参数

类型:字典

要与 operation 参数一起使用的参数序列。

此参数是可选的。 默认为 None
executemany 方法

使用 seq_of_parameters 实参中的所有形参序列来准备并运行数据库查询或命令。 仅保留最终结果集。

无返回值。

参数
operation

类型:str

要准备并运行的查询或命令。

此参数是必需的。
seq_of_parameters

类型:dictlist


operation 参数一起使用的多个参数集的序列。

此参数是必需的。
catalogs 方法

执行有关目录的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。 结果集中的重要字段包括:

  • 字段名称:TABLE_CAT。 键入:str。 目录的名称。

无参数。

无返回值。

从版本 1.0 开始

schemas 方法

执行有关架构的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。 结果集中的重要字段包括:

  • 字段名称:TABLE_SCHEM。 键入:str。 架构的名称。
  • 字段名称:TABLE_CATALOG。 键入:str。 架构所属的目录。

无返回值。

从版本 1.0 开始

参数
catalog_name

类型:str

要检索其相关信息的目录名称。 % 字符解释为通配符。

此参数是可选的。
schema_name

类型:str

要检索其相关信息的架构名称。 % 字符解释为通配符。

此参数是可选的。
tables 方法

执行有关表和视图的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。 结果集中的重要字段包括:

  • 字段名称:TABLE_CAT。 键入:str。 表所属的目录。
  • 字段名称:TABLE_SCHEM。 键入:str。 表所属的架构。
  • 字段名称:TABLE_NAME。 键入:str。 表的名称。
  • 字段名称:TABLE_TYPE。 键入:str。 关系类型,例如 VIEWTABLE(适用于 Databricks Runtime 10.2 和更高版本以及 Databricks SQL;对更低版本的 Databricks Runtime 使用会返回空字符串)。

无返回值。

从版本 1.0 开始

参数
catalog_name

类型:str

要检索其相关信息的目录名称。 % 字符解释为通配符。

此参数是可选的。
schema_name

类型:str

要检索其相关信息的架构名称。 % 字符解释为通配符。

此参数是可选的。
table_name

类型:str

要检索其相关信息的表名称。 % 字符解释为通配符。

此参数是可选的。
table_types

类型:List[str]

要匹配的表类型列表,例如 TABLEVIEW

此参数是可选的。
columns 方法

执行有关列的元数据查询。 然后应使用 fetchmanyfetchall 获取实际结果。 结果集中的重要字段包括:

  • 字段名称:TABLE_CAT。 键入:str。 列所属的目录。
  • 字段名称:TABLE_SCHEM。 键入:str。 列所属的架构。
  • 字段名称:TABLE_NAME。 键入:str。 列所属的表的名称。
  • 字段名称:COLUMN_NAME。 键入:str。 列的名称。

无返回值。

从版本 1.0 开始

参数
catalog_name

类型:str

要检索其相关信息的目录名称。 % 字符解释为通配符。

此参数是可选的。
schema_name

类型:str

要检索其相关信息的架构名称。 % 字符解释为通配符。

此参数是可选的。
table_name

类型:str

要检索其相关信息的表名称。 % 字符解释为通配符。

此参数是可选的。
column_name

类型:str

要检索其相关信息的列名称。 % 字符解释为通配符。

此参数是可选的。
fetchall 方法

获取查询的所有(或所有剩余)行。

无参数。

Row 对象的 Python list 形式返回查询的所有(或所有剩余)行。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

fetchmany 方法

获取查询的后续行。

Row 对象的 Python list 的形式返回查询后续行中 size 行及以前的行(如果没有指定 size,则返回 arraysize 属性)。 如果要提取的行数少于 size,将返回所有剩余的行。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

参数
大小

类型:int

要获取的后续行数。

此参数是可选的。 如果未指定,则使用 arraysize 属性的值。

示例: cursor.fetchmany(10)
fetchone 方法

获取数据集的下一行。

无参数。

以 Python tuple 对象的形式返回数据集的下一行,如果没有更多可用数据,则返回 None

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

fetchall_arrow 方法

以 PyArrow Table 对象的形式获取查询的所有(或所有剩余)行。 返回大量数据的查询应改用 fetchmany_arrow,以减少内存消耗。

无参数。

以 PyArrow 表的形式返回查询的所有(或所有剩余)行。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

从版本 2.0 开始

fetchmany_arrow 方法

以 PyArrow Table 对象的形式获取查询后续行。

以 Python PyArrow Table 对象的形式返回查询后续行中 size 参数行及以前的行(如果没有指定 size,则返回 arraysize 属性)。

如果之前对 execute 方法的调用未返回任何数据或尚未进行 execute 调用,则会引发 Error

从版本 2.0 开始

参数
大小

类型:int

要获取的后续行数。

此参数是可选的。 如果未指定,则使用 arraysize 属性的值。

示例: cursor.fetchmany_arrow(10)

Row

行类是一个类似于元组的数据结构,表示单个结果行。 如果行包含名为 "my_column" 的列,则你可以通过 row.my_column 访问 row"my_column" 字段。 还可以使用数字索引来访问字段,例如 row[0]。 如果不允许将列名称用作属性方法名称(例如,它以数字开头),则可以将字段作为 row["1_my_column"] 来访问。

从版本 1.0 开始

方法
asDict 方法

返回行的字典表示形式,此值按字段名称编制索引。 如果存在重复的字段名称,则在字典中返回一个重复字段(但只有一个)。 未定义返回哪个重复字段。

无参数。

返回字段的 dict

类型转换

下表将 Apache Spark SQL 数据类型映射到其 Python 数据类型等效项。

Apache Spark SQL 数据类型 Python 数据类型
array numpy.ndarray
bigint int
binary bytearray
boolean bool
date datetime.date
decimal decimal.Decimal
double float
int int
map str
null NoneType
smallint int
string str
struct str
timestamp datetime.datetime
tinyint int

疑难解答

tokenAuthWrapperInvalidAccessToken: Invalid access token 消息

问题:运行代码时看到类似于 Error during request to server: tokenAuthWrapperInvalidAccessToken: Invalid access token 的消息。

可能的原因:传递给 access_token 的值不是有效的 Azure Databricks 个人访问令牌。

建议的解决方法:检查传递给 access_token 的值是否正确,然后重试。

gaierror(8, 'nodename nor servname provided, or not known') 消息

问题:运行代码时看到类似于 Error during request to server: gaierror(8, 'nodename nor servname provided, or not known') 的消息。

可能的原因:传递给 server_hostname 的值不是正确的主机名。

建议的解决方法:检查传递给 server_hostname 的值是否正确,然后重试。

有关查找服务器主机名的详细信息,请参阅获取 Azure Databricks 计算资源的连接详细信息

IpAclError 消息

问题:在运行代码期间,尝试使用 Azure Databricks 笔记本上的连接器时看到消息 Error during request to server: IpAclValidation

可能的原因:可能为 Azure Databricks 工作区启用了 IP 允许列表。 使用 IP 允许列表时,默认不允许从 Spark 群集连接回到控制平面。

建议的解决方法:请管理员将计算平面子网添加到 IP 允许列表

其他资源

有关详细信息,请参阅: