本教程介绍如何在 Azure Databricks 中使用 Apache Spark Python (PySpark) 数据帧 API、Apache Spark Scala 数据帧 API 和 SparkR SparkDataFrame API 加载和转换数据。
本教程结束时,你可了解数据帧是什么并熟悉以下任务:
- 定义变量并将公共数据复制到 Unity Catalog 卷
- 使用 Python 创建数据帧
- 将数据从 CSV 文件加载到数据帧
- 查看数据帧并与之交互
- 保存数据帧
- 在 PySpark 中运行 SQL 查询
另请参阅 Apache Spark PySpark API 参考。
- 定义变量并将公共数据复制到 Unity Catalog 卷
- 使用 Scala 创建数据帧
- 将数据从 CSV 文件加载到数据帧
- 查看数据帧并与之交互
- 保存数据帧
- 在 Apache Spark 中运行 SQL 查询
另请参阅 Apache Spark Scala API 参考。
- 定义变量并将公共数据复制到 Unity Catalog 卷
- 创建 SparkR SparkDataFrame
- 将数据从 CSV 文件加载到数据帧
- 查看数据帧并与之交互
- 保存数据帧
- 在 SparkR 中运行 SQL 查询
另请参阅 Apache SparkR API 参考。
数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。
Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。
若要完成以下教程,必须满足以下要求:
若要使用本教程中的示例,必须已为工作区启用 Unity 目录。
本教程中的示例使用 Unity Catalog 卷来存储示例数据。 若要使用这些示例,请创建一个卷,并使用该卷的目录、架构和卷名称来设置示例使用的卷路径。
必须在 Unity Catalog 中具有以下权限:
- 本教程中使用的卷的
READ VOLUME
和WRITE VOLUME
或ALL PRIVILEGES
。 USE SCHEMA
或ALL PRIVILEGES
表示本教程使用的架构。USE CATALOG
或ALL PRIVILEGES
表示本教程使用的目录。
若要设置这些权限,请联系 Databricks 管理员或参阅 Unity Catalog 特权和安全对象。
- 本教程中使用的卷的
提示
有关本文的完整笔记文档,请参阅数据帧教程笔记文档。
此步骤定义要在本教程中使用的变量,然后将包含婴儿姓名数据的 CSV 文件从 health.data.ny.gov 加载到 Unity Catalog 卷。
单击
图标打开新笔记本。 若要了解如何浏览 Azure Databricks 笔记本,请参阅 自定义笔记本外观。
将以下代码复制并粘贴到新的空笔记本单元格中: 将
<catalog-name>
、<schema-name>
和<volume-name>
替换为 Unity Catalog 卷的目录名、模式名和卷名。 请将<table_name>
替换为你选择的表名称。 本教程稍后会将婴儿姓名数据加载到此表中。Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_table = catalog + "." + schema print(path_table) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val downloadUrl = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val fileName = "rows.csv" val tableName = "<table_name>" val pathVolume = s"/Volumes/$catalog/$schema/$volume" val pathTable = s"$catalog.$schema" print(pathVolume) // Show the complete path print(pathTable) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_table <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_table) # Show the complete path
按
Shift+Enter
以运行单元格并创建新的空白单元格。将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用 Databricks dbutuils 命令将
rows.csv
文件从 health.data.ny.gov 复制到 Unity Catalog 卷。Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")
Scala
dbutils.fs.cp(downloadUrl, s"$pathVolume/$fileName")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
此步骤使用测试数据创建名为 df1
的数据帧,然后显示其内容。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用测试数据创建数据帧,然后显示数据帧的内容和架构。
Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = as.integer(c(2021)), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = as.integer(c(42)) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
此步骤从之前加载到 Unity Catalog 卷的 CSV 文件中创建名为 df_csv
的数据帧。 请参阅 spark.read.csv。
将以下代码复制并粘贴到新的空笔记本单元格中: 此代码将婴儿姓名数据从 CSV 文件加载到数据帧
df_csv
,然后显示该数据帧的内容。Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val dfCsv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$pathVolume/$fileName") display(dfCsv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
可以从许多受支持的文件格式加载数据。
使用以下方法查看婴儿姓名数据帧并与之交互。
了解如何显示 Apache Spark 数据帧的架构。 Apache Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。
备注
Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用
.printSchema()
方法显示数据帧的架构,以便查看两个数据帧的架构 - 准备合并这两个数据帧。Python
df_csv.printSchema() df1.printSchema()
Scala
dfCsv.printSchema() df1.printSchema()
R
printSchema(df_csv) printSchema(df1)
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何重命名数据帧中的列。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于重命名
df1_csv
数据帧中的列,以匹配df1
数据帧中的相应列。 此代码使用 Apache SparkwithColumnRenamed()
方法。Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name") df_csv.printSchema
Scala
val dfCsvRenamed = dfCsv.withColumnRenamed("First Name", "First_Name") // when modifying a DataFrame in Scala, you must assign it to a new variable dfCsvRenamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name") printSchema(df_csv)
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何创建一个新的数据帧,用于将某个数据帧的行添加到另一个数据帧。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
union()
方法将第一个数据帧df
的内容与数据帧df_csv
合并,后者包含从 CSV 文件加载的婴儿姓名数据。Python
df = df1.union(df_csv) display(df)
Scala
val df = df1.union(dfCsvRenamed) display(df)
R
display(df <- union(df1, df_csv))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
使用 Apache Spark .filter()
或 .where()
方法筛选行,发现数据集中最受欢迎的婴儿姓名。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
.filter()
方法显示数据帧中计数超过 50 的行。Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
.where()
方法显示数据帧中计数超过 50 的行。Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
使用 select()
方法指定要从数据帧返回的列,了解婴儿名字的使用频率。 使用 Apache Spark orderby
和 desc
函数对结果进行排序。
Apache Spark 的 pyspark.sql 模块为 SQL 函数提供支持。 在这些函数中,本教程中使用的函数包括 Apache Spark orderBy()
、desc()
和 expr()
函数。 可以通过将这些函数根据需要导入到会话中来启用它们的使用。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入
desc()
函数,然后使用 Apache Sparkselect()
方法以及 Apache SparkorderBy()
和desc()
函数按降序显示最常用的姓名及其计数。Python
from pyspark.sql.functions import desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何从现有数据帧创建子集数据帧。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
filter
方法创建新的数据帧,以按年份、计数和性别限制数据。 它使用 Apache Sparkselect()
方法来限制列。 它还使用 Apache SparkorderBy()
和desc()
函数按计数对新的数据帧进行排序。Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") === 2009) && (df("Count") > 100) && (df("Sex") === "F")).select("First_Name", "County", "Count").orderBy(desc("Count")) display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count") display(subsetDF)
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何保存数据帧。 可以将数据帧保存到表,或者将数据帧写入一个或多个文件。
默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE
表权限。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用在本教程开始时定义的变量将数据帧的内容保存到表中。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
Scala
df.write.mode("overwrite").saveAsTable(s"$pathTable" + "." + s"$tableName")
R
saveAsTable(df, paste(path_table, ".", table_name), mode = "overwrite")
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
大多数 Apache Spark 应用程序都以分布式方式处理大型数据集。 Apache Spark 会写出文件目录,而不是单个文件。 Delta Lake 拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于将数据帧保存到 JSON 文件的目录中。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何使用 Apache Spark spark.read.format()
方法将 JSON 数据从目录读取到数据帧中。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码显示在上一示例中保存的 JSON 文件。
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
Apache Spark 数据帧提供以下选项,用于将 SQL 与 PySpark、Scala 和 R 合并在一起。可以在为本教程创建的同一笔记本中运行以下代码。
了解如何使用 Apache Spark selectExpr()
方法。 这是 select()
方法的变体,它用于接受 SQL 表达式并返回更新的数据帧。 此方法允许使用 SQL 表达式,例如 upper
。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
selectExpr()
方法和 SQLupper
表达式将字符串列转换为大写(并重命名列)。Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何导入并使用 Apache Spark expr()
函数,以在指定列的任何位置使用 SQL 语法。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入
expr()
函数,然后使用 Apache Sparkexpr()
函数和 SQLlower
表达式将字符串列转换为小写(并重命名列)。Python
from pyspark.sql.functions import expr display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name")) # expr() function is not supported in R, selectExpr in SparkR replicates this functionality
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
了解如何使用 Apache Spark spark.sql()
函数运行任意 SQL 查询。
将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark
spark.sql()
函数通过 SQL 语法来查询 SQL 表。Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $pathTable.$tableName"))
R
display(sql(paste("SELECT * FROM", path_table, ".", table_name)))
按
Shift+Enter
以运行单元格,然后移动到下一个单元格。
以下笔记本包含本教程中的示例查询。