本文通过演练一些简单的示例来说明 PySpark 的用法。 它假设你了解基本的 Apache Spark 概念,并在连接到计算的 Azure Databricks 笔记本中运行命令。 你将使用示例数据创建 DataFrame,对这些数据执行基本转换(包括行和列操作),合并多个 DataFrame 并聚合这些数据,可视化这些数据,然后将其保存到表或文件中。
本文中的一些示例使用 Databricks 提供的示例数据来演示如何使用 DataFrame 加载、转换和保存数据。 如果想要使用 Databricks 中尚不存在的你自己的数据,可以先将其上传,然后使用这些数据创建 DataFrame。 请参阅使用文件上传创建或修改表和将文件上传到 Unity Catalog 卷。
Databricks 在 samples
目录和 /databricks-datasets
目录中提供了示例数据。
- 若要访问
samples
目录中的示例数据,请使用格式samples.<schema-name>.<table-name>
。 本文使用samples.tpch
架构中的表,其中包含一家虚构企业的数据。customer
表包含客户的相关信息,orders
包含这些客户所下订单的相关信息。 - 使用
dbutils.fs.ls
浏览/databricks-datasets
中的数据。 使用 Spark SQL 或 DataFrame,通过文件路径来查询此位置中的数据。 若要详细了解 Databricks 提供的示例数据,请参阅示例数据集。
许多 PySpark 操作都要求使用 SQL 函数或与本机 Spark 类型交互。 可以仅直接导入所需的这些函数和类型,也可以导入整个模块。
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
由于某些导入的函数可能会替代 Python 内置函数,因此一些用户选择使用别名来导入这些模块。 以下示例展示了 Apache Spark 代码示例中常用的别名:
import pyspark.sql.types as T
import pyspark.sql.functions as F
有关数据类型的完整列表,请参阅 Spark 数据类型。
有关 PySpark SQL 函数的完整列表,请参阅 Spark 函数。
可通过多种方法来创建 DataFrame。 通常,需要根据数据源(例如表或文件集合)来定义 DataFrame。 然后,如 Apache Spark 基本概念部分中所述,使用 display
等操作触发要执行的转换。 display
方法可输出 DataFrame。
若要创建包含指定值的 DataFrame,请使用 createDataFrame
方法,其中行以元组列表的形式表示:
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
请注意,在输出中,df_children
列的数据类型是自动推断的。 也可以通过添加架构来指定类型。 架构是使用由 StructFields
组成的 StructType
定义的,其中指定了名称、数据类型和指示它们是否包含 null 值的布尔标志。 必须从 pyspark.sql.types
中导入数据类型。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
若要根据 Unity Catalog 中的表创建 DataFrame,请使用 table
方法,并通过格式 <catalog-name>.<schema-name>.<table-name>
来标识表。 单击左侧导航栏上的“目录”,使用“目录资源管理器”导航到表。 单击它,然后选择“复制表路径”,将表路径插入笔记本中。
以下示例会加载表 samples.tpch.customer
,但你也可以提供自己的表的路径。
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
若要根据上传到 Unity Catalog 卷的文件创建 DataFrame,请使用 read
属性。 此方法会返回一个 DataFrameReader
,然后可将其用于读取相应的格式。 单击左侧小边栏上的目录选项,并使用目录浏览器查找文件。 选择该文件,然后单击“复制卷文件路径”。
下面的示例从 *.csv
文件读取数据,但 DataFrameReader
支持上传许多其他格式的文件。 请参阅 DataFrameReader 方法。
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
有关 Unity Catalog 卷的详细信息,请参阅什么是 Unity Catalog 卷?。
若要根据 REST API 返回的 JSON 响应有效负载创建 DataFrame,请使用 Python requests
包来查询和分析响应。 必须导入包才能使用。 此示例使用来自美国食品和药物管理局药物申请数据库的数据。
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
有关在 Databricks 上处理 JSON 等半结构化数据的信息,请参阅对半结构化数据建模。
若要从转换后的 JSON 中选择特定字段或对象,请使用 []
表示法。 例如,若要选择本身就是产品数组的 products
字段,请使用以下表示法:
display(df_drugs.select(df_drugs["products"]))
还可以将方法调用链接在一起,以遍历多个字段。 例如,若要输出药物申请表中第一个产品的品牌名:
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
为了演示如何根据文件创建 DataFrame,此示例将加载 /databricks-datasets
目录中的 CSV 数据。
若要导航到示例数据集,可以使用 Databricks Utilties 文件系统命令。 以下示例使用 dbutils
列出在 /databricks-datasets
中可用的数据集:
display(dbutils.fs.ls('/databricks-datasets'))
或者,可以使用 %fs
访问 Databricks CLI 文件系统命令,如以下示例所示:
%fs ls '/databricks-datasets'
若要根据文件或文件目录创建 DataFrame,请在 load
方法中指定路径:
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
使用 DataFrame,可以利用内置方法对数据进行排序、筛选和聚合,从而轻松转换数据。 许多转换并不作为 DataFrame 的方法指定,而是在 spark.sql.functions
包中提供。 请参阅 Databricks Spark SQL 函数。
Spark 提供了许多基本列操作:
提示
若要输出 DataFrame 中的所有列,请使用 columns
,例如 df_customer.columns
。
可以使用 select
和 col
选择特定列。 col
函数位于 pyspark.sql.functions
子模块中。
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
还可以使用 expr
来引用列,它可以获取定义为字符串的表达式:
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
还可以使用 selectExpr
,它可接受 SQL 表达式:
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
若要使用字符串字面量选择列,请执行以下操作:
df_customer.select(
"c_custkey",
"c_acctbal"
)
若要从特定 DataFrame 显式选择列,可以使用 []
运算符或 .
运算符。 (.
运算符不能用于选择以整数开头的列或包含空格或特殊字符的列。)在联接 DataFrame 时,如果某些列具有相同的名称,这种方法尤其有用。
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
若要创建新列,请使用 withColumn
方法。 以下示例创建一个新列,该列包含一个布尔值,该值根据客户帐户余额 c_acctbal
是否超过 1000
来确定:
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
若要重命名列,请使用 withColumnRenamed
方法,该方法可接受现有列名称和新列名称:
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
如果要在聚合过程中同时重命名列,alias
方法特别有用:
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
在某些情况下,可能需要更改 DataFrame 中一列或多列的数据类型。 为此,请使用 cast
方法在列数据类型之间进行转换。 以下示例演示如何使用 col
方法引用列,将列从整数转换为字符串类型:
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
若要删除列,可以在选择时忽略这些列或使用 select(*) except
,也可以使用 drop
方法:
df_customer_flag_renamed.drop("balance_flag_renamed")
还可以一次性删除多列:
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
Spark 提供了许多基本行操作:
若要筛选行,请对 DataFrame 使用 filter
或 where
方法,以便仅返回特定行。 若要标识要筛选的列,请使用 col
方法或计算结果为列的表达式。
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
若要根据多个条件进行筛选,请使用逻辑运算符。 例如,&
和 |
可分别用于表示 AND
和 OR
条件。 以下示例筛选 c_nationkey
等于 20
且 c_acctbal
大于 1000
的行。
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
若要去除重复行,请使用 distinct
,它仅返回非重复行。
df_unique = df_customer.distinct()
若要处理 null 值,请使用 na.drop
方法删除包含 null 值的行。 使用此方法,可以指定是要删除包含 any
null 值的行,还是要删除包含 all
null 值的行。
若要删除任何 null 值,请使用以下示例之一。
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
如果只想筛除全部为 null 值的行,请使用以下方法:
df_customer_no_nulls = df_customer.na.drop("all")
通过指定以下命令,可以对列的子集应用此操作,如下所示:
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
若要填写缺失值,请使用 fill
方法。 可以选择将此方法应用于所有列或列的子集。 在下面的示例中,帐户余额 c_acctbal
为 null 值的帐户余额将填入 0
。
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
若要将字符串替换为其他值,请使用 replace
方法。 在下面的示例中,任何空地址字符串都将替换为 UNKNOWN
一词:
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
若要追加行,需要使用 union
方法创建新的 DataFrame。 在以下示例中,将之前创建的 DataFrame df_that_one_customer
和 df_filtered_customer
组合在一起,它将返回一个包含三个客户的 DataFrame:
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
备注
还可以将 DataFrame 写入表,然后追加新行,从而将其组合在一起。 对于生产工作负载,随着数据规模的增长,将数据源递增地处理到目标表可以大幅降低延迟和计算成本。 查看将数据输入 Databricks 湖屋。
重要
对大量数据进行排序的成本可能很高,而且如果存储已排序的数据并使用 Spark 重新加载数据,则不能保证顺序。 请确保你在使用排序时是有明确意图的。
若要按一列或多列对行进行排序,请使用 sort
或 orderBy
方法。 默认情况下,这些方法按升序排序:
df_customer.orderBy(col("c_acctbal"))
若要按降序筛选,请使用 desc
:
df_customer.sort(col("c_custkey").desc())
以下示例演示如何对两列进行排序:
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
若要限制在 DataFrame 排序后要返回的行数,请使用 limit
方法。 以下示例仅显示前 10
个结果:
display(df_sorted.limit(10))
若要联接两个或多个 DataFrame,请使用 join
方法。 可以在 how
(联接类型)和 on
(基于哪些列进行联接)参数中指定联接 DataFrame 的方式。 常见的联接类型包括:
inner
:这是默认的联接类型,它返回的 DataFrame 仅保留那些在 DataFrame 中的on
参数有匹配项的行。left
:此类型会保留第一个指定 DataFrame 的所有行,以及第二个指定 DataFrame 中与第一个 DataFrame 有匹配项的行。outer
:无论是否有匹配项,外部联接都会保留这两个 DataFrame 中的所有行。
有关联接的详细信息,请参阅在 Azure Databricks 上使用联接。 有关 PySpark 支持的联接列表,请参阅 DataFrame 联接。
以下示例返回一个 DataFrame,其中 orders
DataFrame 的每一行都与 customers
DataFrame 中的相应行联接。 使用的是内联,因为预期每个订单都对应于一个客户。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
若要基于多个条件进行联接,请使用布尔运算符(例如 &
和 |
)分别指定 AND
和 OR
。 以下示例添加了一个附加条件,仅筛选 o_totalprice
大于 500,000
的行:
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
若要聚合 DataFrame 中的数据(类似于 SQL 中的 GROUP BY
),请使用 groupBy
方法指定要作为分组依据的列,并使用 agg
方法指定聚合。 从 pyspark.sql.functions
中导入常见聚合,包括 avg
、sum
、max
和 min
。 以下示例显示了按市场细分划分的平均客户余额:
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
某些聚合是操作,这意味着它们会触发计算。 在这种情况下,就无需使用其他操作来输出结果。
若要对 DataFrame 中的行进行计数,请使用 count
方法:
df_customer.count()
转换 DataFrame 的方法会返回 DataFrame,在调用操作之前,Spark 不会对转换执行操作。 这种延迟计算意味着你可以将多个方法链接起来,既方便又易读。 以下示例演示了如何链接筛选、聚合和排序:
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
若要在笔记本中可视化 DataFrame,请单击 DataFrame 左上角的表旁边的 + 符号,然后选择“可视化”,根据 DataFrame 添加一个或多个图表。 有关可视化效果的详细信息,请参阅 Databricks 笔记本中的可视化效果。
display(df_order)
若要执行其他可视化,Databricks 建议使用适用于 Spark 的 Pandas API。 使用 .pandas_api()
,可以强制转换为 Spark DataFrame 对应的 Pandas API。 有关详细信息,请参阅 Spark 上的 Pandas API。
转换数据后,可以使用 DataFrameWriter
方法保存数据。 可以在 DataFrameWriter 中找到这些方法的完整列表。 以下部分演示了如何将 DataFrame 另存为表和数据文件集合。
若要将 DataFrame 另存为 Unity Catalog 中的表,请使用 write.saveAsTable
方法,并按照 <catalog-name>.<schema-name>.<table-name>
格式指定路径。
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
若要将 DataFrame 写入为 *.csv
格式,请使用 write.csv
方法,并指定格式和选项。 默认情况下,如果指定路径存在数据,写入操作将失败。 可以指定以下模式之一以执行不同的操作:
overwrite
使用 DataFrame 内容覆盖目标路径中的所有现有数据。append
将 DataFrame 的内容追加到目标路径中的数据。ignore
在目标路径中存在数据时将写入操作静默失败。
以下示例演示了如何使用 DataFrame 内容覆盖数据,以 CSV 文件保存:
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
若要在 Databricks 上利用更多 Spark 功能,请参阅: