Unity 目录中的 Python 用户定义的表函数 (UDF)

重要

在 Unity 目录中注册 Python UDF 以 公共预览版提供

Unity 目录用户定义的表函数(UDTF)允许注册返回完整表的函数,而不是标量值。 与从每个调用返回单个结果值的标量函数不同,UDDF 在 SQL 语句的 FROM 子句中调用,并可以返回多个行和列。

UDF 特别适用于:

  • 将数组或复杂数据结构转换为多行
  • 将外部 API 或服务集成到 SQL 工作流中
  • 实现自定义数据生成或扩充逻辑
  • 处理需要跨行执行有状态作的数据

每个 UDTF 调用都可以接受零个或多个参数。 这些参数可以是表示整个输入表的标量表达式或表参数。

可以通过两种方式注册 UDF:

要求

以下计算类型支持 Unity 目录 Python UDF:

  • 具有标准访问模式的经典计算(Databricks Runtime 17.1 及更高版本)
  • SQL 仓库(专业和经典)

在 Unity 目录中创建 UDTF

使用 SQL DDL 在 Unity 目录中创建受治理的 UDTF。 UDF 是使用 SQL 语句的子句调用的 FROM

CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
    """
    Basic UDTF that computes a sequence of integers
    and includes the square of each number in the range.
    """
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)
$$;

SELECT * FROM square_numbers(1, 5);

+-----+---------+
| num | squared |
+-----+---------+
| 1   | 1       |
| 2   | 4       |
| 3   | 9       |
| 4   | 16      |
| 5   | 25      |
+-----+---------+

Databricks 使用生成输出行的必需 eval 方法将 Python UDDF 实现为 Python 类。

实例

以下示例演示了 Unity 目录 Python UDDF 的实际用例,从简单的数据转换到复杂的外部集成。

示例:重新实现 explode

虽然 Spark 提供内置 explode 函数,但创建自己的版本演示了采用单个输入并生成多个输出行的基本 UDTF 模式。

CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
    def eval(self, arr):
        if arr is None:
            return
        for element in arr:
            yield (element,)
$$;

直接在 SQL 查询中使用函数:

SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
|| element |
+---------+
|| apple   |
|| banana  |
|| cherry  |
+---------+

或者,将其应用于具有 LATERAL 联接的现有表数据:

SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;

示例:通过 REST API 的 IP 地址地理位置

此示例演示 UDDF 如何将外部 API 直接集成到 SQL 工作流中。 分析人员可以使用熟悉的 SQL 语法通过实时 API 调用来丰富数据,而不是需要单独的 ETL 进程。

CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
    def eval(self, ip_address):
        import requests
        api_url = f"https://api.ip-lookup.example.com/{ip_address}"
        try:
            response = requests.get(api_url)
            response.raise_for_status()
            data = response.json()
            yield (data.get('city'), data.get('country'))
        except requests.exceptions.RequestException as e:
            # Returns nothing if the ip_address is invalid
            return
$$;

注释

Python UDF 允许通过端口 80、443 和 53 通过端口 80、443 和 53 进行 TCP/UDP 网络流量,并使用配置了标准访问模式的计算。

使用函数通过地理信息丰富 Web 日志数据:

SELECT
  l.timestamp,
  l.request_path,
  geo.city,
  geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;

此方法支持实时地理分析,而无需预先处理的查阅表或单独的数据管道。 UDTF 处理 HTTP 请求、JSON 分析和错误处理,使外部数据源可通过标准 SQL 查询访问。

如果您的函数产生一致的结果,请设置DETERMINISTIC

如果您的函数在相同输入下生成相同输出,请将 DETERMINISTIC 添加到函数定义中。 这允许查询优化来提高性能。

默认情况下,除非明确声明,否则假定 Batch Unity Catalog 的 Python UDTF 为非确定性的。 非确定性函数的示例包括:生成随机值、访问当前时间或日期或进行外部 API 调用。

请参阅 CREATE FUNCTION (SQL 和 Python)

局限性

以下限制适用于 Unity 目录 Python UDDF:

后续步骤