Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
创建 PyArrow 本机用户定义表函数 (UDTF)。 此函数为 UDDF 提供 PyArrow 本机接口,其中 eval 方法接收 PyArrow RecordBatches 或数组,并返回 PyArrow 表或 RecordBatches 的迭代器。 这样就可以实现真正的矢量化计算,而无需逐行处理开销。
Syntax
from pyspark.databricks.sql import functions as dbf
@dbf.arrow_udtf(returnType=<returnType>)
class MyUDTF:
def eval(self, ...):
...
参数
| 参数 | 类型 | Description |
|---|---|---|
cls |
class自选 |
Python 用户定义的表函数处理程序类。 |
returnType |
pyspark.sql.types.StructType 或 str(可选) |
用户定义的表函数的返回类型。 该值可以是 StructType 对象,也可以是 DDL 格式的结构类型字符串。 |
例子
带有 PyArrow RecordBatch 输入的 UDTF:
import pyarrow as pa
from pyspark.databricks.sql.functions import arrow_udtf
@arrow_udtf(returnType="x int, y int")
class MyUDTF:
def eval(self, batch: pa.RecordBatch):
# Process the entire batch vectorized
x_array = batch.column('x')
y_array = batch.column('y')
result_table = pa.table({
'x': x_array,
'y': y_array
})
yield result_table
df = spark.range(10).selectExpr("id as x", "id as y")
MyUDTF(df.asTable()).show()
带有 PyArrow 数组输入的 UDTF:
@arrow_udtf(returnType="x int, y int")
class MyUDTF2:
def eval(self, x: pa.Array, y: pa.Array):
# Process arrays vectorized
result_table = pa.table({
'x': x,
'y': y
})
yield result_table
MyUDTF2(lit(1), lit(2)).show()
注释
- eval 方法必须接受 PyArrow RecordBatches 或数组作为输入
- eval 方法必须生成 PyArrow 表或 RecordBatches 作为输出