教程:在 PySpark 数据帧中加载和转换数据
本教程介绍如何在 Azure Databricks 中使用 Apache Spark Python (PySpark) 数据帧 API 加载和转换美国城市数据。
本教程结束时,你可了解数据帧是什么并熟悉以下任务:
另请参阅 Apache Spark PySpark API 参考。
什么是数据帧?
数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。
Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。
要求
若要完成以下教程,必须满足以下要求:
- 你已登录到 Azure Databricks 工作区。
- 你有权创建启用了 Unity Catalog 的计算。
注意
如果没有群集控制特权,只要拥有群集访问权限,就仍然可以完成以下大部分步骤。
在主页的侧边栏中,可以访问 Azure Databricks 实体:工作区浏览器、目录资源管理器、工作流和计算。 工作区是存储 Azure Databricks 资产(如笔记本和库)的根文件夹。
步骤 1:使用 Python 创建数据帧
若要了解如何在 Azure Databricks 笔记本中导航,请参阅 Databricks 笔记本界面和控件。
单击 图标打开新笔记本。
将以下代码复制并粘贴到空的笔记本单元格中,然后按
Shift+Enter
运行该单元格。 下面的代码示例将创建一个包含城市人口数据的名为df1
的数据帧,并显示该数据帧的内容。# Create a data frame from the given data data = [[295, "South Bend", "Indiana", "IN", 101190, 112.9]] columns = ["rank", "city", "state", "code", "population", "price"] df1 = spark.createDataFrame(data, schema="rank LONG, city STRING, state STRING, code STRING, population LONG, price DOUBLE") display(df1)
步骤 2:将数据从文件加载到数据帧中
将 /databricks-datasets
目录中更多的城市人口数据添加到 df2
。
若要将 data_geo.csv
文件中的数据载入数据帧 df2
,请将以下代码复制并粘贴到新的空笔记本单元格中。 按 Shift+Enter
运行该单元格。
可以从许多受支持的文件格式加载数据。 以下示例使用 /databricks-datasets
目录(可从大多数工作区访问)中提供的数据集。 参阅示例数据集。
# Create second dataframe from a file
df2 = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
步骤 3:查看数据帧并与之交互
使用以下方法查看城市人口数据帧并与之交互。
合并数据帧
将第一个数据帧 df1
的内容与包含 data_geo.csv
内容的数据帧 df2
合并。
在笔记本中使用以下示例代码创建一个新的数据帧,该数据帧会使用联合操作将一个数据帧的行添加到另一个数据帧:
# Returns a DataFrame that combines the rows of df1 and df2
df = df1.union(df2)
查看数据帧
若要以表格格式查看美国城市数据,请在笔记本单元格中使用 Azure Databricks display()
命令。
display(df)
打印数据帧架构
Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。
在笔记本中使用以下 .printSchema()
方法打印数据帧的架构。 使用生成的元数据与数据帧的内容进行交互。
df.printSchema()
注意
Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。
筛选数据帧中的行
使用 .filter()
或 .where()
来筛选各行,发现数据集中人口最多的五个城市。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。
# Filter rows using .filter()
filtered_df = df.filter(df["rank"] < 6)
display(filtered_df)
# Filter rows using .where()
filtered_df = df.where(df["rank"] < 6)
display(filtered_df)
从数据帧中选择列
使用 select()
方法了解城市位于哪个州。 通过将一个或多个列名传递给 .select()
来选择列,如以下示例中所示:
# Select columns from a DataFrame
select_df = df.select("City", "State")
display(select_df)
创建子集数据帧
创建包含人口最多的十个城市的子集数据帧并显示生成的数据。 在笔记本中使用以下代码,组合 select 和 filter 查询来限制返回的行和列:
# Create a subset DataFrame
subset_df = df.filter(df["rank"] < 11).select("City")
display(subset_df)
步骤 4:保存数据帧
可以将数据帧保存到表,或者将数据帧写入一个或多个文件。
将数据帧保存到表
默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE
表权限。 以下示例将数据帧的内容保存到名为 us_cities
的表中:
# Save DataFrame to a table
df.write.saveAsTable("us_cities")
大多数 Spark 应用程序都以分布式方式处理大型数据集。 Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。
将数据帧保存到 JSON 文件
以下示例保存 JSON 文件的目录:
# Write a DataFrame to a directory of files
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
从 JSON 文件读取数据帧
# Read a DataFrame from a JSON file
df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)
其他任务:在 PySpark 中运行 SQL 查询
Spark 数据帧提供以下选项,用于将 SQL 与 Python 合并在一起。 可以在为本教程创建的同一笔记本中运行以下代码。
将列指定为 SQL 查询
使用 selectExpr()
方法可将每一列指定为 SQL 查询,如以下示例中所示:
display(df.selectExpr("`rank`", "upper(city) as big_name"))
导入 expr()
可以从 pyspark.sql.functions
导入 expr()
函数,以便在要指定列的任何位置使用 SQL 语法,如以下示例中所示:
from pyspark.sql.functions import expr
display(df.select("rank", expr("lower(city) as little_name")))
运行任意 SQL 查询
你可以使用 spark.sql()
来运行任意 SQL 查询,如以下示例中所示:
query_df = spark.sql("SELECT * FROM us_cities")
display(query_df)
实现 SQL 查询参数化
可以使用 Python 格式设置来实现 SQL 查询参数化,如以下示例中所示:
# Define the table name
table_name = "us_cities"
# Query the DataFrame using the constructed SQL query
query_df = spark.sql(f"SELECT * FROM {table_name}")
display(query_df)