Optimize conversion between PySpark and pandas DataFrames

Apache Arrow is an in-memory columnar data format used in Apache Spark to efficiently transfer data between JVM and Python processes. This is beneficial to Python developers who work with pandas and NumPy data. However, its usage is not automatic and requires some minor changes to configuration or code to take full advantage and ensure compatibility.

PyArrow versions

PyArrow is installed in Databricks Runtime. For information on the version of PyArrow available in each Databricks Runtime version, see the Databricks Runtime release notes.
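You can confirm the installed version from a notebook; this minimal check assumes only that the pyarrow package is importable:

import pyarrow

# Print the PyArrow version bundled with the current Databricks Runtime
print(pyarrow.__version__)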

Supported SQL types

All Spark SQL data types are supported by Arrow-based conversion except MapType, ArrayType of TimestampType, and nested StructType. StructType is represented as a pandas.DataFrame instead of pandas.Series. BinaryType is supported only when PyArrow is equal to or higher than 0.10.0.
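As an illustrative sketch (the schema and column names here are hypothetical), a DataFrame with a MapType column cannot be converted through Arrow; depending on the fallback configuration described below, toPandas() either falls back to the non-Arrow path or raises an error:

from pyspark.sql.types import MapType, StringType, IntegerType, StructType, StructField

# MapType is one of the types not supported by Arrow-based conversion
schema = StructType([
    StructField("id", IntegerType()),
    StructField("attributes", MapType(StringType(), StringType())),
])
df = spark.createDataFrame([(1, {"color": "red"})], schema)

# This conversion cannot use Arrow because of the MapType column
pdf = df.toPandas()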

Convert PySpark DataFrames to and from pandas DataFrames

Arrow is available as an optimization when converting a PySpark DataFrame to a pandas DataFrame with toPandas() and when creating a PySpark DataFrame from a pandas DataFrame with createDataFrame(pandas_df). To use Arrow for these methods, set the Spark configuration spark.sql.execution.arrow.enabled to true. This configuration is disabled by default.

In addition, optimizations enabled by spark.sql.execution.arrow.enabled could fall back to a non-Arrow implementation if an error occurs before the computation within Spark. You can control this behavior using the Spark configuration spark.sql.execution.arrow.fallback.enabled.
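For example, to surface errors instead of silently running without Arrow, you might disable the fallback explicitly (a minimal sketch using the two configurations named above):

# Enable Arrow-based transfers but fail fast if Arrow cannot be used,
# rather than silently falling back to the non-Arrow implementation
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "false")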

Example

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Generate a pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

Using the Arrow optimizations produces the same results as when Arrow is not enabled. Even with Arrow, toPandas() results in the collection of all records in the DataFrame to the driver program and should be done on a small subset of the data.
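For instance, one common pattern (assuming df is the DataFrame from the example above) is to limit the rows before collecting:

# Collect only a small sample to the driver instead of the full DataFrame
sample_pdf = df.limit(1000).toPandas()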

In addition, not all Spark data types are supported, and an error can be raised if a column has an unsupported type. If an error occurs during createDataFrame(), Spark falls back to creating the DataFrame without Arrow.