PySpark 基础知识
本文通过演练一些简单的示例来说明 PySpark 的用法。 它假设你了解基本的 Apache Spark 概念,并在连接到计算的 Azure Databricks 笔记本中运行命令。 你将使用示例数据创建 DataFrame,对这些数据执行基本转换(包括行和列操作),合并多个 DataFrame 并聚合这些数据,可视化这些数据,然后将其保存到表或文件中。
上传数据
本文中的一些示例使用 Databricks 提供的示例数据来演示如何使用 DataFrame 加载、转换和保存数据。 如果想要使用 Databricks 中尚不存在的你自己的数据,可以先将其上传,然后使用这些数据创建 DataFrame。 请参阅使用文件上传创建或修改表和将文件上传到 Unity Catalog 卷。
关于 Databricks 示例数据
Databricks 在 samples
目录和 /databricks-datasets
目录中提供了示例数据。
- 若要访问
samples
目录中的示例数据,请使用格式samples.<schema-name>.<table-name>
。 本文使用samples.tpch
架构中的表,其中包含一家虚构企业的数据。customer
表包含客户的相关信息,orders
包含这些客户所下订单的相关信息。 - 使用
dbutils.fs.ls
浏览/databricks-datasets
中的数据。 使用 Spark SQL 或 DataFrame,通过文件路径来查询此位置中的数据。 若要详细了解 Databricks 提供的示例数据,请参阅示例数据集。
导入数据类型
许多 PySpark 操作都要求使用 SQL 函数或与本机 Spark 类型交互。 可以仅直接导入所需的这些函数和类型,也可以导入整个模块。
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
由于某些导入的函数可能会替代 Python 内置函数,因此一些用户选择使用别名来导入这些模块。 以下示例展示了 Apache Spark 代码示例中常用的别名:
import pyspark.sql.types as T
import pyspark.sql.functions as F
有关数据类型的完整列表,请参阅 Spark 数据类型。
有关 PySpark SQL 函数的完整列表,请参阅 Spark 函数。
创建 DataFrame
可通过多种方法来创建 DataFrame。 通常,需要根据数据源(例如表或文件集合)来定义 DataFrame。 然后,如 Apache Spark 基本概念部分中所述,使用 display
等操作触发要执行的转换。 display
方法可输出 DataFrame。
创建包含指定值的 DataFrame
若要创建包含指定值的 DataFrame,请使用 createDataFrame
方法,其中行以元组列表的形式表示:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
请注意,在输出中,df_children
列的数据类型是自动推断的。 也可以通过添加架构来指定类型。 架构是使用由 StructFields
组成的 StructType
定义的,其中指定了名称、数据类型和指示它们是否包含 null 值的布尔标志。 必须从 pyspark.sql.types
中导入数据类型。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
根据 Unity Catalog 中的表创建 DataFrame
若要根据 Unity Catalog 中的表创建 DataFrame,请使用 table
方法,并通过格式 <catalog-name>.<schema-name>.<table-name>
来标识表。 单击左侧导航栏上的“目录”,使用“目录资源管理器”导航到表。 单击它,然后选择“复制表路径”,将表路径插入笔记本中。
以下示例会加载表 samples.tpch.customer
,但你也可以提供自己的表的路径。
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
根据上传的文件创建 DataFrame
若要根据上传到 Unity Catalog 卷的文件创建 DataFrame,请使用 read
属性。 此方法会返回一个 DataFrameReader
,然后可将其用于读取相应的格式。 单击左侧小边栏上的目录选项,并使用目录浏览器查找文件。 选择该文件,然后单击“复制卷文件路径”。
下面的示例从 *.csv
文件读取数据,但 DataFrameReader
支持上传许多其他格式的文件。 请参阅 DataFrameReader 方法。
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
有关 Unity Catalog 卷的详细信息,请参阅什么是 Unity Catalog 卷?。
根据 JSON 响应创建 DataFrame
若要根据 REST API 返回的 JSON 响应有效负载创建 DataFrame,请使用 Python requests
包来查询和分析响应。 必须导入包才能使用。 此示例使用来自美国食品和药物管理局药物申请数据库的数据。
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
有关在 Databricks 上处理 JSON 等半结构化数据的信息,请参阅对半结构化数据建模。
选择 JSON 字段或对象
若要从转换后的 JSON 中选择特定字段或对象,请使用 []
表示法。 例如,若要选择本身就是产品数组的 products
字段,请使用以下表示法:
display(df_drugs.select(df_drugs["products"]))
还可以将方法调用链接在一起,以遍历多个字段。 例如,若要输出药物申请表中第一个产品的品牌名:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
根据文件创建 DataFrame
为了演示如何根据文件创建 DataFrame,此示例将加载 /databricks-datasets
目录中的 CSV 数据。
若要导航到示例数据集,可以使用 Databricks Utilties 文件系统命令。 以下示例使用 dbutils
列出在 /databricks-datasets
中可用的数据集:
display(dbutils.fs.ls('/databricks-datasets'))
或者,可以使用 %fs
访问 Databricks CLI 文件系统命令,如以下示例所示:
%fs ls '/databricks-datasets'
若要根据文件或文件目录创建 DataFrame,请在 load
方法中指定路径:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
使用 DataFrame 转换数据
使用 DataFrame,可以利用内置方法对数据进行排序、筛选和聚合,从而轻松转换数据。 许多转换并不作为 DataFrame 的方法指定,而是在 spark.sql.functions
包中提供。 请参阅 Databricks Spark SQL 函数。
列操作
Spark 提供了许多基本列操作:
提示
若要输出 DataFrame 中的所有列,请使用 columns
,例如 df_customer.columns
。
选择列
可以使用 select
和 col
选择特定列。 col
函数位于 pyspark.sql.functions
子模块中。
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
还可以使用 expr
来引用列,它可以获取定义为字符串的表达式:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
还可以使用 selectExpr
,它可接受 SQL 表达式:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
若要使用字符串字面量选择列,请执行以下操作:
df_customer.select(
"c_custkey",
"c_acctbal"
)
若要从特定 DataFrame 显式选择列,可以使用 []
运算符或 .
运算符。 (.
运算符不能用于选择以整数开头的列或包含空格或特殊字符的列。)在联接 DataFrame 时,如果某些列具有相同的名称,这种方法尤其有用。
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
创建列
若要创建新列,请使用 withColumn
方法。 以下示例创建一个新列,该列包含一个布尔值,该值根据客户帐户余额 c_acctbal
是否超过 1000
来确定:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
重命名列
若要重命名列,请使用 withColumnRenamed
方法,该方法可接受现有列名称和新列名称:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
如果要在聚合过程中同时重命名列,alias
方法特别有用:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
强制转换列类型
在某些情况下,可能需要更改 DataFrame 中一列或多列的数据类型。 为此,请使用 cast
方法在列数据类型之间进行转换。 以下示例演示如何使用 col
方法引用列,将列从整数转换为字符串类型:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
删除列
若要删除列,可以在选择时忽略这些列或使用 select(*) except
,也可以使用 drop
方法:
df_customer_flag_renamed.drop("balance_flag_renamed")
还可以一次性删除多列:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
行操作
Spark 提供了许多基本行操作:
筛选行
若要筛选行,请对 DataFrame 使用 filter
或 where
方法,以便仅返回特定行。 若要标识要筛选的列,请使用 col
方法或计算结果为列的表达式。
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
若要根据多个条件进行筛选,请使用逻辑运算符。 例如,&
和 |
可分别用于表示 AND
和 OR
条件。 以下示例筛选 c_nationkey
等于 20
且 c_acctbal
大于 1000
的行。
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
删除重复行
若要去除重复行,请使用 distinct
,它仅返回非重复行。
df_unique = df_customer.distinct()
处理 null 值
若要处理 null 值,请使用 na.drop
方法删除包含 null 值的行。 使用此方法,可以指定是要删除包含 any
null 值的行,还是要删除包含 all
null 值的行。
若要删除任何 null 值,请使用以下示例之一。
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
如果只想筛除全部为 null 值的行,请使用以下方法:
df_customer_no_nulls = df_customer.na.drop("all")
通过指定以下命令,可以对列的子集应用此操作,如下所示:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
若要填写缺失值,请使用 fill
方法。 可以选择将此方法应用于所有列或列的子集。 在下面的示例中,帐户余额 c_acctbal
为 null 值的帐户余额将填入 0
。
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
若要将字符串替换为其他值,请使用 replace
方法。 在下面的示例中,任何空地址字符串都将替换为 UNKNOWN
一词:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
追加行
若要追加行,需要使用 union
方法创建新的 DataFrame。 在以下示例中,将之前创建的 DataFrame df_that_one_customer
和 df_filtered_customer
组合在一起,它将返回一个包含三个客户的 DataFrame:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
注意
还可以将 DataFrame 写入表,然后追加新行,从而将其组合在一起。 对于生产工作负载,随着数据规模的增长,将数据源递增地处理到目标表可以大幅降低延迟和计算成本。 查看将数据输入 Databricks 湖屋。
对行进行排序
重要
对大量数据进行排序的成本可能很高,而且如果存储已排序的数据并使用 Spark 重新加载数据,则不能保证顺序。 请确保你在使用排序时是有明确意图的。
若要按一列或多列对行进行排序,请使用 sort
或 orderBy
方法。 默认情况下,这些方法按升序排序:
df_customer.orderBy(col("c_acctbal"))
若要按降序筛选,请使用 desc
:
df_customer.sort(col("c_custkey").desc())
以下示例演示如何对两列进行排序:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
若要限制在 DataFrame 排序后要返回的行数,请使用 limit
方法。 以下示例仅显示前 10
个结果:
display(df_sorted.limit(10))
联接数据帧
若要联接两个或多个 DataFrame,请使用 join
方法。 可以在 how
(联接类型)和 on
(基于哪些列进行联接)参数中指定联接 DataFrame 的方式。 常见的联接类型包括:
inner
:这是默认的联接类型,它返回的 DataFrame 仅保留那些在 DataFrame 中的on
参数有匹配项的行。left
:此类型会保留第一个指定 DataFrame 的所有行,以及第二个指定 DataFrame 中与第一个 DataFrame 有匹配项的行。outer
:无论是否有匹配项,外部联接都会保留这两个 DataFrame 中的所有行。
有关联接的详细信息,请参阅在 Azure Databricks 上使用联接。 有关 PySpark 支持的联接列表,请参阅 DataFrame 联接。
以下示例返回一个 DataFrame,其中 orders
DataFrame 的每一行都与 customers
DataFrame 中的相应行联接。 使用的是内联,因为预期每个订单都对应于一个客户。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
若要基于多个条件进行联接,请使用布尔运算符(例如 &
和 |
)分别指定 AND
和 OR
。 以下示例添加了一个附加条件,仅筛选 o_totalprice
大于 500,000
的行:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
聚合数据
若要聚合 DataFrame 中的数据(类似于 SQL 中的 GROUP BY
),请使用 groupBy
方法指定要作为分组依据的列,并使用 agg
方法指定聚合。 从 pyspark.sql.functions
中导入常见聚合,包括 avg
、sum
、max
和 min
。 以下示例显示了按市场细分划分的平均客户余额:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
某些聚合是操作,这意味着它们会触发计算。 在这种情况下,就无需使用其他操作来输出结果。
若要对 DataFrame 中的行进行计数,请使用 count
方法:
df_customer.count()
链接调用
转换 DataFrame 的方法会返回 DataFrame,在调用操作之前,Spark 不会对转换执行操作。 这种延迟计算意味着你可以将多个方法链接起来,既方便又易读。 以下示例演示了如何链接筛选、聚合和排序:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
可视化 DataFrame
若要在笔记本中可视化 DataFrame,请单击 DataFrame 左上角的表旁边的 + 符号,然后选择“可视化”,根据 DataFrame 添加一个或多个图表。 有关可视化效果的详细信息,请参阅 Databricks 笔记本中的可视化效果。
display(df_order)
若要执行其他可视化,Databricks 建议使用适用于 Spark 的 Pandas API。 使用 .pandas_api()
,可以强制转换为 Spark DataFrame 对应的 Pandas API。 有关详细信息,请参阅 Spark 上的 Pandas API。
保存数据
转换数据后,可以使用 DataFrameWriter
方法保存数据。 可以在 DataFrameWriter 中找到这些方法的完整列表。 以下部分演示了如何将 DataFrame 另存为表和数据文件集合。
将 DataFrame 另存为表
若要将 DataFrame 另存为 Unity Catalog 中的表,请使用 write.saveAsTable
方法,并按照 <catalog-name>.<schema-name>.<table-name>
格式指定路径。
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
将 DataFrame 写入为 CSV
若要将 DataFrame 写入为 *.csv
格式,请使用 write.csv
方法,并指定格式和选项。 默认情况下,如果指定路径存在数据,写入操作将失败。 可以指定以下模式之一以执行不同的操作:
overwrite
使用 DataFrame 内容覆盖目标路径中的所有现有数据。append
将 DataFrame 的内容追加到目标路径中的数据。ignore
在目标路径中存在数据时将写入操作静默失败。
以下示例演示了如何使用 DataFrame 内容覆盖数据,以 CSV 文件保存:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
后续步骤
若要在 Databricks 上利用更多 Spark 功能,请参阅: