常见数据加载模式

自动加载程序简化了许多常见的数据引入任务。 此快速参考提供了几种流行模式的示例。

使用 glob 模式筛选目录或文件

在路径中提供 Glob 模式时,可用于筛选目录和文件。

模式 说明
? 匹配任何单一字符
* 与零个或多个字符匹配
[abc] 匹配字符集中的单个字符 {a,b,c}。
[a-z] 匹配字符范围 {a...z} 中的单个字符。
[^a] 匹配不是来自字符集或范围 {a} 的单个字符。 请注意,^ 字符必须立即出现在左括号的右侧。
{ab,cd} 匹配字符串集 {ab, cd} 中的字符串。
{ab,c{de, fh}} 匹配字符串集 {ab, cde, cfh} 中的字符串。

使用 path 来提供前缀模式,例如:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", <format>) \
  .schema(schema) \
  .load("<base-path>/*/files")

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", <format>)
  .schema(schema)
  .load("<base-path>/*/files")

重要

你需要使用 pathGlobFilter 选项来显式提供后缀模式。 path 仅提供前缀筛选器。

例如,如果只想分析包含采用不同后缀的文件的目录中的 png 文件,则可以执行以下操作:

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .option("pathGlobfilter", "*.png") \
  .load(<base-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .option("pathGlobfilter", "*.png")
  .load(<base-path>)

注意

自动加载程序的默认通配行为与其他 Spark 文件源的默认行为不同。 将 .option("cloudFiles.useStrictGlobber", "true") 添加到读取中,以使用将默认 Spark 行为与文件源匹配的通配。 有关通配的详细信息,请参见下表:

模式 文件路径 默认通配符 严格的通配符
/a/b /a/b/c/file.txt
/a/b /a/b_dir/c/file.txt
/a/b /a/b.txt
/a/b/ /a/b.txt
/a/*/c/ /a/b/c/file.txt
/a/*/c/ /a/b/c/d/file.txt
/a/*/c/ /a/b/x/y/c/file.txt
/a/*/c /a/b/c_file.txt
/a/*/c/ /a/b/c_file.txt
/a/*/c/ /a/*/cookie/file.txt
/a/b* /a/b.txt
/a/b* /a/b/file.txt
/a/{0.txt,1.txt} /a/0.txt
/a/*/{0.txt,1.txt} /a/0.txt
/a/b/[cde-h]/i/ /a/b/c/i/file.txt

启用简易 ETL

将数据引入 Delta Lake 而不丢失任何数据的一种简单方法是使用以下模式并在自动加载程序中启用架构推理。 Databricks 建议为此在某个 Azure Databricks 作业中运行以下代码,以便在源数据架构发生更改时自动重启流。 默认情况下,架构将推理为字符串类型,所有分析错误(如果所有内容保留为字符串,则不应会出现任何错误)将进入 _rescued_data,任何新列将导致流失败并使架构演变。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path-to-schema-location>")
  .load("<path-to-source-data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

防止结构良好的数据丢失

如果你知道自己的架构,但希望每次收到意外数据时知道这一情况,则 Databricks 建议使用 rescuedDataColumn

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

如果你希望在引入了与架构不匹配的新字段时流停止处理,可以添加:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

启用灵活的半结构化数据管道

当你从供应商收到了数据,而这些数据会将新列引入到他们提供的信息时,你可能不知道他们何时提供了信息,或者没有带宽来更新数据管道。 你现在可以利用架构演变来重启流,并让自动加载程序自动更新推理的架构。 对于供应商可能提供的一些“无架构”字段,还可以利用 schemaHints

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

转换嵌套的 JSON 数据

由于自动加载程序将顶级 JSON 列推断为字符串,因此可能会留下需要进一步转换的嵌套 JSON 对象。 可以使用半结构化数据访问 API 进一步转换复杂的 JSON 内容。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .load("<source-data-with-nested-json>") \
  .selectExpr(
    "*",
    "tags:page.name",    # extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int", # extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"     # extracts {"tags":{"eventType":...}}
  )

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<source-data-with-nested-json>")
  .selectExpr(
    "*",
    "tags:page.name",     // extracts {"tags":{"page":{"name":...}}}
    "tags:page.id::int",  // extracts {"tags":{"page":{"id":...}}} and casts to int
    "tags:eventType"      // extracts {"tags":{"eventType":...}}
  )

推断嵌套的 JSON 数据

如果你有嵌套数据,可以使用 cloudFiles.inferColumnTypes 选项推断数据和其他列类型的嵌套结构。

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>") \
  .option("cloudFiles.inferColumnTypes", "true") \
  .load("<source-data-with-nested-json>")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .option("cloudFiles.inferColumnTypes", "true")
  .load("<source-data-with-nested-json>")

加载不带标头的 CSV 文件

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

使用标头在 CSV 文件上强制实施架构

Python

df = spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("rescuedDataColumn", "_rescued_data") \ # makes sure that you don't lose data
  .schema(<schema>) \ # provide a schema here for the files
  .load(<path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "csv")
  .option("header", "true")
  .option("rescuedDataColumn", "_rescued_data") // makes sure that you don't lose data
  .schema(<schema>) // provide a schema here for the files
  .load(<path>)

将图像或二进制数据引入 Delta Lake for ML

将数据存储在 Delta Lake 中后,就可以对数据运行分布式推理。 请参阅使用 pandas UDF 执行分布式推理

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "binaryFile") \
  .load("<path-to-source-data>") \
  .writeStream \
  .option("checkpointLocation", "<path-to-checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "binaryFile")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

DLT 的自动加载程序语法

增量实时表为自动加载程序提供略有修改的 Python 语法,并为自动加载程序添加了 SQL 支持。

以下示例使用自动加载程序从 CSV 和 JSON 文件创建数据集:

Python

@dlt.table
def customers():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/databricks-datasets/retail-org/customers/")
  )

@dlt.table
def sales_orders_raw():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("/databricks-datasets/retail-org/sales_orders/")
  )

SQL

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv")

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/sales_orders/", "json")

可以将支持的格式选项用于自动加载程序。 使用 map() 函数,可将选项传递给 cloud_files() 方法。 选项是键值对,其中键和值是字符串。 下面介绍了在 SQL 中使用自动加载程序的语法:

CREATE OR REFRESH STREAMING TABLE <table-name>
AS SELECT *
  FROM cloud_files(
    "<file-path>",
    "<file-format>",
    map(
      "<option-key>", "<option_value",
      "<option-key>", "<option_value",
      ...
    )
  )

下面的示例从带有标头的制表符分隔的 CSV 文件中读取数据:

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM cloud_files("/databricks-datasets/retail-org/customers/", "csv", map("delimiter", "\t", "header", "true"))

可使用 schema 手动指定格式;必须为不支持架构推理的格式指定 schema

Python

@dlt.table
def wiki_raw():
  return (
    spark.readStream.format("cloudFiles")
      .schema("title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
      .option("cloudFiles.format", "parquet")
      .load("/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet")
  )

SQL

CREATE OR REFRESH STREAMING TABLE wiki_raw
AS SELECT *
  FROM cloud_files(
    "/databricks-datasets/wikipedia-datasets/data-001/en_wikipedia/articles-only-parquet",
    "parquet",
    map("schema", "title STRING, id INT, revisionId INT, revisionTimestamp TIMESTAMP, revisionUsername STRING, revisionUsernameId INT, text STRING")
  )

注意

在使用自动加载程序读取文件时,增量实时表将自动配置和管理模式和检查点目录。 但是,如果你手动配置这些目录中的任何一个,执行完全刷新不会影响已配置目录的内容。 Databricks 建议使用自动配置的目录以避免在处理过程中出现意外的副作用。