Overview of Apache Spark Structured Streaming

Apache Spark Structured Streaming enables you to implement scalable, high-throughput, fault-tolerant applications for processing data streams. Structured Streaming is built on the Spark SQL engine and improves upon the constructs from Spark SQL DataFrames and Datasets, so you can write streaming queries in the same way you would write batch queries.

Structured Streaming applications run on HDInsight Spark clusters, and connect to streaming data from Apache Kafka, a TCP socket (for debugging purposes), Azure Storage, or Azure Data Lake Storage. The latter two options, which rely on external storage services, enable you to watch for new files added into storage and process their contents as if they were streamed.

Structured Streaming creates a long-running query during which you apply operations to the input data, such as selection, projection, aggregation, windowing, and joining the streaming DataFrame with reference DataFrames. Next, you output the results to file storage (Azure Storage Blobs or Data Lake Storage) or to any datastore (such as SQL Database or Power BI) by using custom code. Structured Streaming also provides output to the console for local debugging, and to an in-memory table so you can see the data generated for debugging in HDInsight.

Stream Processing with HDInsight and Spark Structured Streaming

Note

Spark Structured Streaming is replacing Spark Streaming (DStreams). Going forward, Structured Streaming will receive enhancements and maintenance, while DStreams will be in maintenance mode only. Structured Streaming is currently not as feature-complete as DStreams for the sources and sinks that it supports out of the box, so evaluate your requirements to choose the appropriate Spark stream processing option.

Streams as tables

Spark Structured Streaming represents a stream of data as a table that is unbounded in depth; that is, the table continues to grow as new data arrives. This input table is continuously processed by a long-running query, and the results are sent to an output table:

Structured Streaming concept

In Structured Streaming, data arrives at the system and is immediately ingested into an input table. You write queries (using the DataFrame and Dataset APIs) that perform operations against this input table. The query output yields another table, the results table. The results table contains the results of your query, from which you draw data to send to an external datastore, such as a relational database. The timing of when data is processed from the input table is controlled by the trigger interval. By default, the trigger interval is zero, so Structured Streaming tries to process the data as soon as it arrives. In practice, this means that as soon as Structured Streaming is done processing the run of the previous query, it starts another processing run against any newly received data. You can configure the trigger to run at an interval, so that the streaming data is processed in time-based batches.
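For example, to process the stream in 10-second micro-batches rather than as soon as data arrives, you might configure a processing-time trigger on the output writer. This is a minimal sketch: the console sink, the interval, and the inputDF streaming DataFrame (like the one built from JSON files later in this article) are illustrative assumptions.

import org.apache.spark.sql.streaming.Trigger

//inputDF is assumed to be a streaming DataFrame, such as one created with spark.readStream
//Trigger.ProcessingTime groups the stream into 10-second micro-batches instead of processing data immediately
val timedQuery = inputDF.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()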

The data in the results table may contain only the data that is new since the last time the query was processed (append mode), or the table may be completely refreshed every time there is new data, so that it includes all of the output data since the streaming query began (complete mode).

Append mode

In append mode, only the rows added to the results table since the last query run are present in the results table and written to external storage. For example, the simplest query just copies all data from the input table to the results table unaltered. Each time a trigger interval elapses, the new data is processed and the rows representing that new data appear in the results table.

Consider a scenario where you are processing telemetry from temperature sensors, such as a thermostat. Assume the first trigger processed one event at time 00:01 for device 1 with a temperature reading of 95 degrees. In the first trigger of the query, only the row with time 00:01 appears in the results table. At time 00:02 when another event arrives, the only new row is the row with time 00:02, and so the results table would contain only that one row.

Structured Streaming append mode

When using append mode, your query applies projections (selecting the columns it cares about), filtering (picking only rows that match certain conditions), or joins (augmenting the data with data from a static lookup table), as in the sketch below. Append mode makes it easy to push only the relevant new data points out to external storage.
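As a hedged sketch of that pattern, the query below projects, filters, and joins a streaming DataFrame with a small static lookup table, writing only the new rows on each trigger. The device column and the lookup data are illustrative assumptions, not part of the temperature example used later in this article.

import spark.implicits._

//Illustrative static lookup table mapping a device ID to a location
val deviceLookupDF = Seq((1, "Building A"), (2, "Building B")).toDF("device", "location")

//streamingInputDF is assumed to be a streaming DataFrame with time, device, and temp columns
val appendQuery = streamingInputDF
  .select($"time", $"device", $"temp")            //projection
  .filter($"temp".cast("double") > 90)            //filtering
  .join(deviceLookupDF, "device")                 //join with static reference data
  .writeStream
  .format("memory")
  .queryName("hotReadings")
  .outputMode("append")
  .start()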

Complete mode

Consider the same scenario, this time using complete mode. In complete mode, the entire output table is refreshed on every trigger, so the table includes data not just from the most recent trigger run, but from all runs. You could use complete mode to copy the data unaltered from the input table to the results table. On every triggered run, the new result rows appear along with all the previous rows. The output results table will end up storing all of the data collected since the query began, and you would eventually run out of memory. Complete mode is intended for use with aggregate queries that summarize the incoming data in some way, so on every trigger the results table is updated with a new summary.

Assume so far there are five seconds' worth of data already processed, and it is time to process the data for the sixth second. The input table has events for time 00:01 and time 00:03. The goal of this example query is to give the average temperature of the device every five seconds. The implementation of this query applies an aggregate that takes all of the values that fall within each 5-second window, averages the temperature, and produces a row for the average temperature over that interval. At the end of the first 5-second window, there are two tuples: (00:01, 1, 95) and (00:03, 1, 98). So for the window 00:00-00:05, the aggregation produces a tuple with the average temperature of 96.5 degrees. In the next 5-second window, there is only one data point at time 00:06, so the resulting average temperature is 98 degrees. At time 00:10, using complete mode, the results table has the rows for both windows 00:00-00:05 and 00:05-00:10, because the query outputs all the aggregated rows, not just the new ones. Therefore the results table continues to grow as new windows are added.

Structured Streaming complete mode

Not all queries using complete mode will cause the table to grow without bounds. Consider the previous example, but suppose that rather than averaging the temperature by time window, the query averaged it by device ID instead. The results table would contain a fixed number of rows (one per device) with the average temperature for the device across all data points received from that device. As new temperatures are received, the results table is updated so that the averages in the table are always current.
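A minimal sketch of that per-device aggregation, assuming the streaming DataFrame carries a device column in addition to temp (the memory sink and query name are also illustrative):

import spark.implicits._
import org.apache.spark.sql.functions.avg

//Average the temperature per device rather than per time window; the results table stays at one row per device
val avgByDeviceQuery = streamingInputDF
  .groupBy($"device")
  .agg(avg($"temp".cast("double")).alias("avgTemp"))
  .writeStream
  .format("memory")
  .queryName("deviceAverages")
  .outputMode("complete")
  .start()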

Components of a Spark Structured Streaming application

A simple example query can summarize the temperature readings by hour-long windows. In this case, the data is stored in JSON files in Azure Storage (attached as the default storage for the HDInsight cluster):

{"time":1469501107,"temp":"95"}
{"time":1469501147,"temp":"95"}
{"time":1469501202,"temp":"95"}
{"time":1469501219,"temp":"95"}
{"time":1469501225,"temp":"95"}

These JSON files are stored in the temps subfolder underneath the HDInsight cluster's container.

Define the input source

First configure a DataFrame that describes the source of the data and any settings required by that source. This example draws from the JSON files in Azure Storage and applies a schema to them at read time.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

//Cluster-local path to the folder containing the JSON files
val inputPath = "/temps/" 

//Define the schema of the JSON files: the "time" field is a Timestamp and the "temp" field is a String
val jsonSchema = new StructType().add("time", TimestampType).add("temp", StringType)

//Create a Streaming DataFrame by calling readStream and configuring it with the schema and path
val streamingInputDF = spark.readStream.schema(jsonSchema).json(inputPath) 

Apply the query

Next, apply a query that contains the desired operations against the Streaming DataFrame. In this case, an aggregation groups all the rows into 1-hour windows, and then computes the minimum, average, and maximum temperatures in that 1-hour window.

val streamingAggDF = streamingInputDF.groupBy(window($"time", "1 hour")).agg(min($"temp"), avg($"temp"), max($"temp"))
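The in-memory sink used later with complete output mode does not require it, but if you ran a windowed aggregation like this in append or update mode, you might add a watermark so Spark can bound the state it keeps for late-arriving events. This is an optional sketch, not part of the original example:

//Optional: treat events arriving more than 10 minutes late as too old, so old window state can be dropped
val streamingAggWithWatermarkDF = streamingInputDF
  .withWatermark("time", "10 minutes")
  .groupBy(window($"time", "1 hour"))
  .agg(min($"temp"), avg($"temp"), max($"temp"))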

Define the output sink

Next, define the destination for the rows that are added to the results table within each trigger interval. This example just outputs all rows to an in-memory table, temps, that you can later query with SparkSQL. Complete output mode ensures that all rows for all windows are output every time.

val streamingOutDF = streamingAggDF.writeStream.format("memory").queryName("temps").outputMode("complete") 

Start the query

Start the streaming query and run until a termination signal is received.

val query = streamingOutDF.start()  
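In a notebook the query keeps running in the background, so you can continue issuing commands; in a standalone application you would typically block until the query ends. A minimal sketch:

//Block the calling thread until the query is stopped manually or fails (omit this in a notebook)
query.awaitTermination()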

View the results

While the query is running, in the same SparkSession, you can run a SparkSQL query against the temps table where the query results are stored.

select * from temps

This query yields results similar to the following:

window                                              min(temp)  avg(temp)  max(temp)
{u'start': u'2016-07-26T02:00:00.000Z', u'end'...   95         95.231579  99
{u'start': u'2016-07-26T03:00:00.000Z', u'end'...   95         96.023048  99
{u'start': u'2016-07-26T04:00:00.000Z', u'end'...   95         96.797133  99
{u'start': u'2016-07-26T05:00:00.000Z', u'end'...   95         96.984639  99
{u'start': u'2016-07-26T06:00:00.000Z', u'end'...   95         97.014749  99
{u'start': u'2016-07-26T07:00:00.000Z', u'end'...   95         96.980971  99
{u'start': u'2016-07-26T08:00:00.000Z', u'end'...   95         96.965997  99
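If you prefer to stay in Scala rather than a SQL cell, an equivalent way to inspect the in-memory table might be:

//Run the same SparkSQL query from Scala and print the current contents of the temps table
spark.sql("select * from temps").show(truncate = false)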

For details on the Spark Structured Streaming API, along with the input data sources, operations, and output sinks it supports, see the Apache Spark Structured Streaming Programming Guide.

Checkpointing and write-ahead logs

To deliver resiliency and fault tolerance, Structured Streaming relies on checkpointing to ensure that stream processing can continue uninterrupted, even with node failures. In HDInsight, Spark creates checkpoints to durable storage, either Azure Storage or Data Lake Storage. These checkpoints store the progress information about the streaming query. In addition, Structured Streaming uses a write-ahead log (WAL). The WAL captures ingested data that has been received but not yet processed by a query. If a failure occurs and processing is restarted from the WAL, any events received from the source are not lost.
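For example, when writing to a durable sink you would normally point the query at a checkpoint folder in the cluster's default storage by setting the checkpointLocation option. The Parquet sink and both paths below are illustrative assumptions:

//Persist the raw input stream as Parquet files and record query progress in a checkpoint folder
val durableQuery = streamingInputDF.writeStream
  .format("parquet")
  .option("path", "/streamoutput/temps")
  .option("checkpointLocation", "/streamcheckpoints/temps")
  .outputMode("append")
  .start()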

Deploying Spark Streaming applications

You typically build a Spark Streaming application locally into a JAR file and then deploy it to Spark on HDInsight by copying the JAR file to the default storage attached to your HDInsight cluster. You can start your application with the Apache Livy REST APIs available from your cluster by using a POST operation. The body of the POST includes a JSON document that provides the path to your JAR, the name of the class whose main method defines and runs the streaming application, optionally the resource requirements of the job (such as the number of executors, memory, and cores), and any configuration settings your application code requires.
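As a rough illustration, the JSON body of such a POST to the cluster's Livy batches endpoint might look like the following; the JAR path, class name, resource values, and configuration key are placeholders, not values from this article:

{
  "file": "wasbs:///example/jars/StreamingTemps.jar",
  "className": "com.example.StreamingTempsApp",
  "numExecutors": 2,
  "executorMemory": "2G",
  "executorCores": 2,
  "conf": { "spark.example.setting": "value" }
}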

Deploying a Spark Streaming application

The status of all applications can also be checked with a GET request against the Livy endpoint. Finally, you can terminate a running application by issuing a DELETE request against the Livy endpoint. For details on the Livy API, see Remote jobs with Apache Livy.

Next steps