创建 PyArrow 本机用户定义表函数 (UDTF)。 此函数为 UDDF 提供 PyArrow 本机接口,其中 eval 方法接收 PyArrow RecordBatches 或数组,并返回 PyArrow 表或 RecordBatches 的迭代器。 这样就可以实现真正的矢量化计算,而无需逐行处理开销。
Syntax
from pyspark.databricks.sql import functions as dbf
@dbf.arrow_udtf(returnType=<returnType>)
class MyUDTF:
def eval(self, ...):
...
参数
| 参数 | 类型 | Description |
|---|---|---|
cls |
class自选 |
Python 用户定义的表函数处理程序类。 |
returnType |
pyspark.sql.types.StructType 或 str(可选) |
用户定义的表函数的返回类型。 该值可以是 StructType 对象,也可以是 DDL 格式的结构类型字符串。 |
例子
带有 PyArrow RecordBatch 输入的 UDTF:
import pyarrow as pa
from pyspark.databricks.sql.functions import arrow_udtf
@arrow_udtf(returnType="x int, y int")
class MyUDTF:
def eval(self, batch: pa.RecordBatch):
# Process the entire batch vectorized
x_array = batch.column('x')
y_array = batch.column('y')
result_table = pa.table({
'x': x_array,
'y': y_array
})
yield result_table
df = spark.range(10).selectExpr("id as x", "id as y")
MyUDTF(df.asTable()).show()
带有 PyArrow 数组输入的 UDTF:
@arrow_udtf(returnType="x int, y int")
class MyUDTF2:
def eval(self, x: pa.Array, y: pa.Array):
# Process arrays vectorized
result_table = pa.table({
'x': x,
'y': y
})
yield result_table
MyUDTF2(lit(1), lit(2)).show()
注释
- eval 方法必须接受 PyArrow RecordBatches 或数组作为输入
- eval 方法必须生成 PyArrow 表或 RecordBatches 作为输出