Overview of Apache Spark Streaming

Apache Spark Streaming provides data stream processing on HDInsight Spark clusters, with a guarantee that any input event is processed exactly once, even if a node failure occurs. A Spark Stream is a long-running job that receives input data from a wide variety of sources, including Azure Event Hubs, Azure IoT Hub, Apache Kafka, Apache Flume, Twitter, ZeroMQ, raw TCP sockets, or from monitoring Apache Hadoop YARN filesystems. Unlike a solely event-driven process, a Spark Stream batches input data into time windows, such as 2-second slices, and then transforms each batch of data using map, reduce, join, and extract operations. The Spark Stream then writes the transformed data out to filesystems, databases, dashboards, and the console.

Stream processing with HDInsight and Spark Streaming

Spark Streaming applications must wait a fraction of a second to collect each micro-batch of events before sending that batch on for processing. In contrast, an event-driven application processes each event immediately. Spark Streaming latency is typically under a few seconds. The benefits of the micro-batch approach are more efficient data processing and simpler aggregate calculations.

Introducing the DStream

Spark Streaming represents a continuous stream of incoming data using a discretized stream called a DStream. A DStream can be created from input sources such as Event Hubs or Kafka, or by applying transformations on another DStream.

A DStream provides a layer of abstraction on top of the raw event data.

Start with a single event, say a temperature reading from a connected thermostat. When this event arrives at your Spark Streaming application, the event is stored in a reliable way, replicated on multiple nodes. This fault tolerance ensures that the failure of any single node doesn't result in the loss of your event. The Spark core uses a data structure that distributes data across multiple nodes in the cluster, where each node generally maintains its own data in memory for best performance. This data structure is called a resilient distributed dataset (RDD).

Each RDD represents events collected over a user-defined timeframe called the batch interval. As each batch interval elapses, a new RDD is produced that contains all the data from that interval. The continuous set of RDDs is collected into a DStream. For example, if the batch interval is one second long, your DStream emits a batch every second containing one RDD that holds all the data ingested during that second. When processing the DStream, the temperature event appears in one of these batches. A Spark Streaming application processes the batches that contain the events and ultimately acts on the data stored in each RDD.

Example DStream with temperature events

Structure of a Spark Streaming application

A Spark Streaming application is a long-running application that receives data from ingest sources, applies transformations to process the data, and then pushes the data out to one or more destinations. The structure of a Spark Streaming application has a static part and a dynamic part. The static part defines where the data comes from, what processing to do on the data, and where the results should go. The dynamic part runs the application indefinitely, waiting for a stop signal.

For example, the following simple application receives lines of text from files placed in the default attached storage and counts the number of times each word appears.

Define the application

The application logic definition has four steps:

  1. Create a StreamingContext.
  2. Create a DStream from the StreamingContext.
  3. Apply transformations to the DStream.
  4. Output the results.

This definition is static, and no data is processed until you run the application.

Create a StreamingContext

Create a StreamingContext from the SparkContext that points to your cluster. When creating a StreamingContext, you specify the size of the batch in seconds, for example:

import org.apache.spark._
import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(1))

Create a DStream

With the StreamingContext instance, create an input DStream for your input source. In this case, the application is watching for the appearance of new files in the default attached storage.

val lines = ssc.textFileStream("/uploads/Test/")

Apply transformations

You implement the processing by applying transformations on the DStream. This application receives one line of text at a time from the file, splits each line into words, and then uses a map-reduce pattern to count the number of times each word appears.

val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

Output results

Push the transformation results out to the destination systems by applying output operations. In this case, the result of each run through the computation is printed to the console output.

wordCounts.print()

Run the application

Start the streaming application and run until a termination signal is received.

ssc.start()
ssc.awaitTermination()

For details on the Spark Streaming API, see the Apache Spark Streaming Programming Guide.

The following sample application is self-contained, so you can run it inside a Jupyter Notebook. This example creates a mock data source in the class DummySource that outputs the value of a counter and the current time in milliseconds every five seconds. A new StreamingContext object has a batch interval of 30 seconds. Every time a batch is created, the streaming application examines the RDD produced, converts the RDD to a Spark DataFrame, and creates a temporary table over the DataFrame.

class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {

    /** Start the thread that simulates receiving data */
    def onStart() {
        new Thread("Dummy Source") { override def run() { receive() } }.start()
    }

    def onStop() {  }

    /** Periodically emit the counter value and the current timestamp */
    private def receive() {
        var counter = 0
        while(!isStopped()) {
            store(Iterator((counter, System.currentTimeMillis)))
            counter += 1
            Thread.sleep(5000)
        }
    }
}

// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))

// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)

// Create the stream
val stream = ssc.receiverStream(new DummySource())

// Process RDDs in the batch
stream.foreachRDD { rdd =>

    // Access the SQLContext and create a table called demo_numbers we can query
    val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
    _sqlContext.createDataFrame(rdd).toDF("value", "time")
        .registerTempTable("demo_numbers")
}

// Start the stream processing
ssc.start()

Wait for about 30 seconds after starting the application above. Then, you can query the DataFrame periodically to see the current set of values present in the batch, for example using this SQL query:

%%sql
SELECT * FROM demo_numbers

The resulting output looks like the following:

value    time
10       1497314465256
11       1497314470272
12       1497314475289
13       1497314480310
14       1497314485327
15       1497314490346
There are six values, since the DummySource creates a value every 5 seconds and the application emits a batch every 30 seconds.

Sliding windows

To do aggregate calculations on your DStream over some time period, for example to get an average temperature over the last two seconds, use the sliding window operations included with Spark Streaming. A sliding window has a duration (the window length) and the interval at which the window's contents are evaluated (the slide interval).

Sliding windows can overlap. For example, you can define a window with a length of two seconds that slides every one second. This means that every time you do an aggregation calculation, the window includes data from the last one second of the previous window, plus any new data from the next one second.

Example initial window of temperature events

Example window of temperature events after sliding
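To make the overlap concrete, here is a minimal sketch, not part of the samples in this article, of a two-second window that slides every one second. It assumes a hypothetical DStream named temperatureReadings of (sensorId, temperature) pairs and the imports shown earlier; both the window length and the slide interval must be multiples of the DStream's batch interval.

// temperatureReadings is assumed to be a DStream[(String, Double)] of
// (sensorId, temperature) pairs created from some input source.
// A two-second window that slides every one second, so consecutive
// windows overlap by one second.
val windowed = temperatureReadings.window(Seconds(2), Seconds(1))

// Average temperature per sensor within each window.
val avgTempBySensor = windowed
    .mapValues(temp => (temp, 1))
    .reduceByKey { case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2) }
    .mapValues { case (sum, count) => sum / count }

avgTempBySensor.print()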

The following example updates the code that uses the DummySource, to collect the batches into a window with a one-minute duration and a one-minute slide.

class DummySource extends org.apache.spark.streaming.receiver.Receiver[(Int, Long)](org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_2) {

    /** Start the thread that simulates receiving data */
    def onStart() {
        new Thread("Dummy Source") { override def run() { receive() } }.start()
    }

    def onStop() {  }

    /** Periodically emit the counter value and the current timestamp */
    private def receive() {
        var counter = 0  
        while(!isStopped()) {
            store(Iterator((counter, System.currentTimeMillis)))
            counter += 1
            Thread.sleep(5000)
        }
    }
}

// A batch is created every 30 seconds
val ssc = new org.apache.spark.streaming.StreamingContext(spark.sparkContext, org.apache.spark.streaming.Seconds(30))

// Set the active SQLContext so that we can access it statically within the foreachRDD
org.apache.spark.sql.SQLContext.setActive(spark.sqlContext)

// Create the stream
val stream = ssc.receiverStream(new DummySource())

// Process batches in 1 minute windows
stream.window(org.apache.spark.streaming.Minutes(1)).foreachRDD { rdd =>

    // Access the SQLContext and create a table called demo_numbers we can query
    val _sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
    _sqlContext.createDataFrame(rdd).toDF("value", "time")
    .registerTempTable("demo_numbers")
} 

// Start the stream processing
ssc.start()

After the first minute, there are 12 entries - six entries from each of the two batches collected in the window.

value    time
1        1497316294139
2        1497316299158
3        1497316304178
4        1497316309204
5        1497316314224
6        1497316319243
7        1497316324260
8        1497316329278
9        1497316334293
10       1497316339314
11       1497316344339
12       1497316349361

The sliding window functions available in the Spark Streaming API include window, countByWindow, reduceByWindow, and countByValueAndWindow. For details on these functions, see Transformations on DStreams.
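As a rough illustration of one of these functions, under the same assumptions as the sample above (stream is a DStream of (counter, timestamp) pairs and the batch interval is 30 seconds), reduceByWindow might be used like this:

// Highest counter value observed in each one-minute window
// (window length and slide must be multiples of the 30-second batch interval).
val maxCounterPerWindow = stream
    .map { case (counter, _) => counter }
    .reduceByWindow(
        (a, b) => math.max(a, b),
        org.apache.spark.streaming.Minutes(1),
        org.apache.spark.streaming.Minutes(1))

maxCounterPerWindow.print()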

Checkpointing

To deliver resiliency and fault tolerance, Spark Streaming relies on checkpointing to ensure that stream processing can continue uninterrupted, even in the face of node failures. Spark creates checkpoints to durable storage (Azure Storage or Data Lake Storage). These checkpoints store streaming application metadata such as the configuration, the operations defined by the application, and any batches that were queued but not yet processed. Sometimes, the checkpoints also include saving the data in the RDDs to more quickly rebuild the state of the data from what is present in the RDDs managed by Spark.
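A minimal sketch of how checkpointing is commonly enabled follows; the /checkpoints directory and the body of createContext are illustrative assumptions, not part of the samples above.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// Build a fresh context and enable checkpointing to durable storage.
def createContext(): StreamingContext = {
    val newSsc = new StreamingContext(spark.sparkContext, Seconds(30))
    newSsc.checkpoint("/checkpoints")   // assumed path on the default attached storage
    // ... define input DStreams, transformations, and output operations here ...
    newSsc
}

// On restart, rebuild the context (and any queued batches) from the checkpoint
// if one exists; otherwise create a new context with createContext.
val ssc = StreamingContext.getOrCreate("/checkpoints", createContext _)
ssc.start()
ssc.awaitTermination()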

Deploying Spark Streaming applications

You typically build a Spark Streaming application locally into a JAR file, and then deploy it to Spark on HDInsight by copying the JAR file to the default attached storage. You can start your application with the LIVY REST APIs available from your cluster using a POST operation. The body of the POST includes a JSON document that provides the path to your JAR, the name of the class whose main method defines and runs the streaming application, and optionally the job's resource requirements (such as the number of executors, memory, and cores) and any configuration settings your application code requires.
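As a sketch, the POST goes to the cluster's Livy batches endpoint (https://CLUSTERNAME.azurehdinsight.net/livy/batches, where CLUSTERNAME is a placeholder) with a JSON body along the following lines; the JAR path and class name are illustrative placeholders, and configuration settings can be supplied in an optional conf object.

{
  "file": "wasbs:///example/jars/SparkStreamingApp.jar",
  "className": "com.example.StreamingApp",
  "numExecutors": 2,
  "executorMemory": "2G",
  "executorCores": 2
}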

Deploying a Spark Streaming application

The status of all applications can also be checked with a GET request against a LIVY endpoint. Finally, you can end a running application by issuing a DELETE request against the LIVY endpoint. For details on the LIVY API, see Remote jobs with Apache LIVY.
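For reference, the corresponding Livy calls look like this (CLUSTERNAME and the batch ID are placeholders):

GET    https://CLUSTERNAME.azurehdinsight.net/livy/batches            (list all batches and their state)
GET    https://CLUSTERNAME.azurehdinsight.net/livy/batches/{batchId}  (check the status of one application)
DELETE https://CLUSTERNAME.azurehdinsight.net/livy/batches/{batchId}  (stop a running application)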

Next steps