pandas function APIs

pandas function APIs enable you to directly apply a Python native function, which takes and outputs pandas instances, to a PySpark DataFrame. Similar to pandas user-defined functions, function APIs also use Apache Arrow to transfer data and pandas to work with the data; however, Python type hints are optional in pandas function APIs.

There are three types of pandas function APIs:

  • Grouped map
  • Map
  • Cogrouped map

pandas function APIs leverage the same internal logic that pandas UDF executions use. Therefore, they share the same characteristics with pandas UDFs, such as PyArrow, supported SQL types, and the configurations.

For more information, see the blog post New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0.

Grouped map

You transform your grouped data via groupBy().applyInPandas() to implement the “split-apply-combine” pattern. Split-apply-combine consists of three steps:

  • Split the data into groups by using DataFrame.groupBy.
  • Apply a function on each group. The input and output of the function are both pandas.DataFrame. The input data contains all the rows and columns for each group.
  • Combine the results into a new DataFrame.

To use groupBy().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each group
  • A StructType object or a string that defines the schema of the output DataFrame

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings (for example, integer indices). See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.
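A minimal sketch of both matching modes, assuming an active SparkSession named spark (the helper names by_name and by_position are hypothetical):

import pandas as pd

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0)], ("id", "v"))

# String column labels: matched to the schema fields by name.
def by_name(pdf):
    return pdf.assign(v2=pdf.v * 2)[["id", "v2"]]

df.groupby("id").applyInPandas(by_name, schema="id long, v2 double").show()

# Default integer column labels (0, 1): matched to the schema fields by position.
def by_position(pdf):
    return pd.DataFrame([[pdf.id.iloc[0], float(len(pdf))]])

df.groupby("id").applyInPandas(by_position, schema="id long, n double").show()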

All data for a group is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied on groups, and it is up to you to ensure that the grouped data fits into the available memory.
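For reference, the relevant setting is spark.sql.execution.arrow.maxRecordsPerBatch; a sketch of setting it (the value shown is arbitrary), bearing in mind the caveat above:

# Caps the rows per Arrow record batch for pandas conversions in general,
# but does not cap how many rows a single group contains, so skewed groups
# can still exhaust executor memory.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "10000")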

The following example shows how to use groupby().applyInPandas() to subtract the mean from each value in the group.

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
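The applied function can also be declared with two parameters, in which case the grouping key(s) are passed as a tuple in the first argument. A sketch reusing df from above (the helper name subtract_mean_with_key is hypothetical):

# The first argument is a tuple holding the grouping key(s) for the group.
def subtract_mean_with_key(key, pdf):
    return pdf.assign(id=key[0], v=pdf.v - pdf.v.mean())

df.groupby("id").applyInPandas(
    subtract_mean_with_key, schema="id long, v double").show()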

For detailed usage, see pyspark.sql.GroupedData.applyInPandas.

Map

You perform map operations with pandas instances via DataFrame.mapInPandas(), which transforms an iterator of pandas.DataFrame representing the current PySpark DataFrame into another iterator of pandas.DataFrame, and returns the result as a PySpark DataFrame.

The underlying function takes and outputs an iterator of pandas.DataFrame. It can return output of arbitrary length, in contrast to some pandas UDFs such as Series to Series pandas UDFs; the second example below illustrates this.

The following example shows how to use mapInPandas():

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
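Because the output length is not tied to the input, a function can also emit more rows than it receives. A sketch that duplicates each batch (the helper name duplicate_rows is hypothetical):

import pandas as pd

def duplicate_rows(iterator):
    for pdf in iterator:
        # Yield each batch twice; output length need not match the input.
        yield pd.concat([pdf, pdf])

df.mapInPandas(duplicate_rows, schema=df.schema).show()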

For detailed usage, see pyspark.sql.DataFrame.mapInPandas.

Cogrouped map

For cogrouped map operations with pandas instances, use DataFrame.groupby().cogroup().applyInPandas() to cogroup two PySpark DataFrames by a common key and then apply a Python function to each cogroup. It consists of the following steps:

  • Shuffle the data such that the groups of each DataFrame which share a key are cogrouped together.
  • Apply a function to each cogroup. The input of the function is two pandas.DataFrame (with an optional tuple representing the key). The output of the function is a pandas.DataFrame.
  • Combine the pandas.DataFrames from all groups into a new PySpark DataFrame.

To use groupBy().cogroup().applyInPandas(), you must define the following:

  • A Python function that defines the computation for each cogroup.
  • A StructType object or a string that defines the schema of the output PySpark DataFrame.

The column labels of the returned pandas.DataFrame must either match the field names in the defined output schema if specified as strings, or match the field data types by position if not strings (for example, integer indices). See pandas.DataFrame for how to label columns when constructing a pandas.DataFrame.

All data for a cogroup is loaded into memory before the function is applied. This can lead to out of memory exceptions, especially if the group sizes are skewed. The configuration for maxRecordsPerBatch is not applied, and it is up to you to ensure that the cogrouped data fits into the available memory.

The following example shows how to use groupby().cogroup().applyInPandas() to perform an asof join between two datasets.

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
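The applied function can also be declared with three parameters, in which case the grouping key(s) are passed as a tuple in the first argument. A sketch reusing df1 and df2 from above (the helper name asof_join_for_id1 and the filtering on id 1 are illustrative):

def asof_join_for_id1(key, l, r):
    # key is a tuple of the grouping key(s); join only the cogroup for id 1.
    if key == (1,):
        return pd.merge_asof(l, r, on="time", by="id")
    return pd.DataFrame(columns=["time", "id", "v1", "v2"])

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join_for_id1, schema="time int, id int, v1 double, v2 string").show()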

For detailed usage, see pyspark.sql.PandasCogroupedOps.applyInPandas.