读取和写入 CSV 文件

本文提供使用 Python、Scala、R 和 SQL 通过 Azure Databricks 对 CSV 文件进行读写的示例。

注意

Databricks 建议 SQL 用户使用 read_files 表值函数读取 CSV 文件。 read_files 在 Databricks Runtime 13.1 及更高版本中可用。

还可以使用临时视图。 如果使用 SQL 直接读取 CSV 数据而不使用临时视图或 read_files,则存在以下限制:

选项

可以为 CSV 文件数据源配置几个选项。 有关支持的读取和写入选项,请参阅以下 Apache Spark 参考文章。

使用格式错误的 CSV 记录

使用指定的架构读取 CSV 文件时,文件中的数据可能与架构不匹配。 例如,包含城市名称的字段将不会分析为整数。 结果取决于分析程序运行的模式:

  • PERMISSIVE(默认):对于无法正确分析的字段,插入 null
  • DROPMALFORMED:删除包含无法分析的字段的行
  • FAILFAST:如果发现任何格式错误的数据,则中止读取

若要设置模式,请使用 mode 选项。

diamonds_df = (spark.read
  .format("csv")
  .option("mode", "PERMISSIVE")
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
)

PERMISSIVE 模式下,可以使用以下方法之一检查无法正确分析的行:

  • 你可以提供指向选项 badRecordsPath 的自定义路径,以将损坏的记录记录到文件中。
  • 你可以将列 _corrupt_record 添加到提供给 DataFrameReader 的架构中,以查看生成的 DataFrame 中的损坏的记录。

注意

badRecordsPath 选项优先于 _corrupt_record,这意味着写入所提供路径的格式错误的行不会在生成的 DataFrame 中显示。

使用补救数据列时,格式错误的记录的默认行为会发生变化。

查找格式错误的行的笔记本

获取笔记本

补救数据列

注意

Databricks Runtime 8.3(不受支持)及更高版本支持此功能。

使用 PERMISSIVE 模式时,可以启用补救的数据列来捕获任何未分析的数据,因为记录中的一个或多个字段存在以下问题之一:

  • 不存在于提供的架构中。
  • 与提供的架构的数据类型不匹配。
  • 与提供的架构中的字段名称大小写不匹配。

补救数据列以 JSON 文档形式返回,其中包含已补救的列和记录的源文件路径。 若要从补救的数据列中删除源文件路径,可以设置 SQL 配置 spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")。 你可以通过在读取数据时将选项 rescuedDataColumn 设置为某个列名来启用补救的数据列,例如,spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load(<path>)_rescued_data

分析记录时,CSV 分析器支持三种模式:PERMISSIVEDROPMALFORMEDFAILFAST。 与 rescuedDataColumn 一起使用时,数据类型不匹配不会导致在 DROPMALFORMED 模式下删除记录,或者在 FAILFAST 模式下引发错误。 只有损坏的记录(即不完整或格式错误的 CSV)会被删除或引发错误。

PERMISSIVE 模式下使用 rescuedDataColumn 时,以下规则适用于损坏的记录

  • 文件的第一行(标题行或数据行)设置预期的行长度。
  • 具有不同列数的行视为不完整。
  • 数据类型不匹配不视为记录受损。
  • 只有不完整和格式错误的 CSV 记录会视为损坏并记录到 _corrupt_record 列或 badRecordsPath

SQL 示例:读取 CSV 文件

以下 SQL 示例使用 read_files 读取 CSV 文件。

-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
  's3://<bucket>/<path>/<file>.csv',
  format => 'csv',
  header => true,
  mode => 'FAILFAST')

Scala、R 和 Python 示例:读取 CSV 文件

以下笔记本演示如何使用 Scala、R 和 Python 读取文件、显示示例数据和打印数据架构。 本节的示例使用钻石数据集。 指定数据集的路径以及所需的任何选项。

读取 CSV 文件的笔记本

获取笔记本

示例:指定架构

当 CSV 文件的架构已知时,可以用 schema 选项向 CSV 读取器指定所需的架构。

使用架构读取 CSV 文件的笔记本

获取笔记本

使用 read_files 的 SQL 示例:

SELECT * FROM read_files(
  's3://<bucket>/<path>/<file>.csv',
  format => 'csv',
  header => false,
  schema => 'id string, date date, event_time timestamp')

示例:读取列子集的错误

CSV 分析程序的行为取决于读取的列集。 如果指定的架构不正确,则结果会有很大差异,具体取决于访问的列的子集。 以下笔记本提供最常见的错误。

读取 CSV 文件的列子集的注意事项的笔记本

获取笔记本