Compartir a través de

pandas_udf

创建 pandas 用户定义的函数。

Pandas UDF 是 Spark 使用 Arrow 传输数据和 Pandas 处理数据(允许 pandas作)执行的用户定义的函数。 Pandas UDF 使用 pandas_udf 修饰器或包装函数进行定义,无需其他配置。 Pandas UDF 通常充当常规 PySpark 函数 API。

Syntax

import pyspark.sql.functions as sf

# As a decorator
@sf.pandas_udf(returnType=<returnType>, functionType=<functionType>)
def function_name(col):
    # function body
    pass

# As a function wrapper
sf.pandas_udf(f=<function>, returnType=<returnType>, functionType=<functionType>)

参数

参数 类型 Description
f function 可选。 用户定义的函数。 用作独立函数的 python 函数。
returnType pyspark.sql.types.DataTypestr 可选。 用户定义的函数的返回类型。 该值可以是 DataType 对象或 DDL 格式的类型字符串。
functionType int 可选。 PandasUDFType 中的枚举值。 默认值:SCALAR。 此参数存在是为了兼容性。 建议使用 Python 类型提示。

例子

示例 1:序列到序列 - 将字符串转换为大写。

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
+--------------+
|to_upper(name)|
+--------------+
|      JOHN DOE|
+--------------+

示例 2:带关键字参数的序列到序列。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as sf

@pandas_udf(returnType=IntegerType())
def calc(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + 10 * b

spark.range(2).select(calc(b=sf.col("id") * 10, a=sf.col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
|                            0|
|                          101|
+-----------------------------+

示例 3:系列迭代器到序列迭代器。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf

@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for s in iterator:
        yield s + 1

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
df.select(plus_one(df.v)).show()
+-----------+
|plus_one(v)|
+-----------+
|          2|
|          3|
|          4|
+-----------+

示例 4:系列到标量 - 分组聚合。

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df['v'])).show()
+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+

示例 5:具有窗口函数的系列到标量。

import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()
+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.0|
|  1| 2.0|   1.5|
|  2| 3.0|   3.0|
|  2| 5.0|   4.0|
|  2|10.0|   7.5|
+---+----+------+

示例 6:序列迭代器到标量 - 内存高效的分组聚合。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf

@pandas_udf("double")
def pandas_mean_iter(it: Iterator[pd.Series]) -> float:
    sum_val = 0.0
    cnt = 0
    for v in it:
        sum_val += v.sum()
        cnt += len(v)
    return sum_val / cnt

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(pandas_mean_iter(df['v'])).show()
+---+-------------------+
| id|pandas_mean_iter(v)|
+---+-------------------+
|  1|                1.5|
|  2|                6.0|
+---+-------------------+