在 PySpark 与 Pandas 数据帧之间进行转换

了解如何在 Azure Databricks 中使用 Apache Arrow 在 Apache Spark 数据帧与 Pandas 数据帧之间进行转换。

Apache Arrow 和 PyArrow

Apache Arrow 是一种内存中纵栏式数据格式,在 Apache Spark 中用于在 JVM 和 Python 进程之间高效传输数据。 这对于处理 Pandas 和 NumPy 数据的 Python 开发人员非常有利。 但是,它的使用需要一些细微的配置或代码更改,以确保兼容性并获得最大的好处。

PyArrow 是 Apache Arrow 的 Python 绑定,安装在 Databricks Runtime 中。 有关每个 Databricks Runtime 版本中可用的 PyArrow 版本的信息,请参阅 Databricks Runtime 发行说明版本和兼容性

支持的 SQL 类型

基于 Arrow 的转换支持除 TimestampTypeArrayType 外的所有 Spark SQL 数据类型。 仅当使用 PyArrow 2.0.0.0 及更高版本时,才支持 MapType 和嵌套的 StructTypeArrayTypeStructType 表示为 pandas.DataFrame 而不是 pandas.Series

将 PySpark 数据帧与 Pandas 数据帧相互转换

使用 toPandas() 将 PySpark 数据帧转换为 Pandas 数据帧时,以及使用 createDataFrame(pandas_df) 从 Pandas 数据帧创建 PySpark 数据帧时,可使用 Arrow 进行优化。

若要将 Arrow 用于这些方法,请将 Spark 配置 spark.sql.execution.arrow.pyspark.enabled 设置为 true。 默认情况下启用此配置,但已启用 Unity Catalog 的工作区中的高并发群集和用户隔离群集除外。

此外,如果在 Spark 内进行计算之前发生错误,则 spark.sql.execution.arrow.pyspark.enabled 启用的优化可能会回退到非 Arrow 实现。 可以使用 Spark 配置 spark.sql.execution.arrow.pyspark.fallback.enabled 来控制此行为。

示例

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

使用 Arrow 优化生成的结果与未启用 Arrow 时的结果相同。 即使使用 Arrow,toPandas() 也会将数据帧中的所有记录收集到驱动程序中,因此应该对数据的一小部分执行此操作。

另外,并非所有 Spark 数据类型都受支持。如果列的类型不受支持,则可能会引发错误。 如果在 createDataFrame() 期间发生错误,Spark 会在不使用 Arrow 的情况下创建数据帧。