pandas 函数 API
使用 pandas 函数 API,可以直接将 Python 原生函数(接受并输出 pandas 实例)应用于 PySpark 数据帧。 与 pandas 用户定义函数类似,函数 API 也使用 Apache Arrow 来传输数据,并使用 pandas 来处理数据;但是,Python 类型提示在 pandas 函数 API 中是可选的。
有三种类型的 pandas 函数 API:
- 分组的映射
- 映射
- 协同分组的映射
pandas 函数 API 利用 pandas UDF 执行所使用的内部逻辑。 它们具有相同的特征,例如 PyArrow、支持的 SQL 类型以及配置。
有关详细信息,请参阅博客文章:New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0(即将发布的 Apache Spark 3.0 中新增的 Pandas UDF 和 Python 类型提示)。
分组的映射
你可以通过 groupBy().applyInPandas()
转换已分组的数据,从而实现“拆分-应用-合并”模式。 “拆分-应用-合并”包括三个步骤:
- 使用
DataFrame.groupBy
将数据拆分成组。 - 对每个组应用函数。 函数的输入和输出都是
pandas.DataFrame
。 输入数据包含每个组的所有行和列。 - 将结果组合到一个新的
DataFrame
中。
若要使用 groupBy().applyInPandas()
,必须定义以下内容:
- 定义了每个组的计算的 Python 函数
- 一个
StructType
对象或字符串,用于定义输出DataFrame
的架构
返回的 pandas.DataFrame
的列标签必须与所定义的输出架构中的字段名称匹配(如果指定为字符串),而如果不是字符串,则必须按位置(例如,整数索引)与字段数据类型匹配。 请参阅 pandas.DataFrame 来了解在构造 pandas.DataFrame
时如何标记列。
在应用函数之前,组的所有数据都将加载到内存中。 这可能会导致内存不足异常,特别是当组大小扭曲时。 maxRecordsPerBatch 的配置不应用于组,你需要确保分组的数据适合可用内存。
下面的示例演示了如何使用 groupby().apply()
从组中的每个值中减去平均值。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
有关详细用法,请参阅 pyspark.sql.GroupedData.applyInPandas。
映射
通过 DataFrame.mapInPandas()
对 pandas 实例执行映射操作,以便将 pandas.DataFrame
的迭代器转换为 pandas.DataFrame
的另一个迭代器,该迭代器表示当前的 PySpark 数据帧,并以 PySpark 数据帧的形式返回结果。
基础函数接受并输出 pandas.DataFrame
的迭代器。 它可以返回任意长度的输出,这是相对于某些 pandas UDF(例如序列到序列)而言。
下面的示例演示如何使用 mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
有关详细用法,请参阅 pyspark.sql.DataFrame.mapInPandas。
协同分组的映射
对于 pandas 实例的协同分组映射操作,请对要通过一个公共键协同分组的两个 PySpark DataFrame
使用 DataFrame.groupby().cogroup().applyInPandas()
,然后将一个 Python 函数应用于每个协同组,如下所示:
- 对数据进行混排,使共享一个密钥的每个数据帧的组被协同分组在一起。
- 将一个函数应用于每个协同组。 函数的输入是两个
pandas.DataFrame
(包含一个表示密钥的可选元组)。 函数的输出是一个pandas.DataFrame
。 - 将所有组中的
pandas.DataFrame
组合成一个新的 PySparkDataFrame
。
若要使用 groupBy().cogroup().applyInPandas()
,必须定义以下内容:
- 一个 Python 函数,用于定义每个协同组的计算。
- 一个
StructType
对象或字符串,用于定义输出 PySparkDataFrame
的架构。
返回的 pandas.DataFrame
的列标签必须与所定义的输出架构中的字段名称匹配(如果指定为字符串),而如果不是字符串,则必须按位置(例如,整数索引)与字段数据类型匹配。 请参阅 pandas.DataFrame 来了解在构造 pandas.DataFrame
时如何标记列。
在应用函数之前,协同组的所有数据都将加载到内存中。 这可能会导致内存不足异常,特别是当组大小扭曲时。 maxRecordsPerBatch 的配置不会应用,你需要确保协同分组的数据适合可用内存。
下面的示例展示了如何使用 groupby().cogroup().applyInPandas()
在两个数据集之间执行 asof join
。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
有关详细用法,请参阅 pyspark.sql.PandasCogroupedOps.applyInPandas。