重要
在 Unity 目录中注册 Python UDF 以 公共预览版提供。
Unity 目录用户定义的表函数(UDTF)允许注册返回完整表的函数,而不是标量值。 与从每个调用返回单个结果值的标量函数不同,UDDF 在 SQL 语句的 FROM
子句中调用,并可以返回多个行和列。
UDF 特别适用于:
- 将数组或复杂数据结构转换为多行
- 将外部 API 或服务集成到 SQL 工作流中
- 实现自定义数据生成或扩充逻辑
- 处理需要跨行执行有状态作的数据
每个 UDTF 调用都可以接受零个或多个参数。 这些参数可以是表示整个输入表的标量表达式或表参数。
可以通过两种方式注册 UDF:
- Unity 目录:在 Unity 目录中将 UDTF 注册为受治理对象。
- 会话范围:注册到本地
SparkSession
,与当前笔记本或作业隔离。 请参阅 Python 用户定义的表函数(UDF)。
要求
以下计算类型支持 Unity 目录 Python UDF:
- 具有标准访问模式的经典计算(Databricks Runtime 17.1 及更高版本)
- SQL 仓库(专业和经典)
在 Unity 目录中创建 UDTF
使用 SQL DDL 在 Unity 目录中创建受治理的 UDTF。 UDF 是使用 SQL 语句的子句调用的 FROM
。
CREATE OR REPLACE FUNCTION square_numbers(start INT, end INT)
RETURNS TABLE (num INT, squared INT)
LANGUAGE PYTHON
HANDLER 'SquareNumbers'
DETERMINISTIC
AS $$
class SquareNumbers:
"""
Basic UDTF that computes a sequence of integers
and includes the square of each number in the range.
"""
def eval(self, start: int, end: int):
for num in range(start, end + 1):
yield (num, num * num)
$$;
SELECT * FROM square_numbers(1, 5);
+-----+---------+
| num | squared |
+-----+---------+
| 1 | 1 |
| 2 | 4 |
| 3 | 9 |
| 4 | 16 |
| 5 | 25 |
+-----+---------+
Databricks 使用生成输出行的必需 eval
方法将 Python UDDF 实现为 Python 类。
实例
以下示例演示了 Unity 目录 Python UDDF 的实际用例,从简单的数据转换到复杂的外部集成。
示例:重新实现 explode
虽然 Spark 提供内置 explode
函数,但创建自己的版本演示了采用单个输入并生成多个输出行的基本 UDTF 模式。
CREATE OR REPLACE FUNCTION my_explode(arr ARRAY<STRING>)
RETURNS TABLE (element STRING)
LANGUAGE PYTHON
HANDLER 'MyExplode'
DETERMINISTIC
AS $$
class MyExplode:
def eval(self, arr):
if arr is None:
return
for element in arr:
yield (element,)
$$;
直接在 SQL 查询中使用函数:
SELECT element FROM my_explode(array('apple', 'banana', 'cherry'));
+---------+
|| element |
+---------+
|| apple |
|| banana |
|| cherry |
+---------+
或者,将其应用于具有 LATERAL
联接的现有表数据:
SELECT s.*, e.element
FROM my_items AS s,
LATERAL my_explode(s.items) AS e;
示例:通过 REST API 的 IP 地址地理位置
此示例演示 UDDF 如何将外部 API 直接集成到 SQL 工作流中。 分析人员可以使用熟悉的 SQL 语法通过实时 API 调用来丰富数据,而不是需要单独的 ETL 进程。
CREATE OR REPLACE FUNCTION ip_to_location(ip_address STRING)
RETURNS TABLE (city STRING, country STRING)
LANGUAGE PYTHON
HANDLER 'IPToLocationAPI'
AS $$
class IPToLocationAPI:
def eval(self, ip_address):
import requests
api_url = f"https://api.ip-lookup.example.com/{ip_address}"
try:
response = requests.get(api_url)
response.raise_for_status()
data = response.json()
yield (data.get('city'), data.get('country'))
except requests.exceptions.RequestException as e:
# Returns nothing if the ip_address is invalid
return
$$;
注释
Python UDF 允许通过端口 80、443 和 53 通过端口 80、443 和 53 进行 TCP/UDP 网络流量,并使用配置了标准访问模式的计算。
使用函数通过地理信息丰富 Web 日志数据:
SELECT
l.timestamp,
l.request_path,
geo.city,
geo.country
FROM web_logs AS l,
LATERAL ip_to_location(l.ip_address) AS geo;
此方法支持实时地理分析,而无需预先处理的查阅表或单独的数据管道。 UDTF 处理 HTTP 请求、JSON 分析和错误处理,使外部数据源可通过标准 SQL 查询访问。
如果您的函数产生一致的结果,请设置DETERMINISTIC
如果您的函数在相同输入下生成相同输出,请将 DETERMINISTIC
添加到函数定义中。 这允许查询优化来提高性能。
默认情况下,除非明确声明,否则假定 Batch Unity Catalog 的 Python UDTF 为非确定性的。 非确定性函数的示例包括:生成随机值、访问当前时间或日期或进行外部 API 调用。
请参阅 CREATE FUNCTION (SQL 和 Python)
局限性
以下限制适用于 Unity 目录 Python UDDF:
- 不支持多态表函数。
- 不支持 TABLE 参数。
- 不支持 Unity 目录服务凭据。
- 不支持自定义依赖项。