Table streaming reads and writes

Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream. Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including:

  • Coalescing small files produced by low-latency ingest

  • Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs)

  • Efficiently discovering which files are new when using files as the source for a stream

Delta table as a stream source

When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started.

You can load both paths and tables as a stream:

spark.readStream.format("delta").load("/mnt/delta/events")

or

spark.readStream.format("delta").table("events")

You can also:

  • Control the maximum size of any micro-batch that Delta Lake gives to streaming by setting the maxFilesPerTrigger option. This specifies the maximum number of new files to be considered in every trigger. The default is 1000.
  • Rate-limit how much data gets processed in each micro-batch by setting the maxBytesPerTrigger option. This sets a “soft max,” meaning that a batch processes approximately this amount of data and may process more than the limit. If you use Trigger.Once for your streaming, this option is ignored. If you use this option in conjunction with maxFilesPerTrigger, the micro-batch processes data until either the maxFilesPerTrigger or maxBytesPerTrigger limit is reached (a sketch combining both options follows this list).
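
For instance, a minimal Python sketch, assuming the /mnt/delta/events path from the earlier examples, that applies both rate limits to the same stream:

(spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 500)    # consider at most 500 new files per micro-batch
  .option("maxBytesPerTrigger", "1g")   # soft cap of roughly 1 GB of data per micro-batch
  .load("/mnt/delta/events"))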

Ignore updates and deletes

Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. There are two main strategies for dealing with changes that cannot be automatically propagated downstream:

  • You can delete the output and checkpoint and restart the stream from the beginning.
  • You can set either of these two options:
    • ignoreDeletes: ignore transactions that delete data at partition boundaries.
    • ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data-changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, so your downstream consumers should be able to handle duplicates. Deletes are not propagated downstream. ignoreChanges subsumes ignoreDeletes, so if you use ignoreChanges, your stream is not disrupted by either deletions or updates to the source table.

Example

For example, suppose you have a table user_events with date, user_email, and action columns that is partitioned by date. You stream out of the user_events table, and you need to delete data from it due to GDPR.

When you delete at partition boundaries (that is, the WHERE is on a partition column), the files are already segmented by value, so the delete just drops those files from the metadata. Thus, if you just want to delete data from some partitions, you can use:

spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

However, if you have to delete data based on user_email, then you will need to use:

spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

If you update a user_email with the UPDATE statement, the file containing the user_email in question is rewritten. When you use ignoreChanges, the new record is propagated downstream along with all other unchanged records that were in the same file. Your logic should be able to handle these incoming duplicate records.
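
If downstream consumers need to drop these duplicates, one option is stateful deduplication on a unique key. The following Python sketch assumes each event carries an event_id key and an event_time timestamp column; both are hypothetical and not part of the user_events schema described above:

(spark.readStream
  .format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")
  .withWatermark("event_time", "1 hour")          # bound the deduplication state
  .dropDuplicates(["event_id", "event_time"]))    # drop re-emitted copies of the same event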

Specify initial position

Note

This feature is available on Databricks Runtime 7.3 and above.

You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.

  • startingVersion: The Delta Lake version to start from. All table changes starting from this version (inclusive) will be read by the streaming source. You can obtain the commit versions from the version column of the DESCRIBE HISTORY events command output (see the sketch after this list).
  • startingTimestamp: The timestamp to start from. All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. It can be any one of:
    • '2018-10-18T22:15:12.013Z', that is, a string that can be cast to a timestamp
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18', that is, a date string
    • Any other expression that is or can be cast to a timestamp, such as current_timestamp() - interval 12 hours or date_sub(current_date(), 1)
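
As a quick reference, a Python sketch of that version lookup, assuming the events table from the earlier examples is registered in the metastore:

# List the commit versions and timestamps of the Delta table.
(spark.sql("DESCRIBE HISTORY events")
  .select("version", "timestamp", "operation")
  .show(truncate=False))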

You cannot set both options at the same time; use only one of them. They take effect only when starting a new streaming query. If a streaming query has already started and its progress has been recorded in its checkpoint, these options are ignored.

Important

Although you can start the streaming source from a specified version or timestamp, the schema of the streaming source is always the latest schema of the Delta table. You must ensure there is no incompatible schema change to the Delta table after the specified version or timestamp. Otherwise, the streaming source may return incorrect results when reading the data with an incorrect schema.

Example

For example, suppose you have a table user_events. If you want to read changes since version 5, you can use:

spark.readStream
  .format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

If you want to read changes since 2018-10-18, you can use:

spark.readStream
  .format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

Delta table as a sink

You can also write data into a Delta table using Structured Streaming. The transaction log enables Delta Lake to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.

Append mode

By default, streams run in append mode, which adds new records to the table.

You can use the path method:

Python

(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events"))

Scala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

or the table method:

Python

(events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events"))

Scala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

Complete mode

You can also use Structured Streaming to replace the entire table with every batch. One example use case is to compute a summary using aggregation:

spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")

The preceding example continuously updates a table that contains the aggregate number of events by customer.

For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived since the last update.
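
As a sketch, the Complete mode example above can be run as a scheduled job with a one-time trigger in Python (trigger(once=True) is the Python counterpart of Trigger.Once); the checkpoint ensures each run picks up only data that arrived since the previous run:

(spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .trigger(once=True)   # process everything new since the last checkpoint, then stop
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer"))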