用于 Python 的 Databricks SQL 连接器是一个 Python 库,可用于使用 Python 代码在 Azure Databricks 全用途计算和 Databricks SQL 仓库上运行 SQL 命令。 相比类似的 Python 库(如 pyodbc),适用于 Python 的 Databricks SQL 连接器更易于设置和使用。 此库遵循 PEP 249 - Python 数据库 API 规范 v2.0。
重要
用于 Python 版本 3.0.0 及更高版本的 Databricks SQL 连接器支持本机参数化查询执行,这可以防止 SQL 注入并改进查询性能。 以前版本使用内联参数化执行,这对于 SQL 注入不安全,并且存在其他缺点。 有关详细信息,请参阅 “使用本机参数”。
用于 Python 的 Databricks SQL 连接器还支持 Azure Databricks SQLAlchemy 方言,但必须安装它才能使用这些功能。 请参阅将 SQLAlchemy 与 Azure Databricks 配合使用。
要求
- 运行 Python 3.8 及更高版本的开发计算机。
- Databricks 建议使用 Python 虚拟环境,例如 python 随附的 venv 提供的环境。 虚拟环境有助于确保同时使用正确版本的 Python 和适用于 Python 的 Databricks SQL 连接器。 设置和使用虚拟环境不在本文的讨论范围之内。 有关详细信息,请参阅创建虚拟环境。
- 现有的 全用途计算 或 SQL 仓库。
开始
- 安装用于 Python 的 Databricks SQL 连接器。 PyArrow 是用于 Python 的 Databricks SQL 连接器的可选依赖项,默认情况下在连接器版本 4.0.0 及更高版本中未安装。 如果未安装 PyArrow,CloudFetch 和其他 Apache Arrow 功能等功能不可用,这可能会影响大量数据的性能。 - 若要安装精简连接器,请使用: - pip install databricks-sql-connector
- 若要安装完整的连接器(包括 PyArrow),请使用: - pip install databricks-sql-connector[pyarrow]
 
- 为要使用的通用计算或 SQL 仓库收集以下信息: - 通用计算- 所有用途计算的服务器主机名。 可以从 选项卡中的服务器>值获取此值,以便进行全用途计算。 
- 全用途计算的 HTTP 路径。 可以从 选项卡中的 >值获取此值,以便进行全用途计算。 
 - 注意 - SQL 连接器不支持连接到作业计算集群。 - SQL 仓库- SQL 仓库的服务器主机名。 从 SQL 仓库的“连接详细信息”选项卡的“服务器主机名”值中可以获取此主机名。
- SQL 仓库的 HTTP 路径。 从 SQL 仓库的“连接详细信息”选项卡的“HTTP 路径”值中可以获取此路径。
 
身份验证
适用于 Python 的 Databricks SQL 连接器支持以下 Azure Databricks 身份验证类型:
适用于 Python 的 Databricks SQL 连接器尚不支持以下 Azure Databricks 身份验证类型:
Databricks 个人访问令牌身份验证
要将适用于 Python 的 Databricks SQL 连接器与 Azure Databricks 个人访问令牌身份验证配合使用,你必须先创建一个 Azure Databricks 个人访问令牌。 为此,请遵循 为工作区用户创建个人访问令牌中的步骤。
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
- 
              DATABRICKS_SERVER_HOSTNAME将其设置为用于通用计算或 SQL 仓库的服务器主机名值。
- 
              DATABRICKS_HTTP_PATH,设置为用于所有用途的计算或 SQL 仓库的 HTTP 路径 值。
- 
              DATABRICKS_TOKEN,设置为 Azure Databricks 个人访问令牌。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
# ...
OAuth 计算机到计算机 (M2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 计算机到计算机 (M2M) 身份验证。 还必须安装用于 Python 0.18.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk 或 python -m pip install databricks-sdk)。
要将 Databricks SQL Connector for Python 与 OAuth M2M 身份验证配合使用,必须执行以下操作:
- 在 Azure Databricks 工作区中创建 Azure Databricks 服务主体,并为该服务主体创建 OAuth 机密。 - 若要创建服务主体及其 OAuth 机密,请参阅 使用 OAuth 授权服务主体访问 Azure Databricks。 记下服务主体的 UUID 或应用程序 ID 值,以及服务主体的 OAuth 机密的机密值。 
- 为该服务主体提供对通用计算或仓库的访问权限。 - 若要向服务主体授予对通用计算或仓库的访问权限,请参阅 “计算权限 ”或 “管理 SQL 仓库”。 
要对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 此代码片段假定你已设置以下环境变量:
- 
              DATABRICKS_SERVER_HOSTNAME设置为用于所有用途计算或 SQL 仓库 的服务器主机名 值。
- 
              DATABRICKS_HTTP_PATH,设置为用于所有用途的计算或 SQL 仓库的 HTTP 路径 值。
- 
              DATABRICKS_CLIENT_ID,设置为服务主体的 UUID 或应用程序 ID 值。
- 
              DATABRICKS_CLIENT_SECRET,设置为服务主体的 OAuth 机密的“机密”值。
若要设置环境变量,请参阅操作系统的文档。
from databricks.sdk.core import Config, oauth_service_principal
from databricks import sql
import os
server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME")
def credential_provider():
  config = Config(
    host          = f"https://{server_hostname}",
    client_id     = os.getenv("DATABRICKS_CLIENT_ID"),
    client_secret = os.getenv("DATABRICKS_CLIENT_SECRET"))
  return oauth_service_principal(config)
with sql.connect(server_hostname      = server_hostname,
                 http_path            = os.getenv("DATABRICKS_HTTP_PATH"),
                 credentials_provider = credential_provider) as connection:
# ...
OAuth 用户到计算机 (U2M) 身份验证
适用于 Python 的 Databricks SQL 连接器版本 2.7.0 及更高版本支持 OAuth 用户到计算机 (U2M) 身份验证。 还必须安装用于 Python 0.19.0 或更高版本的 Databricks SDK(例如通过运行 pip install databricks-sdk 或 python -m pip install databricks-sdk)。
要使用 OAuth U2M 身份验证对适用于 Python 的 Databricks SQL 连接器进行身份验证,可使用以下代码片段。 OAuth U2M 身份验证使用实时人工登录和同意对目标 Azure Databricks 用户帐户进行身份验证。 此代码片段假定你已设置以下环境变量:
- 将 DATABRICKS_SERVER_HOSTNAME设置为计算或 SQL 仓库的 服务器主机名 值,用于所有用途。
- 将 DATABRICKS_HTTP_PATH设置为用于通用计算或 SQL 仓库的 HTTP 路径 值。
若要设置环境变量,请参阅操作系统的文档。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 auth_type       = "databricks-oauth") as connection:
# ...
示例
以下代码示例演示如何使用用于 Python 的 Databricks SQL 连接器来查询和插入数据、查询元数据、管理游标和连接、管理 Unity 目录中的文件以及配置日志记录。
注意
以下代码示例演示如何使用 Azure Databricks 个人访问令牌进行身份验证。 若要使用不同的身份验证类型,请参阅 “身份验证”。
这些代码示例从以下环境变量中检索它们的 server_hostname、http_path 和 access_token 连接变量值:
- 
              DATABRICKS_SERVER_HOSTNAME,表示要求中的“服务器主机名”值。
- 
              DATABRICKS_HTTP_PATH,表示要求中的“HTTP 路径”值。
- 
              DATABRICKS_TOKEN,表示要求中的访问令牌。
设置用户代理
下面的代码示例演示如何设置 User-Agent 应用程序 product_name 进行使用情况跟踪。
from databricks import sql
import os
with sql.connect(server_hostname   = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path         = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token      = os.getenv("DATABRICKS_TOKEN"),
                 user_agent_entry = "product_name") as connection:
  with connection.cursor() as cursor:
    cursor.execute("SELECT 1 + 1")
    result = cursor.fetchall()
    for row in result:
      print(row)
查询数据
下面的代码示例演示如何调用用于 Python 的 Databricks SQL 连接器,以便在通用计算或 SQL 仓库上运行基本 SQL 命令。 此命令返回 trips 目录的 samples 架构中的nyctaxi 表中的前两行。
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
  with connection.cursor() as cursor:
    cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
    result = cursor.fetchall()
    for row in result:
      print(row)
查询标记
重要
此功能以个人预览版提供。 若要请求访问权限,请联系你的帐户团队。
以下示例演示如何将键值标记附加到 SQL 查询以用于跟踪和分析目的。 查询标记显示在 system.query.history 表中。
from databricks import sql
import os
with sql.connect(
    server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
    http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
    access_token    = os.getenv("DATABRICKS_TOKEN"),
    session_configuration = {
        'query_tags': 'team:engineering,dashboard:abc123,env:prod'
    }
) as connection:
    with connection.cursor() as cursor:
        cursor.execute("SELECT * FROM samples.nyctaxi.trips LIMIT 2")
        result = cursor.fetchall()
        # Query is now tagged and trackable in system.query.history
        for row in result:
            print(row)
插入数据
以下示例演示如何插入少量数据(数千行):
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
  with connection.cursor() as cursor:
    cursor.execute("CREATE TABLE IF NOT EXISTS squares (x int, x_squared int)")
    squares = [(i, i * i) for i in range(100)]
    values = ",".join([f"({x}, {y})" for (x, y) in squares])
    cursor.execute(f"INSERT INTO squares VALUES {values}")
    cursor.execute("SELECT * FROM squares LIMIT 10")
    result = cursor.fetchall()
    for row in result:
      print(row)
要插入大量数据,请先将数据上传到云存储,然后执行 COPY INTO 命令。
查询元数据
可通过一些专用方法检索元数据。 以下示例检索有关示例表中的列的元数据:
from databricks import sql
import os
with sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token    = os.getenv("DATABRICKS_TOKEN")) as connection:
  with connection.cursor() as cursor:
    cursor.columns(schema_name="default", table_name="squares")
    print(cursor.fetchall())
管理游标和连接
最好关闭不再使用的任何连接和游标。 这会释放 Azure Databricks 全用途计算和 Databricks SQL 仓库上的资源。
可以使用上下文管理器(在前面示例中使用的 with 语法)来管理资源,或显式调用 close:
from databricks import sql
import os
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
print(cursor.fetchall())
cursor.close()
connection.close()
管理 Unity Catalog 卷中的文件
Databricks SQL 连接器可让你将本地文件写入 Unity Catalog 卷、从卷下载文件以及从卷中删除文件,如下例所示:
from databricks import sql
import os
# For writing local files to volumes and downloading files from volumes,
# you must set the staging_allowed_local_path argument to the path to the
# local folder that contains the files to be written or downloaded.
# For deleting files in volumes, you must also specify the
# staging_allowed_local_path argument, but its value is ignored,
# so in that case its value can be set for example to an empty string.
with sql.connect(server_hostname            = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                 http_path                  = os.getenv("DATABRICKS_HTTP_PATH"),
                 access_token               = os.getenv("DATABRICKS_TOKEN"),
                 staging_allowed_local_path = "/tmp/") as connection:
  with connection.cursor() as cursor:
    # Write a local file to the specified path in a volume.
    # Specify OVERWRITE to overwrite any existing file in that path.
    cursor.execute(
      "PUT '/tmp/my-data.csv' INTO '/Volumes/main/default/my-volume/my-data.csv' OVERWRITE"
    )
    # Download a file from the specified path in a volume.
    cursor.execute(
      "GET '/Volumes/main/default/my-volume/my-data.csv' TO '/tmp/my-downloaded-data.csv'"
    )
    # Delete a file from the specified path in a volume.
    cursor.execute(
      "REMOVE '/Volumes/main/default/my-volume/my-data.csv'"
    )
配置日志记录
Databricks SQL 连接器使用 Python 的标准日志记录模块。 以下示例配置日志记录级别并生成调试日志:
from databricks import sql
import os, logging
logging.getLogger("databricks.sql").setLevel(logging.DEBUG)
logging.basicConfig(filename = "results.log",
                    level    = logging.DEBUG)
connection = sql.connect(server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
                         http_path       = os.getenv("DATABRICKS_HTTP_PATH"),
                         access_token    = os.getenv("DATABRICKS_TOKEN"))
cursor = connection.cursor()
cursor.execute("SELECT * from range(10)")
result = cursor.fetchall()
for row in result:
   logging.debug(row)
cursor.close()
connection.close()
测试
若要测试代码,请使用 Python 测试框架,例如 pytest。 若要在模拟条件下测试代码,而不调用 Azure Databricks REST API 终结点或更改 Azure Databricks 帐户或工作区的状态,可以使用 Python 模拟库(如 unittest.mock)。
例如,如果以下名为“helpers.py”的文件,其中包含使用 Azure Databricks 个人访问令牌返回到 Azure Databricks 工作区的连接的 get_connection_personal_access_token 函数,以及使用连接从 select_nyctaxi_trips 目录的 trips 架构中的 samples 表中获取指定数目的数据行的 nyctaxi 函数:
# helpers.py
from databricks import sql
from databricks.sql.client import Connection, List, Row, Cursor
def get_connection_personal_access_token(
  server_hostname: str,
  http_path: str,
  access_token: str
) -> Connection:
  return sql.connect(
    server_hostname = server_hostname,
    http_path = http_path,
    access_token = access_token
  )
def select_nyctaxi_trips(
  connection: Connection,
  num_rows: int
) -> List[Row]:
  cursor: Cursor = connection.cursor()
  cursor.execute(f"SELECT * FROM samples.nyctaxi.trips LIMIT {num_rows}")
  result: List[Row] = cursor.fetchall()
  return result
另外,假设以下名为 main.py 的文件调用 get_connection_personal_access_token 和 select_nyctaxi_trips 函数:
# main.py
from databricks.sql.client import Connection, List, Row
import os
from helpers import get_connection_personal_access_token, select_nyctaxi_trips
connection: Connection = get_connection_personal_access_token(
  server_hostname = os.getenv("DATABRICKS_SERVER_HOSTNAME"),
  http_path = os.getenv("DATABRICKS_HTTP_PATH"),
  access_token = os.getenv("DATABRICKS_TOKEN")
)
rows: List[Row] = select_nyctaxi_trips(
  connection = connection,
  num_rows = 2
)
for row in rows:
  print(row)
以下名为 test_helpers.py 的文件测试 select_nyctaxi_trips 函数是否返回预期的响应。 此测试将模拟 Connection 对象,而不是创建与目标工作区的真实连接。 该测试还模拟一些符合真实数据中的架构和值的数据。 该测试通过模拟连接返回模拟数据,然后检查其中一个模拟数据行的值是否与预期值匹配。
# test_helpers.py
import pytest
from databricks.sql.client import Connection, List, Row
from datetime import datetime
from helpers import select_nyctaxi_trips
from unittest.mock import create_autospec
@pytest.fixture
def mock_data() -> List[Row]:
  return [
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 14, 16, 52, 13),
      tpep_dropoff_datetime = datetime(2016, 2, 14, 17, 16, 4),
      trip_distance = 4.94,
      fare_amount = 19.0,
      pickup_zip = 10282,
      dropoff_zip = 10171
    ),
    Row(
      tpep_pickup_datetime = datetime(2016, 2, 4, 18, 44, 19),
      tpep_dropoff_datetime = datetime(2016, 2, 4, 18, 46),
      trip_distance = 0.28,
      fare_amount = 3.5,
      pickup_zip = 10110,
      dropoff_zip = 10110
    )
  ]
def test_select_nyctaxi_trips(mock_data: List[Row]):
  # Create a mock Connection.
  mock_connection = create_autospec(Connection)
  # Set the mock Connection's cursor().fetchall() to the mock data.
  mock_connection.cursor().fetchall.return_value = mock_data
  # Call the real function with the mock Connection.
  response: List[Row] = select_nyctaxi_trips(
    connection = mock_connection,
    num_rows = 2)
  # Check the value of one of the mocked data row's columns.
  assert response[1].fare_amount == 3.5
              select_nyctaxi_trips 函数包含 SELECT 语句,因此不会更改 trips 表的状态,在此示例中并不是一定需要模拟。 但是,模拟让你能够快速运行测试,而无需等待与工作区建立实际连接。 此外,通过模拟,可以多次针对可能更改表状态的函数运行模拟测试,例如 INSERT INTO、UPDATE 和 DELETE FROM。
API 参考
本部分包含 databricks-sql-connector 软件包的 API 参考。 请参阅 Python 包索引 (PyPI) 中的 databricks-sql-connector 。
模块
包 databricks.sql 的 databricks-sql-connector 模块包含初始化与 SQL 仓库的连接的方法。
connect 方法
初始化与 SQL 仓库的连接。 返回 Connection 对象。
| 参数 | 类型 | Description | 
|---|---|---|
| server_hostname | str | 必填。 例如 adb-1234567890123456.7.databricks.azure.cn,用于全用途计算或 SQL 仓库的服务器主机名。若要获取服务器主机名,请参阅 入门中的说明。 | 
| http_path | str | 必填。 通用计算或 SQL 仓库的 HTTP 路径,例如 sql/protocolv1/o/1234567890123456/1234-567890-test123用于通用计算或/sql/1.0/warehouses/a1b234c567d8e9faSQL 仓库。若要获取 HTTP 路径,请参阅 入门中的说明。 | 
| access_token、auth_type、credentials_provider、password、username | str | 有关 Azure Databricks 身份验证设置的信息。 有关详细信息,请参阅身份验证。 | 
| session_configuration | dict[str, Any] | Spark 会话配置参数的字典。 设置一个等效于使用 SET key=valSQL 命令的配置。 运行 SQL 命令SET -v可以获取可用配置的完整列表。 默认为None。示例: {"spark.sql.variable.substitute": True} | 
| http_headers | List[Tuple[str, str]]] | 可选。 在客户端发出的每个 RPC 请求的 HTTP 标头中设置的其他(键、值)对。 典型用法不会设置任何额外的 HTTP 标头。 默认为 None。 | 
| catalog | str | 可选。 用于连接的初始目录。 None默认为(在这种情况下,将使用默认目录,通常为hive_metastore)。 | 
| schema | str | 可选。 用于连接的初始架构。 默认为 None(在这种情况下,将使用默认架构default)。从版本 2.0 开始 | 
| use_cloud_fetch | bool | 可选。 是否直接将提取请求发送到云对象存储以下载数据区块。 默认为 True。 将False设置为直接发送提取请求到 Azure Databricks。如果 use_cloud_fetch设置为True但网络访问被阻止,则提取请求将失败。从版本 2.8 开始 | 
| user_agent_entry | str | 可选。 包含在 HTTP 请求标头中的 User-Agent 条目,用于跟踪使用情况。 默认为 PyDatabricksSqlConnector。 | 
              Connection 类
表示与计算或 SQL 仓库的连接。
方法
该 Connection 类提供以下方法。
| 方法 | Description | 
|---|---|
| close | 关闭与数据库的连接,并释放服务器上所有关联的资源。 对此连接的任何其他调用都将引发 Error。无参数。 没有返回值。 | 
| cursor | 返回一个新的 Cursor 对象 ,该对象允许遍历数据库中的记录。 无参数。 | 
 
              Cursor 类
表示一种遍历数据记录的机制。
若要创建 Cursor 对象,请调用 cursorConnection'类的方法。
特性
选定的 Cursor 属性包括:
| Attribute | Description | 
|---|---|
| arraysize | 与 fetchmany方法一起使用,指定内部缓冲区大小,该大小也是一次性从服务器实际提取的行数。 默认值为10000。 对于窄结果(在结果中每行未包含大量数据),应增大该值以提高性能。 读写访问。 | 
| description | 包含 list对象的 Pythontuple。 每个tuple对象都包含 7 个值,并且每个tuple对象的前 2 个项目包含如下所示的描述单个结果列的信息:
 | 
方法
选定的 Cursor 方法包括:
| 方法 | Description | 
|---|---|
| cancel | 中断运行游标启动的任何数据库查询或命令。 若要释放服务器上的关联资源,请在调用 close方法后调用cancel方法。无参数。 没有返回值。 | 
| close | 关闭游标并释放服务器上关联的资源。 关闭已关闭的游标可能会引发错误。 无参数。 没有返回值。 | 
| execute | 准备并运行数据库查询或命令。 参数: 
 没有返回值。 | 
| executemany | 使用 seq_of_parameters实参中的所有形参序列来准备并运行数据库查询或命令。 仅保留最终结果集。参数: 
 没有返回值。 | 
| catalogs | 执行有关目录的元数据查询。 然后应使用 fetchmany或fetchall获取实际结果。结果集中的重要字段包括: 
 无参数。 没有返回值。 从版本 1.0 开始 | 
| schemas | 执行有关架构的元数据查询。 然后应使用 fetchmany或fetchall获取实际结果。结果集中的重要字段包括: 
 参数: 
 没有返回值。 从版本 1.0 开始 | 
| tables | 执行有关表和视图的元数据查询。 然后应使用 fetchmany或fetchall获取实际结果。结果集中的重要字段包括: 
 参数: 
 没有返回值。 从版本 1.0 开始 | 
| columns | 执行有关列的元数据查询。 然后应使用 fetchmany或fetchall获取实际结果。结果集中的重要字段包括: 
 参数: 
 没有返回值。 从版本 1.0 开始 | 
| fetchall | 获取查询的所有(或所有剩余)行。 无参数。 返回查询的所有(或所有剩余)行,作为 Python listRow对象的集合。如果之前对 Error方法的调用未返回任何数据或尚未进行execute调用,则会引发execute。 | 
| fetchmany | 获取查询的后续行。 参数: 
 以 size对象的 Pythonarraysize的形式返回查询后续行中size行及以前的行(如果没有指定list,则返回Row属性)。如果要提取的行数少于 size,将返回所有剩余的行。如果之前对 Error方法的调用未返回任何数据或尚未进行execute调用,则会引发execute。 | 
| fetchone | 获取数据集的下一行。 无参数。 如果有可用数据,则将数据集的下一行作为一个 Python tuple对象返回;如果没有更多可用数据,则返回None。如果之前对 Error方法的调用未返回任何数据或尚未进行execute调用,则会引发execute。 | 
| fetchall_arrow | 以 PyArrow Table对象的形式获取查询的所有(或所有剩余)行。 返回大量数据的查询应改用fetchmany_arrow,以减少内存消耗。无参数。 以 PyArrow 表的形式返回查询的所有(或所有剩余)行。 如果之前对 Error方法的调用未返回任何数据或尚未进行execute调用,则会引发execute。从版本 2.0 开始 | 
| fetchmany_arrow | 以 PyArrow Table对象的形式获取查询后续行。参数: 
 size以 Python PyArrowarraysize对象的形式返回查询的下一行的参数(或size属性(如果未Table指定)。如果之前对 Error方法的调用未返回任何数据或尚未进行execute调用,则会引发execute。从版本 2.0 开始 | 
              Row 类
行类是类似于元组的数据结构,表示 SQL 查询结果中的单个结果行。
如果行包含名为 "my_column" 的列,则你可以通过 "my_column" 访问 row 的 row.my_column 字段。 还可以使用数字索引来访问字段,例如 row[0]。
如果不允许将列名称用作属性方法名称(例如,它以数字开头),则可以将字段作为 row["1_my_column"] 来访问。
从版本 1.0 开始
选定的 Row 方法包括:
方法
| 方法 | Description | 
|---|---|
| asDict | 返回行的字典表示形式,此值按字段名称编制索引。 如果存在重复的字段名称,则在字典中返回一个重复字段(但只有一个)。 未定义返回哪个重复字段。 | 
类型转换
下表将 Apache Spark SQL 数据类型映射到其 Python 数据类型等效项。
| Apache Spark SQL 数据类型 | Python 数据类型 | 
|---|---|
| array | numpy.ndarray | 
| bigint | int | 
| binary | bytearray | 
| boolean | bool | 
| date | datetime.date | 
| decimal | decimal.Decimal | 
| double | float | 
| int | int | 
| map | str | 
| null | NoneType | 
| smallint | int | 
| string | str | 
| struct | str | 
| timestamp | datetime.datetime | 
| tinyint | int | 
疑难解答
              tokenAuthWrapperInvalidAccessToken: Invalid access token 消息
问题:运行代码时看到类似于 的消息。
可能的原因:传递给 的值不是有效的 Azure Databricks 个人访问令牌。
建议的解决方法:检查传递给 的值是否正确,然后重试。
              gaierror(8, 'nodename nor servname provided, or not known') 消息
问题:运行代码时看到类似于 的消息。
可能的原因:传递给 的值不是正确的主机名。
建议的解决方法:检查传递给 的值是否正确,然后重试。
有关查找服务器主机名的详细信息,请参阅获取 Azure Databricks 计算资源的连接详细信息。
              IpAclError 消息
问题:在运行代码期间,尝试使用 Azure Databricks 笔记本上的连接器时看到消息 。
可能的原因:可能为 Azure Databricks 工作区启用了 IP 允许列表。 使用 IP 允许列表时,默认不允许从 Spark 群集连接回到控制平面。
建议的解决方法:请管理员将计算平面子网添加到 IP 允许列表。
其他资源
有关详细信息,请参阅: