使用 R 处理 DataFrame 和表
本文介绍如何使用 R 包(如 SparkR、 sparklyr 和 dplyr)来处理 R data.frame
、Spark DataFrame 和内存中表。
请注意,使用 SparkR、sparklyr 和 dplyr 时,可能会发现可以使用所有这些包完成特定操作,并且可以使用最熟悉的包。 例如,若要运行查询,可以调用函数,例如 SparkR::sql
、sparklyr::sdf_sql
和 dplyr::select
。 在其他情况下可能只需使用其中一到两个包来完成操作,你选择的操作取决于自己具体的使用情况。 例如,调用 sparklyr::sdf_quantile
的方式与调用 dplyr::percentile_approx
的方式略有不同,即使这两个函数都计算分位数。
可以将 SQL 用作 SparkR 和 sparklyr 之间的媒介。 例如,可以使用 SparkR::sql
查询使用 sparklyr 创建的表。 可以使用 sparklyr::sdf_sql
查询使用 SparkR 创建的表。 dplyr
代码在运行之前都会在内存中被转换为 SQL。 另请参阅 API 互操作性和 SQL 转换。
加载 SparkR、sparklyr 和 dplyr
SparkR、sparklyr 和 dplyr 包包含在 Azure Databricks 群集上安装的 Databricks Runtime 中。 因此,在开始调用这些包之前,无需调用通常会使用的 install.package
。 但仍然必须先使用 library
加载这些包。 例如,在 Azure Databricks 工作区的 R 笔记本中,在笔记本单元格中运行以下代码以加载 SparkR、sparklyr 和 dplyr:
library(SparkR)
library(sparklyr)
library(dplyr)
将 sparklyr 连接到群集
加载 sparklyr 后,必须调用 sparklyr::spark_connect
来连接到群集,以便指定 databricks
连接方法。 例如,在笔记本单元中运行以下代码以连接到托管笔记本的群集:
sc <- spark_connect(method = "databricks")
相比之下,Azure Databricks 笔记本已在群集上建立用于 SparkR 的 SparkSession
笔记本,因此在开始调用 SparkR 之前无需调用 SparkR::sparkR.session
。
将 JSON 数据文件上传到工作区
本文中的许多代码示例基于 Azure Databricks 工作区中特定位置的数据,其中包含特定的列名称和数据类型。 此代码示例的数据来自 GitHub 中的一个名为 book.json
的 JSON 文件。 若要获取此文件并将其上传到工作区:
- 转到 GitHub 上的 books.json 文件,并使用文本编辑器将其内容复制到名为
books.json
的文件中(该文件位于本地计算机上)。 - 在 Azure Databricks 工作区边栏中,单击“目录”。
- 单击“创建表”。
- 在“上传文件”选项卡上,将
books.json
文件从本地计算机拖放到“拖放文件以进行上传”框中。 或者选择“单击以浏览”,然后在本地计算机中浏览到books.json
文件。
默认情况下,Azure Databricks 通过路径 /FileStore/tables/books.json
将本地 books.json
文件上传到工作区中的 DBFS 位置。
请勿单击“使用 UI 创建表”或“在笔记本中创建表”。 本文中的代码示例使用 DBFS 位置中上传的 books.json
文件的数据。
将 JSON 数据读取到 DataFrame
使用 sparklyr::spark_read_json
将上传的 JSON 文件读取到 DataFrame 中,指定连接、JSON 文件的路径和数据的内部表表示名称。 对于此示例,必须指定 book.json
文件包含多行。 在此处指定列的架构是可选操作。 否则,在默认情况下,sparklyr 将推理列的架构。 例如,在笔记本单元中运行以下代码,以将上传的 JSON 文件的数据读入名为 jsonDF
的数据帧:
jsonDF <- spark_read_json(
sc = sc,
name = "jsonTable",
path = "/FileStore/tables/books.json",
options = list("multiLine" = TRUE),
columns = c(
author = "character",
country = "character",
imageLink = "character",
language = "character",
link = "character",
pages = "integer",
title = "character",
year = "integer"
)
)
输出数据帧的前几行
可以使用 SparkR::head
、SparkR::show
或 sparklyr::collect
打印 DataFrame 的第一行。 默认情况下,head
会打印前六行。 show
和 collect
打印前 10 行。 例如,在笔记本单元格中运行以下代码以打印名为 jsonDF
的 DataFrame 的第一行:
head(jsonDF)
# Source: spark<?> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Akk… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid Em… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/Ir… images… Arabic "htt… 288 One … 1200
# … with abbreviated variable names ¹imageLink, ²language
show(jsonDF)
# Source: spark<jsonTable> [?? x 8]
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
collect(jsonDF)
# A tibble: 100 × 8
# author country image…¹ langu…² link pages title year
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int>
# 1 Chinua Achebe Nigeria images… English "htt… 209 Thin… 1958
# 2 Hans Christian Andersen Denmark images… Danish "htt… 784 Fair… 1836
# 3 Dante Alighieri Italy images… Italian "htt… 928 The … 1315
# 4 Unknown Sumer and Ak… images… Akkadi… "htt… 160 The … -1700
# 5 Unknown Achaemenid E… images… Hebrew "htt… 176 The … -600
# 6 Unknown India/Iran/I… images… Arabic "htt… 288 One … 1200
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350
# 8 Jane Austen United Kingd… images… English "htt… 226 Prid… 1813
# 9 Honoré de Balzac France images… French "htt… 443 Le P… 1835
# 10 Samuel Beckett Republic of … images… French… "htt… 256 Moll… 1952
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
运行 SQL 查询,对表进行写入和读取
可以使用 dplyr 函数在 DataFrame 上运行 SQL 查询。 例如,在笔记本单元格中运行以下代码,使用 dplyr::group_by
和 dployr::count
从名为 jsonDF
的 DataFrame 中按作者获取计数。 使用 dplyr::arrange
和 dplyr::desc
按计数降序对结果进行排序。 随后打印前 10 行(默认)。
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n))
# Source: spark<?> [?? x 2]
# Ordered by: desc(n)
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Gustave Flaubert 2
# 8 Homer 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with more rows
# ℹ Use `print(n = ...)` to see more rows
然后可以使用 sparklyr::spark_write_table
将结果写入 Azure Databricks 中的表。 例如,在笔记本单元中运行以下代码以重新运行查询,然后将结果写入名为 json_books_agg
的表中:
group_by(jsonDF, author) %>%
count() %>%
arrange(desc(n)) %>%
spark_write_table(
name = "json_books_agg",
mode = "overwrite"
)
若要验证是否已创建表,可以结合使用 sparklyr::sdf_sql
和 SparkR::showDF
来显示表的数据。 例如,在笔记本单元中运行以下代码以在表中查询某个数据帧,然后使用 sparklyr::collect
默认输出该数据帧的前 10 行:
collect(sdf_sql(sc, "SELECT * FROM json_books_agg"))
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
还可以用 sparklyr::spark_read_table
执行类似操作。 例如,在笔记本单元格中运行以下代码,将上文名为 jsonDF
的 DataFrame 查询并汇入到一个 DataFrame 中,然后使用 sparklyr::collect
打印 DataFrame 的前 10 行(默认):
fromTable <- spark_read_table(
sc = sc,
name = "json_books_agg"
)
collect(fromTable)
# A tibble: 82 × 2
# author n
# <chr> <dbl>
# 1 Fyodor Dostoevsky 4
# 2 Unknown 4
# 3 Leo Tolstoy 3
# 4 Franz Kafka 3
# 5 William Shakespeare 3
# 6 William Faulkner 2
# 7 Homer 2
# 8 Gustave Flaubert 2
# 9 Gabriel García Márquez 2
# 10 Thomas Mann 2
# … with 72 more rows
# ℹ Use `print(n = ...)` to see more rows
在数据帧中添加列并计算列值
可以使用 dplyr 函数将列添加到数据帧并计算列的值。
例如,在笔记本单元中运行以下代码,以获取名为 jsonDF
的数据帧的内容。 使用 dplyr::mutate
添加名为 today
的列,并在此新列中填充当前时间戳。 然后将这些内容写入名为 withDate
的新数据帧,并使用 dplyr::collect
默认输出新数据帧的前 10 行。
注意
dplyr::mutate
仅接受符合 Hive 的内置函数(也称为 UDF)和内置聚合函数(也称为 UDAF)的参数。 有关一般信息,请参阅 Hive 函数。 有关本部分中与日期相关的函数的信息,请参阅日期函数。
withDate <- jsonDF %>%
mutate(today = current_timestamp())
collect(withDate)
# A tibble: 100 × 9
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:32:59
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:32:59
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:32:59
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:32:59
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:32:59
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:32:59
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:32:59
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:32:59
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:32:59
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:32:59
# … with 90 more rows, and abbreviated variable names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows
现在请用 dplyr::mutate
向 withDate
DataFrame 的内容添加另外两列。 新的 month
和 year
列包含 today
列中的数字月份和年份。 然后将这些内容写入名为 withMMyyyy
的新数据帧,并结合使用 dplyr::select
和 dplyr::collect
默认输出新数据帧前 10 行的 author
、title
、month
和 year
列:
withMMyyyy <- withDate %>%
mutate(month = month(today),
year = year(today))
collect(select(withMMyyyy, c("author", "title", "month", "year")))
# A tibble: 100 × 4
# author title month year
# <chr> <chr> <int> <int>
# 1 Chinua Achebe Things Fall Apart 9 2022
# 2 Hans Christian Andersen Fairy tales 9 2022
# 3 Dante Alighieri The Divine Comedy 9 2022
# 4 Unknown The Epic Of Gilgamesh 9 2022
# 5 Unknown The Book Of Job 9 2022
# 6 Unknown One Thousand and One Nights 9 2022
# 7 Unknown Njál's Saga 9 2022
# 8 Jane Austen Pride and Prejudice 9 2022
# 9 Honoré de Balzac Le Père Goriot 9 2022
# 10 Samuel Beckett Molloy, Malone Dies, The Unnamable, the … 9 2022
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
现在使用 dplyr::mutate
将额外两列添加到 withMMyyyy
数据帧的内容。 新的 formatted_date
列包含 today
列中的 yyyy-MM-dd
部分,而新的 day
列包含新 formatted_date
列中的数字日期。 然后将这些内容写入名为 withUnixTimestamp
的新数据帧,并结合使用 dplyr::select
和 dplyr::collect
默认输出新数据帧前 10 行的 title
、formatted_date
和 day
列:
withUnixTimestamp <- withMMyyyy %>%
mutate(formatted_date = date_format(today, "yyyy-MM-dd"),
day = dayofmonth(formatted_date))
collect(select(withUnixTimestamp, c("title", "formatted_date", "day")))
# A tibble: 100 × 3
# title formatted_date day
# <chr> <chr> <int>
# 1 Things Fall Apart 2022-09-27 27
# 2 Fairy tales 2022-09-27 27
# 3 The Divine Comedy 2022-09-27 27
# 4 The Epic Of Gilgamesh 2022-09-27 27
# 5 The Book Of Job 2022-09-27 27
# 6 One Thousand and One Nights 2022-09-27 27
# 7 Njál's Saga 2022-09-27 27
# 8 Pride and Prejudice 2022-09-27 27
# 9 Le Père Goriot 2022-09-27 27
# 10 Molloy, Malone Dies, The Unnamable, the trilogy 2022-09-27 27
# … with 90 more rows
# ℹ Use `print(n = ...)` to see more rows
创建临时视图
可以在内存中创建基于现有 DataFrame 的命名临时视图。 例如,在笔记本单元中运行以下代码,以使用 SparkR::createOrReplaceTempView
获取上述名为 jsonTable
的数据帧的内容,并基于这些内容创建名为 timestampTable
的临时视图。 然后,使用 sparklyr::spark_read_table
读取该临时视图的内容。 使用 sparklyr::collect
默认输出临时表的前 10 行:
createOrReplaceTempView(withTimestampDF, viewName = "timestampTable")
spark_read_table(
sc = sc,
name = "timestampTable"
) %>% collect()
# A tibble: 100 × 10
# author country image…¹ langu…² link pages title year today
# <chr> <chr> <chr> <chr> <chr> <int> <chr> <int> <dttm>
# 1 Chinua A… Nigeria images… English "htt… 209 Thin… 1958 2022-09-27 21:11:56
# 2 Hans Chr… Denmark images… Danish "htt… 784 Fair… 1836 2022-09-27 21:11:56
# 3 Dante Al… Italy images… Italian "htt… 928 The … 1315 2022-09-27 21:11:56
# 4 Unknown Sumer … images… Akkadi… "htt… 160 The … -1700 2022-09-27 21:11:56
# 5 Unknown Achaem… images… Hebrew "htt… 176 The … -600 2022-09-27 21:11:56
# 6 Unknown India/… images… Arabic "htt… 288 One … 1200 2022-09-27 21:11:56
# 7 Unknown Iceland images… Old No… "htt… 384 Njál… 1350 2022-09-27 21:11:56
# 8 Jane Aus… United… images… English "htt… 226 Prid… 1813 2022-09-27 21:11:56
# 9 Honoré d… France images… French "htt… 443 Le P… 1835 2022-09-27 21:11:56
# 10 Samuel B… Republ… images… French… "htt… 256 Moll… 1952 2022-09-27 21:11:56
# … with 90 more rows, 1 more variable: month <chr>, and abbreviated variable
# names ¹imageLink, ²language
# ℹ Use `print(n = ...)` to see more rows, and `colnames()` to see all variable names
对 DataFrame 执行统计分析
可以使用 sparklyr 和 dplyr 进行统计分析。
例如创建一个 DataFrame 来运行统计信息。 为此,请在笔记本单元格中运行以下代码,使用 sparklyr::sdf_copy_to
将内置到 R 中的 iris
数据集的内容写入名为 iris
的 DataFrame 中。 使用 sparklyr::sdf_collect
打印临时表的前 10 行(默认):
irisDF <- sdf_copy_to(
sc = sc,
x = iris,
name = "iris",
overwrite = TRUE
)
sdf_collect(irisDF, "row-wise")
# A tibble: 150 × 5
# Sepal_Length Sepal_Width Petal_Length Petal_Width Species
# <dbl> <dbl> <dbl> <dbl> <chr>
# 1 5.1 3.5 1.4 0.2 setosa
# 2 4.9 3 1.4 0.2 setosa
# 3 4.7 3.2 1.3 0.2 setosa
# 4 4.6 3.1 1.5 0.2 setosa
# 5 5 3.6 1.4 0.2 setosa
# 6 5.4 3.9 1.7 0.4 setosa
# 7 4.6 3.4 1.4 0.3 setosa
# 8 5 3.4 1.5 0.2 setosa
# 9 4.4 2.9 1.4 0.2 setosa
# 10 4.9 3.1 1.5 0.1 setosa
# … with 140 more rows
# ℹ Use `print(n = ...)` to see more rows
现在使用 dplyr::group_by
按 Species
列对行进行分组。 使用 dplyr::summarize
和 dplyr::percentile_approx
通过 Species
按 Sepal_Length
列的第 25、50、75、100 分位数计算汇总统计信息。 使用 sparklyr::collect
打印结果:
注意
dplyr::summarize
仅接受符合 Hive 的内置函数(也称为 UDF)和内置聚合函数(也称为 UDAF)的参数。 有关一般信息,请参阅 Hive 函数。 有关 percentile_approx
的信息,请参阅内置聚合函数 (UDAF)。
quantileDF <- irisDF %>%
group_by(Species) %>%
summarize(
quantile_25th = percentile_approx(
Sepal_Length,
0.25
),
quantile_50th = percentile_approx(
Sepal_Length,
0.50
),
quantile_75th = percentile_approx(
Sepal_Length,
0.75
),
quantile_100th = percentile_approx(
Sepal_Length,
1.0
)
)
collect(quantileDF)
# A tibble: 3 × 5
# Species quantile_25th quantile_50th quantile_75th quantile_100th
# <chr> <dbl> <dbl> <dbl> <dbl>
# 1 virginica 6.2 6.5 6.9 7.9
# 2 versicolor 5.6 5.9 6.3 7
# 3 setosa 4.8 5 5.2 5.8
可以使用以下方法计算类似的结果,例如使用 sparklyr::sdf_quantile
:
print(sdf_quantile(
x = irisDF %>%
filter(Species == "virginica"),
column = "Sepal_Length",
probabilities = c(0.25, 0.5, 0.75, 1.0)
))
# 25% 50% 75% 100%
# 6.2 6.5 6.9 7.9