Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
创建 pandas 用户定义的函数。
Pandas UDF 是 Spark 使用 Arrow 传输数据和 Pandas 处理数据(允许 pandas作)执行的用户定义的函数。 Pandas UDF 使用 pandas_udf 修饰器或包装函数进行定义,无需其他配置。 Pandas UDF 通常充当常规 PySpark 函数 API。
Syntax
import pyspark.sql.functions as sf
# As a decorator
@sf.pandas_udf(returnType=<returnType>, functionType=<functionType>)
def function_name(col):
# function body
pass
# As a function wrapper
sf.pandas_udf(f=<function>, returnType=<returnType>, functionType=<functionType>)
参数
| 参数 | 类型 | Description |
|---|---|---|
f |
function |
可选。 用户定义的函数。 用作独立函数的 python 函数。 |
returnType |
pyspark.sql.types.DataType 或 str |
可选。 用户定义的函数的返回类型。 该值可以是 DataType 对象或 DDL 格式的类型字符串。 |
functionType |
int |
可选。 PandasUDFType 中的枚举值。 默认值:SCALAR。 此参数存在是为了兼容性。 建议使用 Python 类型提示。 |
例子
示例 1:序列到序列 - 将字符串转换为大写。
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
return s.str.upper()
df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
+--------------+
|to_upper(name)|
+--------------+
| JOHN DOE|
+--------------+
示例 2:带关键字参数的序列到序列。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as sf
@pandas_udf(returnType=IntegerType())
def calc(a: pd.Series, b: pd.Series) -> pd.Series:
return a + 10 * b
spark.range(2).select(calc(b=sf.col("id") * 10, a=sf.col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
| 0|
| 101|
+-----------------------------+
示例 3:系列迭代器到序列迭代器。
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf
@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for s in iterator:
yield s + 1
df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
df.select(plus_one(df.v)).show()
+-----------+
|plus_one(v)|
+-----------+
| 2|
| 3|
| 4|
+-----------+
示例 4:系列到标量 - 分组聚合。
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df['v'])).show()
+---+-----------+
| id|mean_udf(v)|
+---+-----------+
| 1| 1.5|
| 2| 6.0|
+---+-----------+
示例 5:具有窗口函数的系列到标量。
import pandas as pd
from pyspark.sql import Window
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
w = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)
df.withColumn('mean_v', mean_udf("v").over(w)).show()
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.0|
| 1| 2.0| 1.5|
| 2| 3.0| 3.0|
| 2| 5.0| 4.0|
| 2|10.0| 7.5|
+---+----+------+
示例 6:序列迭代器到标量 - 内存高效的分组聚合。
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import pandas_udf
@pandas_udf("double")
def pandas_mean_iter(it: Iterator[pd.Series]) -> float:
sum_val = 0.0
cnt = 0
for v in it:
sum_val += v.sum()
cnt += len(v)
return sum_val / cnt
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(pandas_mean_iter(df['v'])).show()
+---+-------------------+
| id|pandas_mean_iter(v)|
+---+-------------------+
| 1| 1.5|
| 2| 6.0|
+---+-------------------+