Streaming with File Sink: Problems with recovery if you change checkpoint or output directories

When you stream data into a file sink, you should always change both the checkpoint and output directories together. Otherwise, you can get failures or unexpected output.
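
As a minimal sketch (the source, sink format, and paths below are illustrative), setting the two locations side by side on the same query makes it easy to move them as a pair:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file-sink-demo").getOrCreate()

# Illustrative source: "rate" is a built-in test source that emits rows.
events = spark.readStream.format("rate").load()

query = (
    events.writeStream
        .format("parquet")
        # These two locations are a pair: change both, or neither.
        .option("checkpointLocation", "/tmp/demo/checkpoint")
        .option("path", "/tmp/demo/output")
        .start()
)
```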

Apache Spark creates a folder named _spark_metadata inside the output directory. This folder contains a write-ahead log for every batch run; it is how Spark provides exactly-once guarantees when writing to a file system. The folder holds one log file per batch (named 0, 1, 2, 3, and so on) plus periodic compaction files (19.compact, n.compact, and so on). Each file contains JSON that describes the output of that particular batch. With the help of this data, once a batch has succeeded, any duplicate output for that batch is discarded.
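
For a quick look at what the log records, here is a small inspection sketch. The path is illustrative, and the file layout it assumes (a version header such as v1 followed by one JSON object per output file) is based on the v1 metadata log format:

```python
import json
import pathlib

# Hypothetical path: point this at your sink's output directory.
metadata_dir = pathlib.Path("/tmp/demo/output/_spark_metadata")

for log_file in sorted(p for p in metadata_dir.iterdir() if p.is_file()):
    with log_file.open() as f:
        version = f.readline().strip()  # version header, e.g. "v1"
        entries = [json.loads(line) for line in f if line.strip()]
    print(f"{log_file.name}: {version}, {len(entries)} output file entries")
```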

If you change the checkpoint directory but not the output directory:

When you change the checkpoint directory, the streaming job starts counting batches from 0 again. Because a log file named 0 already exists in the _spark_metadata folder, the output of the new batch 0 is discarded even though it contains new data. That is, if the previous run stopped at batch 500, the next run with the same output directory and a different checkpoint directory produces output only from batch 501 onward. All of the earlier batches are silently discarded.
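
Continuing the earlier sketch, this restart pattern reproduces the silent-discard behavior (paths are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file-sink-demo").getOrCreate()
events = spark.readStream.format("rate").load()

# NEW checkpoint, SAME output directory: batch numbering restarts at 0, and
# every batch number that already appears in _spark_metadata is dropped.
query = (
    events.writeStream
        .format("parquet")
        .option("checkpointLocation", "/tmp/demo/checkpoint-new")  # changed
        .option("path", "/tmp/demo/output")                        # unchanged
        .start()
)
```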

If you change the output directory but not the checkpoint directory:

When you change only the output directory, you lose all of the batch history recorded in the _spark_metadata folder, because the new output directory starts with an empty one. Spark, however, resumes from the checkpoint and continues with the next batch. For example, if the previous run stopped at batch 500, the first write of the new streaming job creates file 501 in the new _spark_metadata folder, and all of the older batches are missing. When you read the files back, you get the error metadata for batch 0 (or first compact file (19.compact)) is not found.
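
A sketch of the failing read-back (paths are illustrative; the exact exception type can vary by Spark version, but the message matches the one above):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("file-sink-demo").getOrCreate()

# The new output directory's _spark_metadata begins at batch 501, so the
# read fails with an error like:
#   metadata for batch 0 (or first compact file (19.compact)) is not found
df = spark.read.parquet("/tmp/demo/output-new")
```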
