PySpark 基础知识

本文通过演练一些简单的示例来说明 PySpark 的用法。 它假设你了解基本的 Apache Spark 概念,并在连接到计算的 Azure Databricks 笔记本中运行命令。 你将使用示例数据创建 DataFrame,对这些数据执行基本转换(包括行和列操作),合并多个 DataFrame 并聚合这些数据,可视化这些数据,然后将其保存到表或文件中。

上传数据

本文中的一些示例使用 Databricks 提供的示例数据来演示如何使用 DataFrame 加载、转换和保存数据。 如果想要使用 Databricks 中尚不存在的你自己的数据,可以先将其上传,然后使用这些数据创建 DataFrame。 请参阅使用文件上传创建或修改表将文件上传到 Unity Catalog 卷

关于 Databricks 示例数据

Databricks 在 samples 目录和 /databricks-datasets 目录中提供了示例数据。

  • 若要访问 samples 目录中的示例数据,请使用格式 samples.<schema-name>.<table-name>。 本文使用 samples.tpch 架构中的表,其中包含一家虚构企业的数据。 customer 表包含客户的相关信息,orders 包含这些客户所下订单的相关信息。
  • 使用 dbutils.fs.ls 浏览 /databricks-datasets 中的数据。 使用 Spark SQL 或 DataFrame,通过文件路径来查询此位置中的数据。 若要详细了解 Databricks 提供的示例数据,请参阅示例数据集

导入数据类型

许多 PySpark 操作都要求使用 SQL 函数或与本机 Spark 类型交互。 可以仅直接导入所需的这些函数和类型,也可以导入整个模块。

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

由于某些导入的函数可能会替代 Python 内置函数,因此一些用户选择使用别名来导入这些模块。 以下示例展示了 Apache Spark 代码示例中常用的别名:

import pyspark.sql.types as T
import pyspark.sql.functions as F

有关数据类型的完整列表,请参阅 Spark 数据类型

有关 PySpark SQL 函数的完整列表,请参阅 Spark 函数

创建 DataFrame

可通过多种方法来创建 DataFrame。 通常,需要根据数据源(例如表或文件集合)来定义 DataFrame。 然后,如 Apache Spark 基本概念部分中所述,使用 display 等操作触发要执行的转换。 display 方法可输出 DataFrame。

创建包含指定值的 DataFrame

若要创建包含指定值的 DataFrame,请使用 createDataFrame 方法,其中行以元组列表的形式表示:

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

请注意,在输出中,df_children 列的数据类型是自动推断的。 也可以通过添加架构来指定类型。 架构是使用由 StructFields 组成的 StructType 定义的,其中指定了名称、数据类型和指示它们是否包含 null 值的布尔标志。 必须从 pyspark.sql.types 中导入数据类型。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

根据 Unity Catalog 中的表创建 DataFrame

若要根据 Unity Catalog 中的表创建 DataFrame,请使用 table 方法,并通过格式 <catalog-name>.<schema-name>.<table-name> 来标识表。 单击左侧导航栏上的“目录”,使用“目录资源管理器”导航到表。 单击它,然后选择“复制表路径”,将表路径插入笔记本中

以下示例会加载表 samples.tpch.customer,但你也可以提供自己的表的路径。

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

根据上传的文件创建 DataFrame

若要根据上传到 Unity Catalog 卷的文件创建 DataFrame,请使用 read 属性。 此方法会返回一个 DataFrameReader,然后可将其用于读取相应的格式。 单击左侧小边栏上的目录选项,并使用目录浏览器查找文件。 选择该文件,然后单击“复制卷文件路径”

下面的示例从 *.csv 文件读取数据,但 DataFrameReader 支持上传许多其他格式的文件。 请参阅 DataFrameReader 方法

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

有关 Unity Catalog 卷的详细信息,请参阅什么是 Unity Catalog 卷?

根据 JSON 响应创建 DataFrame

若要根据 REST API 返回的 JSON 响应有效负载创建 DataFrame,请使用 Python requests 包来查询和分析响应。 必须导入包才能使用。 此示例使用来自美国食品和药物管理局药物申请数据库的数据。

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

有关在 Databricks 上处理 JSON 等半结构化数据的信息,请参阅对半结构化数据建模

选择 JSON 字段或对象

若要从转换后的 JSON 中选择特定字段或对象,请使用 [] 表示法。 例如,若要选择本身就是产品数组的 products 字段,请使用以下表示法:

display(df_drugs.select(df_drugs["products"]))

还可以将方法调用链接在一起,以遍历多个字段。 例如,若要输出药物申请表中第一个产品的品牌名:

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

根据文件创建 DataFrame

为了演示如何根据文件创建 DataFrame,此示例将加载 /databricks-datasets 目录中的 CSV 数据。

若要导航到示例数据集,可以使用 Databricks Utilties 文件系统命令。 以下示例使用 dbutils 列出在 /databricks-datasets 中可用的数据集:

display(dbutils.fs.ls('/databricks-datasets'))

或者,可以使用 %fs 访问 Databricks CLI 文件系统命令,如以下示例所示:

%fs ls '/databricks-datasets'

若要根据文件或文件目录创建 DataFrame,请在 load 方法中指定路径:

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

使用 DataFrame 转换数据

使用 DataFrame,可以利用内置方法对数据进行排序、筛选和聚合,从而轻松转换数据。 许多转换并不作为 DataFrame 的方法指定,而是在 spark.sql.functions 包中提供。 请参阅 Databricks Spark SQL 函数

列操作

Spark 提供了许多基本列操作:

提示

若要输出 DataFrame 中的所有列,请使用 columns,例如 df_customer.columns

选择列

可以使用 selectcol 选择特定列。 col 函数位于 pyspark.sql.functions 子模块中。

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

还可以使用 expr 来引用列,它可以获取定义为字符串的表达式:

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

还可以使用 selectExpr,它可接受 SQL 表达式:

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

若要使用字符串字面量选择列,请执行以下操作:

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

若要从特定 DataFrame 显式选择列,可以使用 [] 运算符或 . 运算符。 (. 运算符不能用于选择以整数开头的列或包含空格或特殊字符的列。)在联接 DataFrame 时,如果某些列具有相同的名称,这种方法尤其有用。

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

创建列

若要创建新列,请使用 withColumn 方法。 以下示例创建一个新列,该列包含一个布尔值,该值根据客户帐户余额 c_acctbal 是否超过 1000 来确定:

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

重命名列

若要重命名列,请使用 withColumnRenamed 方法,该方法可接受现有列名称和新列名称:

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

如果要在聚合过程中同时重命名列,alias 方法特别有用:

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

强制转换列类型

在某些情况下,可能需要更改 DataFrame 中一列或多列的数据类型。 为此,请使用 cast 方法在列数据类型之间进行转换。 以下示例演示如何使用 col 方法引用列,将列从整数转换为字符串类型:

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

删除列

若要删除列,可以在选择时忽略这些列或使用 select(*) except,也可以使用 drop 方法:

df_customer_flag_renamed.drop("balance_flag_renamed")

还可以一次性删除多列:

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

行操作

Spark 提供了许多基本行操作:

筛选行

若要筛选行,请对 DataFrame 使用 filterwhere 方法,以便仅返回特定行。 若要标识要筛选的列,请使用 col 方法或计算结果为列的表达式。

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

若要根据多个条件进行筛选,请使用逻辑运算符。 例如,&| 可分别用于表示 ANDOR 条件。 以下示例筛选 c_nationkey 等于 20c_acctbal 大于 1000 的行。

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

删除重复行

若要去除重复行,请使用 distinct,它仅返回非重复行。

df_unique = df_customer.distinct()

处理 null 值

若要处理 null 值,请使用 na.drop 方法删除包含 null 值的行。 使用此方法,可以指定是要删除包含 any null 值的行,还是要删除包含 all null 值的行。

若要删除任何 null 值,请使用以下示例之一。

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

如果只想筛除全部为 null 值的行,请使用以下方法:

df_customer_no_nulls = df_customer.na.drop("all")

通过指定以下命令,可以对列的子集应用此操作,如下所示:

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

若要填写缺失值,请使用 fill 方法。 可以选择将此方法应用于所有列或列的子集。 在下面的示例中,帐户余额 c_acctbal 为 null 值的帐户余额将填入 0

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

若要将字符串替换为其他值,请使用 replace 方法。 在下面的示例中,任何空地址字符串都将替换为 UNKNOWN 一词:

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

追加行

若要追加行,需要使用 union 方法创建新的 DataFrame。 在以下示例中,将之前创建的 DataFrame df_that_one_customerdf_filtered_customer 组合在一起,它将返回一个包含三个客户的 DataFrame:

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

注意

还可以将 DataFrame 写入表,然后追加新行,从而将其组合在一起。 对于生产工作负载,随着数据规模的增长,将数据源递增地处理到目标表可以大幅降低延迟和计算成本。 查看将数据输入 Databricks 湖屋

对行进行排序

重要

对大量数据进行排序的成本可能很高,而且如果存储已排序的数据并使用 Spark 重新加载数据,则不能保证顺序。 请确保你在使用排序时是有明确意图的。

若要按一列或多列对行进行排序,请使用 sortorderBy 方法。 默认情况下,这些方法按升序排序:

df_customer.orderBy(col("c_acctbal"))

若要按降序筛选,请使用 desc

df_customer.sort(col("c_custkey").desc())

以下示例演示如何对两列进行排序:

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

若要限制在 DataFrame 排序后要返回的行数,请使用 limit 方法。 以下示例仅显示前 10 个结果:

display(df_sorted.limit(10))

联接数据帧

若要联接两个或多个 DataFrame,请使用 join 方法。 可以在 how(联接类型)和 on(基于哪些列进行联接)参数中指定联接 DataFrame 的方式。 常见的联接类型包括:

  • inner:这是默认的联接类型,它返回的 DataFrame 仅保留那些在 DataFrame 中的 on 参数有匹配项的行。
  • left:此类型会保留第一个指定 DataFrame 的所有行,以及第二个指定 DataFrame 中与第一个 DataFrame 有匹配项的行。
  • outer:无论是否有匹配项,外部联接都会保留这两个 DataFrame 中的所有行。

有关联接的详细信息,请参阅在 Azure Databricks 上使用联接。 有关 PySpark 支持的联接列表,请参阅 DataFrame 联接

以下示例返回一个 DataFrame,其中 orders DataFrame 的每一行都与 customers DataFrame 中的相应行联接。 使用的是内联,因为预期每个订单都对应于一个客户。

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

若要基于多个条件进行联接,请使用布尔运算符(例如 &|)分别指定 ANDOR。 以下示例添加了一个附加条件,仅筛选 o_totalprice 大于 500,000 的行:

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

聚合数据

若要聚合 DataFrame 中的数据(类似于 SQL 中的 GROUP BY),请使用 groupBy 方法指定要作为分组依据的列,并使用 agg 方法指定聚合。 从 pyspark.sql.functions 中导入常见聚合,包括 avgsummaxmin。 以下示例显示了按市场细分划分的平均客户余额:

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

某些聚合是操作,这意味着它们会触发计算。 在这种情况下,就无需使用其他操作来输出结果。

若要对 DataFrame 中的行进行计数,请使用 count 方法:

df_customer.count()

链接调用

转换 DataFrame 的方法会返回 DataFrame,在调用操作之前,Spark 不会对转换执行操作。 这种延迟计算意味着你可以将多个方法链接起来,既方便又易读。 以下示例演示了如何链接筛选、聚合和排序:

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

可视化 DataFrame

若要在笔记本中可视化 DataFrame,请单击 DataFrame 左上角的表旁边的 + 符号,然后选择“可视化”,根据 DataFrame 添加一个或多个图表。 有关可视化效果的详细信息,请参阅 Databricks 笔记本中的可视化效果

display(df_order)

若要执行其他可视化,Databricks 建议使用适用于 Spark 的 Pandas API。 使用 .pandas_api(),可以强制转换为 Spark DataFrame 对应的 Pandas API。 有关详细信息,请参阅 Spark 上的 Pandas API

保存数据

转换数据后,可以使用 DataFrameWriter 方法保存数据。 可以在 DataFrameWriter 中找到这些方法的完整列表。 以下部分演示了如何将 DataFrame 另存为表和数据文件集合。

将 DataFrame 另存为表

若要将 DataFrame 另存为 Unity Catalog 中的表,请使用 write.saveAsTable 方法,并按照 <catalog-name>.<schema-name>.<table-name> 格式指定路径。

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

将 DataFrame 写入为 CSV

若要将 DataFrame 写入为 *.csv 格式,请使用 write.csv 方法,并指定格式和选项。 默认情况下,如果指定路径存在数据,写入操作将失败。 可以指定以下模式之一以执行不同的操作:

  • overwrite 使用 DataFrame 内容覆盖目标路径中的所有现有数据。
  • append 将 DataFrame 的内容追加到目标路径中的数据。
  • ignore 在目标路径中存在数据时将写入操作静默失败。

以下示例演示了如何使用 DataFrame 内容覆盖数据,以 CSV 文件保存:

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

后续步骤

若要在 Databricks 上利用更多 Spark 功能,请参阅: