Unity 目录中的 Python 用户定义的表函数 (UDF)

重要

在 Unity 目录中注册 Python UDF 以 公共预览版提供

Unity 目录用户定义的表函数(UDTF)允许注册返回完整表的函数,而不是标量值。 与从每个调用返回单个结果值的标量函数不同,UDDF 在 SQL 语句的 FROM 子句中调用,并可以返回多个行和列。

UDF 特别适用于:

  • 将数组或复杂数据结构转换为多行
  • 将外部 API 或服务集成到 SQL 工作流中
  • 实现自定义数据生成或扩充逻辑
  • 处理需要跨行执行有状态作的数据

每个 UDTF 调用都可以接受零个或多个参数。 这些参数可以是表示整个输入表的标量表达式或表参数。

可以通过两种方式注册 UDF:

要求

以下计算类型支持 Unity 目录 Python UDF:

  • 具有标准访问模式的经典计算(Databricks Runtime 17.1 及更高版本)
  • SQL 仓库(专业和经典)

在 Unity 目录中创建 UDTF

使用 SQL DDL 在 Unity 目录中创建受治理的 UDTF。 UDF 是使用 SQL 语句的子句调用的 FROM

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Databricks 使用生成输出行的必需 eval 方法将 Python UDDF 实现为 Python 类。

环境隔离

注释

共享隔离环境需要 Databricks Runtime 17.2 及更高版本。 在早期版本中,所有 Unity 目录 Python UDDF 都以严格的隔离模式运行。

默认情况下,具有相同所有者和会话的 Unity 目录 Python UDF 可以共享隔离环境。 这通过减少需要启动的单独环境的数量来提高性能并减少内存使用量。

严格隔离

若要确保 UDTF 始终在其自己的完全隔离环境中运行,请添加 STRICT ISOLATION 特征子句。

大多数 UDF 不需要严格的隔离。 标准数据处理 UDF 受益于默认共享隔离环境,运行速度更快,内存消耗较低。

STRICT ISOLATION 特征子句添加到 UDDF,该子句:

  • 使用eval()exec()或类似函数以代码形式运行输入。
  • 将文件写入本地文件系统。
  • 修改全局变量或系统状态。
  • 访问或修改环境变量。

以下 UDTF 示例设置自定义环境变量,回读变量,并使用变量将一组数字相乘。 由于 UDTF 会改变进程环境,因此请在其中 STRICT ISOLATION运行它。 否则,它可能会泄漏或替代同一环境中其他 UDF/UDF 的环境变量,从而导致行为不正确。

CREATE OR REPLACE TEMPORARY FUNCTION multiply_numbers(factor STRING)
RETURNS TABLE (original INT, scaled INT)
LANGUAGE PYTHON
STRICT ISOLATION
HANDLER 'Multiplier'
AS $$
import os

class Multiplier:
    def eval(self, factor: str):
        # Save the factor as an environment variable
        os.environ["FACTOR"] = factor

        # Read it back and convert it to a number
        scale = int(os.getenv("FACTOR", "1"))

        # Multiply 0 through 4 by the factor
        for i in range(5):
            yield (i, i * scale)
$$;

SELECT * FROM multiply_numbers("3");

实例

以下示例演示了 Unity 目录 Python UDDF 的实际用例,从简单的数据转换到复杂的外部集成。

示例:重新实现 explode

虽然 Spark 提供内置 explode 函数,但创建自己的版本演示了采用单个输入并生成多个输出行的基本 UDTF 模式。

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

直接在 SQL 查询中使用函数:

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
|| element |
+---------+
|| apple   |
|| banana  |
|| cherry  |
+---------+

或者,将其应用于具有 LATERAL 联接的现有表数据:

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

示例:通过 REST API 的 IP 地址地理位置

此示例演示 UDDF 如何将外部 API 直接集成到 SQL 工作流中。 分析人员可以使用熟悉的 SQL 语法通过实时 API 调用来丰富数据,而不是需要单独的 ETL 进程。

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Return nothing if the API request fails
            return
$$;

注释

使用配置了标准访问模式的计算时,Python UDF 允许通过端口 80、443 和 53 的 TCP/UDP 网络流量。

使用函数通过地理信息丰富 Web 日志数据:

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

此方法支持实时地理分析,而无需预先处理的查阅表或单独的数据管道。 UDTF 处理 HTTP 请求、JSON 分析和错误处理,使外部数据源可通过标准 SQL 查询访问。

如果您的函数产生一致的结果,请设置DETERMINISTIC

如果您的函数在相同输入下生成相同输出,请将 DETERMINISTIC 添加到函数定义中。 这允许查询优化来提高性能。

默认情况下,除非明确声明,否则假定 Batch Unity Catalog 的 Python UDTF 为非确定性的。 非确定性函数的示例包括:生成随机值、访问当前时间或日期或进行外部 API 调用。

请参阅 CREATE FUNCTION (SQL 和 Python)

局限性

以下限制适用于 Unity 目录 Python UDDF:

后续步骤