pandas user-defined functions

A pandas user-defined function (UDF), also known as a vectorized UDF, is a user-defined function that uses Apache Arrow to transfer data and pandas to work with the data. pandas UDFs allow vectorized operations that can increase performance up to 100x compared to row-at-a-time Python UDFs.

For background information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0 and Optimize conversion between PySpark and pandas DataFrames.

You define a pandas UDF using the keyword pandas_udf as a decorator and wrap the function with Python type hints. This article describes the different types of pandas UDFs and shows how to use pandas UDFs with type hints.

Series to Series UDF

You use a Series to Series pandas UDF to vectorize scalar operations. You can use them with APIs such as select and withColumn.

The Python function should take a pandas Series as input and return a pandas Series of the same length, and you should specify these in the Python type hints. Spark runs a pandas UDF by splitting columns into batches, calling the function for each batch as a subset of the data, and then concatenating the results.

The following example shows how to create a pandas UDF that computes the product of two columns.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

Iterator of Series to Iterator of Series UDF

An iterator UDF is the same as a scalar pandas UDF except that:

  • The Python function:
    • Takes an iterator of batches instead of a single input batch as input.
    • Returns an iterator of output batches instead of a single output batch.
  • The length of the entire output in the iterator should be the same as the length of the entire input.
  • The wrapped pandas UDF takes a single Spark column as an input.

You should specify the Python type hint as Iterator[pandas.Series] -> Iterator[pandas.Series].

This pandas UDF is useful when the UDF execution requires initializing some state, for example, loading a machine learning model file to apply inference to every input batch.

The following example shows how to create a pandas UDF with iterator support.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

Iterator of multiple Series to Iterator of Series UDF

序列迭代器到序列迭代器 UDF 相比,多序列迭代器到序列迭代器 UDF 具有类似的特性和限制。An Iterator of multiple Series to Iterator of Series UDF has similar characteristics and restrictions as Iterator of Series to Iterator of Series UDF. 指定的函数接受批迭代器并输出批迭代器。The specified function takes an iterator of batches and outputs an iterator of batches. 当 UDF 执行需要初始化某个状态时,它也很有用。It is also useful when the UDF execution requires initializing some state.

The differences are:

  • The underlying Python function takes an iterator of a tuple of pandas Series.
  • The wrapped pandas UDF takes multiple Spark columns as input.

You specify the type hints as Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

Series to scalar UDF

Series to scalar pandas UDFs are similar to Spark aggregate functions. A Series to scalar pandas UDF defines an aggregation from one or more pandas Series to a scalar value, where each pandas Series represents a Spark column. You use a Series to scalar pandas UDF with APIs such as select, withColumn, groupBy.agg, and pyspark.sql.Window.

You express the type hint as pandas.Series, ... -> Any. The return type should be a primitive data type, and the returned scalar can be either a Python primitive type, for example, int or float, or a NumPy data type such as numpy.int64 or numpy.float64. Ideally, Any should be a specific scalar type.

This type of UDF does not support partial aggregation, and all data for each group is loaded into memory.

The following example shows how to use this type of UDF to compute the mean with select, groupBy, and window operations:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()

# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

For detailed usage, see pyspark.sql.functions.pandas_udf.


Setting the Arrow batch size

Data partitions in Spark are converted into Arrow record batches, which can temporarily lead to high memory usage in the JVM. To avoid possible out-of-memory exceptions, you can adjust the size of the Arrow record batches by setting the spark.sql.execution.arrow.maxRecordsPerBatch configuration to an integer that determines the maximum number of rows for each batch. The default value is 10,000 records per batch. If the number of columns is large, the value should be adjusted accordingly. Using this limit, each data partition is divided into one or more record batches for processing.
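As a rough sketch of the batching arithmetic (the 45,000-row partition here is an arbitrary illustrative figure, not from the docs):

```python
import math

# With the default spark.sql.execution.arrow.maxRecordsPerBatch of 10,000,
# a hypothetical partition of 45,000 rows yields 5 Arrow record batches.
max_records_per_batch = 10_000
partition_rows = 45_000
num_batches = math.ceil(partition_rows / max_records_per_batch)
print(num_batches)  # 5

# To lower memory pressure for wide rows, shrink the batch size
# (assumes an existing SparkSession named `spark`, as in the examples above):
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
```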

Timestamp with time zone semantics

Spark internally stores timestamps as UTC values, and timestamp data brought in without a specified time zone is converted as local time to UTC with microsecond resolution.

When timestamp data is exported or displayed in Spark, the session time zone is used to localize the timestamp values. The session time zone is set with the spark.sql.session.timeZone configuration and defaults to the JVM system local time zone. pandas uses a datetime64 type with nanosecond resolution, datetime64[ns], with an optional time zone on a per-column basis.

When timestamp data is transferred from Spark to pandas, it is converted to nanoseconds, and each column is converted to the Spark session time zone and then localized to that time zone, which removes the time zone and displays values as local time. This occurs when calling toPandas() or pandas_udf with timestamp columns.
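This conversion can be mimicked with pandas alone; the session time zone below is a hypothetical stand-in for spark.sql.session.timeZone, chosen only for illustration:

```python
import pandas as pd

# Hypothetical session time zone, standing in for spark.sql.session.timeZone
session_tz = "America/Los_Angeles"

# UTC instants, as Spark stores timestamps internally
utc = pd.Series(pd.to_datetime(["2021-01-01 12:00:00"], utc=True))

# Mirror the transfer to pandas: convert to the session time zone, then
# drop the zone so the values read as local wall-clock time
local_naive = utc.dt.tz_convert(session_tz).dt.tz_localize(None)
print(local_naive[0])  # 2021-01-01 04:00:00
```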

When timestamp data is transferred from pandas to Spark, it is converted to UTC microseconds. This occurs when calling createDataFrame with a pandas DataFrame or when returning a timestamp from a pandas UDF. These conversions are done automatically to ensure Spark has data in the expected format, so it is not necessary to do any of these conversions yourself. Any nanosecond values are truncated.
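The effect of the microsecond truncation can be seen with pandas alone; this is a sketch of the behavior, not Spark's actual code path:

```python
import pandas as pd

# A nanosecond-resolution pandas timestamp
ts = pd.Timestamp("2021-01-01 00:00:00.123456789")

# Spark keeps microsecond resolution, so the trailing nanoseconds are dropped
truncated = ts.floor("us")
print(truncated)  # 2021-01-01 00:00:00.123456
```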

A standard UDF loads timestamp data as Python datetime objects, which is different from a pandas timestamp. To get the best performance, we recommend that you use pandas time series functionality when working with timestamps in a pandas UDF. For details, see Time Series / Date functionality.
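As an illustration of this recommendation, the following sketch uses pandas' vectorized .dt accessor instead of looping over Python datetime objects; the function name and data are made up for this example, and the same body could be wrapped with pandas_udf:

```python
import pandas as pd

# Vectorized: pandas time series functionality via the .dt accessor,
# rather than a Python loop over datetime objects
def day_name(ts: pd.Series) -> pd.Series:
    return ts.dt.day_name()

s = pd.Series(pd.to_datetime(["2021-01-01", "2021-01-02"]))
print(day_name(s).tolist())  # ['Friday', 'Saturday']
```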

Example notebook

The following notebook illustrates the performance improvements you can achieve with pandas UDFs:

pandas UDFs benchmark notebook

Get notebook