Create high-availability Apache Spark Streaming jobs with YARN

Apache Spark Streaming enables you to implement scalable, high-throughput, fault-tolerant applications for processing data streams. You can connect Spark Streaming applications on an HDInsight Spark cluster to different kinds of data sources, such as Azure Event Hubs, Azure IoT Hub, Apache Kafka, Apache Flume, Twitter, ZeroMQ, or raw TCP sockets, or have them monitor the Apache Hadoop HDFS filesystem for changes. Spark Streaming supports fault tolerance with the guarantee that any given event is processed exactly once, even with a node failure.

Spark Streaming creates long-running jobs during which you can apply transformations to the data and then push the results out to filesystems, databases, dashboards, and the console. Spark Streaming processes micro-batches of data by first collecting a batch of events over a defined time interval. Next, that batch is sent on for processing and output. Batch time intervals are typically defined in fractions of a second.
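
To illustrate the micro-batch model, the following minimal Scala sketch creates a streaming context with a one-second batch interval; the application name and the socket source are placeholders for whatever source your application actually uses.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Collect incoming events into micro-batches once per second (illustrative interval).
    val conf = new SparkConf().setAppName("BatchIntervalExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    // A socket source stands in here for Event Hubs, Kafka, or another input source.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()   // each batch interval emits one count

    ssc.start()
    ssc.awaitTermination()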

Spark Streaming

DStreams

Spark Streaming represents a continuous stream of data by using a discretized stream (DStream). This DStream can be created from input sources like Event Hubs or Kafka, or by applying transformations to another DStream. When an event arrives at your Spark Streaming application, the event is stored in a reliable way. That is, the event data is replicated so that multiple nodes have a copy of it. This replication ensures that the failure of any single node won't result in the loss of your event.

The Spark core uses resilient distributed datasets (RDDs). RDDs distribute data across multiple nodes in the cluster, where each node generally maintains its data entirely in memory for best performance. Each RDD represents the events collected over a batch interval. When the batch interval elapses, Spark Streaming produces a new RDD containing all the data from that interval. This continuous set of RDDs is collected into a DStream. A Spark Streaming application processes the data stored in each batch's RDD.
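
The following hypothetical fragment shows DStreams produced by transformations and the per-batch RDD that backs each interval. It assumes a StreamingContext named ssc like the one in the earlier sketch; the socket source and names are placeholders.

    // A DStream created from an input source (socket used as a stand-in for Event Hubs or Kafka).
    val sentences = ssc.socketTextStream("localhost", 9999)

    // Each transformation produces a new DStream.
    val words = sentences.flatMap(_.split(" "))
    val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Every batch interval yields one RDD; foreachRDD exposes it for per-batch processing.
    wordCounts.foreachRDD { rdd =>
      println(s"Distinct words in this batch: ${rdd.count()}")
    }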

Spark DStream

Spark Structured Streaming jobs

Spark Structured Streaming was introduced in Spark 2.0 as an analytic engine for use on streaming structured data. Spark Structured Streaming uses the SparkSQL batching engine APIs. As with Spark Streaming, Spark Structured Streaming runs its computations over continuously arriving micro-batches of data. Spark Structured Streaming represents a stream of data as an Input Table with unlimited rows. That is, the Input Table continues to grow as new data arrives. This Input Table is continuously processed by a long-running query, and the results are written out to an Output Table.

Spark Structured Streaming

In Structured Streaming, data arrives at the system and is immediately ingested into the Input Table. You write queries that perform operations against this Input Table. The query output yields another table, called the Results Table. The Results Table contains the results of your query, from which you draw data to send to an external datastore, such as a relational database. The trigger interval sets the timing for when data is processed from the Input Table. By default, Structured Streaming processes the data as soon as it arrives. However, you can also configure the trigger to run on a longer interval, so the streaming data is processed in time-based batches. The data in the Results Table may be completely refreshed each time there is new data so that it includes all of the output data since the streaming query began (complete mode), or it may contain only the data that is new since the last time the query was processed (append mode).
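
The following minimal sketch shows these pieces together: a streaming query over an unbounded input, an explicit trigger interval, and an output mode. The socket source, the ten-second trigger, and the console sink are illustrative choices, and the Trigger.ProcessingTime API assumes Spark 2.2 or later.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder.appName("StructuredStreamingExample").getOrCreate()

    // The unbounded Input Table; a socket source stands in for Event Hubs or Kafka.
    val input = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // A long-running query over the Input Table produces the Results Table.
    val counts = input.groupBy("value").count()

    // Complete mode rewrites the whole Results Table on each trigger;
    // append mode would emit only rows added since the last trigger.
    val query = counts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()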

Create fault-tolerant Spark Streaming jobs

To create a highly available environment for your Spark Streaming jobs, start by coding your individual jobs for recovery in the event of failure. Such self-recovering jobs are fault-tolerant.

RDDs have several properties that help make Spark Streaming jobs highly available and fault-tolerant:

  • Batches of input data stored in RDDs as a DStream are automatically replicated in memory for fault tolerance.
  • Data lost because of a worker failure can be recomputed from the replicated input data on different workers, as long as those worker nodes are available.
  • Fast fault recovery can occur within one second, because recovery from faults and stragglers happens via in-memory computation.

Exactly-once semantics with Spark Streaming

To create an application that processes each event once (and only once), consider how all points of failure in the system restart after a problem, and how you can avoid data loss. Exactly-once semantics require that no data is lost at any point and that message processing is restartable, regardless of where the failure occurs. See Create Spark Streaming jobs with exactly-once event processing.

Spark Streaming and Apache Hadoop YARN

In HDInsight, cluster work is coordinated by Yet Another Resource Negotiator (YARN). Designing high availability for Spark Streaming includes techniques for Spark Streaming and also for YARN components. An example configuration using YARN is shown below.

YARN architecture

The following sections describe design considerations for this configuration.

Plan for failures

To create a YARN configuration for high availability, you should plan for a possible executor or driver failure. Some Spark Streaming jobs also include data guarantee requirements that need additional configuration and setup. For example, a streaming application may have a business requirement for a zero-data-loss guarantee despite any error that occurs in the hosting streaming system or HDInsight cluster.

If an executor fails, Spark automatically restarts its tasks and receivers, so no configuration change is needed.

However, if a driver fails, then all of its associated executors fail, and all received blocks and computation results are lost. To recover from a driver failure, use DStream checkpointing as described in Create Spark Streaming jobs with exactly-once event processing. DStream checkpointing periodically saves the directed acyclic graph (DAG) of DStreams to fault-tolerant storage such as Azure Storage. Checkpointing allows Spark Streaming to restart the failed driver from the checkpoint information. This driver restart launches new executors and also restarts the receivers.

To recover drivers with DStream checkpointing:

  • Configure automatic driver restart on YARN with the configuration setting yarn.resourcemanager.am.max-attempts.

  • Set a checkpoint directory in an HDFS-compatible file system with streamingContext.checkpoint(hdfsDirectory).

  • Restructure your source code to use checkpoints for recovery, for example:

        def creatingFunc() : StreamingContext = {
            val context = new StreamingContext(...)
            val lines = KafkaUtils.createStream(...)
            val words = lines.flatMap(...)
            ...
            context.checkpoint(hdfsDir)
            // Return the newly built context so getOrCreate can use it when no checkpoint exists.
            context
        }
    
        val context = StreamingContext.getOrCreate(hdfsDir, creatingFunc)
        context.start()
    
  • Configure lost-data recovery by enabling the write-ahead log (WAL) with sparkConf.set("spark.streaming.receiver.writeAheadLog.enable","true"), and disable in-memory replication for input DStreams by using StorageLevel.MEMORY_AND_DISK_SER. These settings are combined in the sketch after this list.
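
As a rough sketch of how the steps above fit together, the fragment below enables the WAL, sets a checkpoint directory, and uses a receiver storage level without in-memory replication. The Kafka receiver parameters (ZooKeeper quorum, consumer group, and topic) are placeholders, and hdfsDir is the checkpoint directory variable from the example above.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def creatingFunc() : StreamingContext = {
        val conf = new SparkConf()
            .setAppName("CheckpointedStreamingApp")
            // Enable the write-ahead log so received data survives a driver failure.
            .set("spark.streaming.receiver.writeAheadLog.enable", "true")

        val context = new StreamingContext(conf, Seconds(1))
        context.checkpoint(hdfsDir)

        // With the WAL enabled, skip in-memory replication of receiver input.
        val lines = KafkaUtils.createStream(
            context, "zookeeperQuorum:2181", "consumer-group", Map("events" -> 1),
            StorageLevel.MEMORY_AND_DISK_SER)

        lines.map(_._2).count().print()
        context
    }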

To summarize, by combining checkpointing, the WAL, and reliable receivers, you can deliver "at least once" data recovery, which becomes exactly-once in the following cases:

  • Exactly once, so long as received data is not lost and the outputs are either idempotent or transactional.
  • Exactly once, with the new Kafka Direct approach, which uses Kafka as a replicated log rather than using receivers or WALs, as sketched below.
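
A minimal sketch of the direct approach using the Kafka 0.10 integration (spark-streaming-kafka-0-10) follows; the broker address, group ID, and topic name are placeholders, and streamingContext is assumed to be an existing StreamingContext.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streaming-example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // No receiver and no WAL: Kafka itself serves as the replicated log,
    // and Spark tracks the offsets consumed for each batch.
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](Seq("events"), kafkaParams)
    )

    stream.map(record => record.value).count().print()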

Typical concerns for high availability

  • It is more difficult to monitor streaming jobs than batch jobs. Spark Streaming jobs are typically long-running, and YARN doesn't aggregate logs until a job finishes. Spark checkpoints are lost during application or Spark upgrades, so you'll need to clear the checkpoint directory during an upgrade.

  • Configure your YARN cluster mode to run drivers even if a client fails. To set up automatic restart for drivers:

    spark.yarn.maxAppAttempts = 2
    spark.yarn.am.attemptFailuresValidityInterval = 1h
    
  • Spark and the Spark Streaming UI have a configurable metrics system. You can also use additional libraries, such as Graphite/Grafana, to download dashboard metrics such as 'num records processed', 'memory/GC usage on driver & executors', 'total delay', 'utilization of the cluster', and so forth. In Structured Streaming version 2.1 or greater, you can use StreamingQueryListener to gather additional metrics, as sketched below.
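
    A minimal sketch of such a listener follows; it assumes an existing SparkSession named spark and simply prints a few of the per-batch progress fields.

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    // Illustrative listener that logs basic progress metrics for each micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit =
        println(s"Query started: ${event.id}")

      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        val p = event.progress
        println(s"Batch ${p.batchId}: ${p.numInputRows} rows, " +
          s"${p.processedRowsPerSecond} rows/s processed")
      }

      override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
        println(s"Query terminated: ${event.id}")
    })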

  • You should segment long-running jobs. When a Spark Streaming application is submitted to the cluster, the YARN queue where the job runs must be defined. You can use the YARN Capacity Scheduler to submit long-running jobs to separate queues.

  • Shut down your streaming application gracefully. If your offsets are known and all application state is stored externally, then you can programmatically stop your streaming application at the appropriate place. One technique is to use "thread hooks" in Spark, by checking for an external flag every n seconds. You can also use a marker file that is created on HDFS when the application starts and removed when you want the application to stop. For the marker file approach, use a separate thread in your Spark application that calls code similar to this:

    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
    // to be able to recover on restart, store all offsets in an external database
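
    A fuller, hypothetical sketch of the marker-file approach follows; the marker path and polling interval are placeholders, and streamingContext is the application's StreamingContext.

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Create the marker at startup; an operator (or script) deletes it to request a graceful stop.
    val markerFile = new Path("/streaming/markers/my-app-running")
    val fs = FileSystem.get(streamingContext.sparkContext.hadoopConfiguration)
    fs.create(markerFile).close()

    val monitor = new Thread(new Runnable {
      override def run(): Unit = {
        while (fs.exists(markerFile)) {
          Thread.sleep(10000)   // poll every 10 seconds (illustrative)
        }
        // Marker removed: finish in-flight batches, then stop.
        streamingContext.stop(stopSparkContext = true, stopGracefully = true)
      }
    })
    monitor.setDaemon(true)
    monitor.start()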
    

Next steps