Create Apache Spark Streaming jobs with exactly-once event processing

Stream processing applications take different approaches to how they handle re-processing messages after some failure in the system:

  • At least once: Each message is guaranteed to be processed, but it may get processed more than once.
  • At most once: Each message may or may not be processed. If a message is processed, it is only processed once.
  • Exactly once: Each message is guaranteed to be processed once and only once.

This article shows you how to configure Spark Streaming to achieve exactly-once processing.

Exactly-once semantics with Apache Spark Streaming

First, consider how all system points of failure restart after having an issue, and how you can avoid data loss. A Spark Streaming application has:

  • An input source
  • One or more receiver processes that pull data from the input source
  • Tasks that process the data
  • An output sink
  • A driver process that manages the long-running job

Exactly-once semantics require that no data is lost at any point, and that message processing is restartable, regardless of where the failure occurs.

Replayable sources

The source your Spark Streaming application is reading your events from must be replayable. This means that in cases where the message was retrieved but then the system failed before the message could be persisted or processed, the source must provide the same message again.

In Azure, both Azure Event Hubs and Apache Kafka on HDInsight provide replayable sources. Another example of a replayable source is a fault-tolerant file system like Apache Hadoop HDFS, Azure Storage blobs, or Azure Data Lake Storage, where all data is kept forever and at any point you can re-read the data in its entirety.

Reliable receivers

In Spark Streaming, sources like Event Hubs and Kafka have reliable receivers, where each receiver keeps track of its progress reading the source. A reliable receiver persists its state into fault-tolerant storage, either within Apache ZooKeeper or in Spark Streaming checkpoints written to HDFS. If such a receiver fails and is later restarted, it can pick up where it left off.
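
As a sketch, the older receiver-based Kafka connector for Spark Streaming (spark-streaming-kafka-0-8) illustrates this behavior: the receiver records its read position in ZooKeeper, so a restarted receiver resumes where it left off. The ZooKeeper quorum, consumer group, and topic names below are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("ReliableReceiverExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Receiver-based Kafka stream: the receiver tracks the offsets it has read
    // in ZooKeeper under the given consumer group (all names are placeholders).
    val kafkaStream = KafkaUtils.createStream(
      ssc,
      "zk1.example.com:2181",   // ZooKeeper quorum
      "exactly-once-demo",      // consumer group ID
      Map("events" -> 1))       // topic -> number of receiver threads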

Use the Write-Ahead Log

Spark Streaming supports the use of a Write-Ahead Log, where each received event is first written to Spark's checkpoint directory in fault-tolerant storage and then stored in a Resilient Distributed Dataset (RDD). In Azure, the fault-tolerant storage is HDFS backed by either Azure Storage or Azure Data Lake Storage. In your Spark Streaming application, the Write-Ahead Log is enabled for all receivers by setting the spark.streaming.receiver.writeAheadLog.enable configuration setting to true. The Write-Ahead Log provides fault tolerance for failures of both the driver and the executors.
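
For example, the setting can be applied on the SparkConf used to build the streaming context. This is a minimal sketch; the application name, batch interval, and checkpoint path are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Enable the Write-Ahead Log for all receivers. Received events are first
    // written under the checkpoint directory in fault-tolerant storage.
    val conf = new SparkConf()
      .setAppName("WriteAheadLogExample")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint("/path/to/checkpoints")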

For workers running tasks against the event data, each RDD is by definition both replicated and distributed across multiple workers. If a task fails because the worker running it crashed, the task will be restarted on another worker that has a replica of the event data, so the event is not lost.

Use checkpoints for drivers

The job drivers need to be restartable. If the driver running your Spark Streaming application crashes, it takes down with it all running receivers, tasks, and any RDDs storing event data. In this case, you need to be able to save the progress of the job so you can resume it later. This is accomplished by checkpointing the Directed Acyclic Graph (DAG) of the DStream periodically to fault-tolerant storage. The DAG metadata includes the configuration used to create the streaming application, the operations that define the application, and any batches that are queued but not yet completed. This metadata enables a failed driver to be restarted from the checkpoint information. When the driver restarts, it will launch new receivers that themselves recover the event data back into RDDs from the Write-Ahead Log.

Checkpoints are enabled in Spark Streaming in two steps.

  1. In the StreamingContext object, configure the storage path for the checkpoints:

    // Assumes spark is an existing SparkSession; use its underlying SparkContext.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
    ssc.checkpoint("/path/to/checkpoints")
    

    In HDInsight, these checkpoints should be saved to the default storage attached to your cluster, either Azure Storage or Azure Data Lake Storage.

  2. Next, specify a checkpoint interval on the DStream (30 seconds in the example below). At each interval, state data derived from the input events is persisted to storage. Persisted state data can reduce the computation needed when rebuilding the state from the source events.

    val lines = ssc.socketTextStream("hostname", 9999)
    lines.checkpoint(Seconds(30))
    ssc.start()
    ssc.awaitTermination()
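
With the checkpoint directory and interval configured, a restartable driver is typically built with StreamingContext.getOrCreate, which recreates the context from the checkpointed DAG when a checkpoint exists and only calls the creation function on a fresh start. This is a sketch; the checkpoint path, application name, and stream-building logic are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/path/to/checkpoints"

    // Called only when no checkpoint exists: builds the DStream DAG and sets
    // the checkpoint directory so the DAG metadata is saved periodically.
    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("RestartableDriverExample")
      val ssc = new StreamingContext(conf, Seconds(1))
      ssc.checkpoint(checkpointDir)
      // Define sources, transformations, and sinks here.
      ssc
    }

    // On restart, the driver is rebuilt from the checkpoint instead of
    // calling createContext again.
    val context = StreamingContext.getOrCreate(checkpointDir, createContext _)
    context.start()
    context.awaitTermination()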
    

Use idempotent sinks

The destination sink to which your job writes results must be able to handle the situation where it is given the same result more than once. The sink must be able to detect such duplicate results and ignore them. An idempotent sink can be called multiple times with the same data with no change of state.

You can create idempotent sinks by implementing logic that first checks for the existence of the incoming result in the datastore. If the result already exists, the write should appear to succeed from the perspective of your Spark job, but in reality your data store ignored the duplicate data. If the result does not exist, then the sink should insert this new result into its storage.

For example, you could use a stored procedure with Azure SQL Database that inserts events into a table. This stored procedure first looks up the event by its key fields, and inserts the record into the table only when no matching event is found.
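
The following sketch shows the same idea from the Spark Streaming side, writing each partition over JDBC. The connection string, table, and column names are hypothetical; the conditional insert skips any event whose key is already present.

    import java.sql.DriverManager
    import org.apache.spark.streaming.dstream.DStream

    // Hypothetical Azure SQL Database connection string (placeholder values).
    val jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net;database=<db>;user=<user>;password=<password>"

    // eventsStream is assumed to be a DStream of (eventId, payload) pairs.
    def writeIdempotently(eventsStream: DStream[(String, String)]): Unit = {
      eventsStream.foreachRDD { rdd =>
        rdd.foreachPartition { partition =>
          val conn = DriverManager.getConnection(jdbcUrl)
          try {
            // Insert only when no row with this key exists, so replayed
            // events are ignored (the Events table is hypothetical).
            val stmt = conn.prepareStatement(
              """IF NOT EXISTS (SELECT 1 FROM Events WHERE EventId = ?)
                |  INSERT INTO Events (EventId, Payload) VALUES (?, ?)""".stripMargin)
            partition.foreach { case (eventId, payload) =>
              stmt.setString(1, eventId)
              stmt.setString(2, eventId)
              stmt.setString(3, payload)
              stmt.executeUpdate()
            }
            stmt.close()
          } finally {
            conn.close()
          }
        }
      }
    }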

Another example is to use a partitioned file system, like Azure Storage blobs or Azure Data Lake Storage. In this case, your sink logic does not need to check for the existence of a file. If the file representing the event exists, it is simply overwritten with the same data. Otherwise, a new file is created at the computed path.
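
As a sketch of this approach using the Hadoop FileSystem API, each event is written to a path derived from its key, and any existing file is overwritten, so replaying the same event rewrites identical data. The base path and file layout are placeholders.

    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Writes one event to a deterministic path; overwrite = true means a
    // replay of the same event has no net effect (paths are placeholders).
    def writeEventFile(eventId: String, payload: String): Unit = {
      val path = new Path(s"/events/$eventId.json")
      val fs = FileSystem.get(new Configuration())
      val out = fs.create(path, true)   // overwrite any existing file
      try {
        out.write(payload.getBytes(StandardCharsets.UTF_8))
      } finally {
        out.close()
      }
    }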

Next steps