Azure Cosmos DB: Implement a lambda architecture on the Azure platform

Lambda architectures enable efficient data processing of massive data sets. Lambda architectures use batch-processing, stream-processing, and a serving layer to minimize the latency involved in querying big data.

To implement a lambda architecture on Azure, you can combine the following technologies to accelerate real-time big data analytics:

  • Azure Cosmos DB
  • Apache Spark for Azure HDInsight
  • The Azure Cosmos DB Spark Connector

This article describes the fundamentals of a lambda architecture based on the original multi-layer design and the benefits of a "rearchitected" lambda architecture that simplifies operations.

What is a lambda architecture?

A lambda architecture is a generic, scalable, and fault-tolerant data processing architecture to address batch and speed latency scenarios.

Diagram showing the lambda architecture

Source: http://lambda-architecture.net/

The basic principles of a lambda architecture are described in the preceding diagram, as per http://lambda-architecture.net.

  1. All data is pushed into both the batch layer and speed layer.
  2. The batch layer has a master dataset (immutable, append-only set of raw data) and pre-computes the batch views.
  3. The serving layer has batch views for fast queries.
  4. The speed layer compensates for processing time (to the serving layer) and deals with recent data only.
  5. All queries can be answered by merging results from batch views and real-time views or pinging them individually.

As the following sections show, you can implement this architecture using only the following:

  • Azure Cosmos container(s)
  • HDInsight (Apache Spark 2.1) cluster
  • Spark Connector 1.0

Speed layer

From an operations perspective, maintaining two streams of data while ensuring the correct state of the data can be a complicated endeavor. To simplify operations, utilize the Azure Cosmos DB change feed support to keep the state for the batch layer while revealing the Azure Cosmos DB change log via the Change Feed API for your speed layer.
Diagram highlighting the new data, speed layer, and master dataset portion of the lambda architecture

What's important in these layers:

  1. All data is pushed only into Azure Cosmos DB, thus you can avoid multi-casting issues.
  2. The batch layer has a master dataset (immutable, append-only set of raw data) and pre-computes the batch views.
  3. The serving layer is discussed in the next section.
  4. The speed layer utilizes HDInsight (Apache Spark) to read the Azure Cosmos DB change feed. This enables you to persist your data as well as to query and process it concurrently.
  5. All queries can be answered by merging results from batch views and real-time views or pinging them individually.

Code example: Spark structured streaming to an Azure Cosmos DB change feed

To run a quick prototype of the Azure Cosmos DB change feed as part of the speed layer, you can test it out using Twitter data as part of the Stream Processing Changes using Azure Cosmos DB Change Feed and Apache Spark example. To jump-start your Twitter output, see the code sample in Stream feed from Twitter to Cosmos DB. With the preceding example, you load Twitter data into Azure Cosmos DB and can then set up your HDInsight (Apache Spark) cluster to connect to the change feed. For more information on how to set up this configuration, see Apache Spark to Azure Cosmos DB Connector Setup.

The following code snippet shows how to configure spark-shell to run a structured streaming job that connects to an Azure Cosmos DB change feed and reviews the real-time Twitter data stream to perform a running interval count.
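
Before pasting the snippet, launch spark-shell with the connector and its dependencies on the classpath. A minimal sketch — treat the exact Maven coordinates as an assumption to verify against the azure-cosmosdb-spark releases for your Spark and Scala versions:

spark-shell --master yarn --packages "com.microsoft.azure:azure-cosmosdb-spark_2.1.0_2.11:1.0.0"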

// Import Libraries
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.codehaus.jackson.map.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.streaming._
import java.time._

// Configure connection to Azure Cosmos DB Change Feed
val sourceConfigMap = Map(
"Endpoint" -> "[COSMOSDB ENDPOINT]",
"Masterkey" -> "[MASTER KEY]",
"Database" -> "[DATABASE]",
"Collection" -> "[COLLECTION]",
"ConnectionMode" -> "Gateway",
"ChangeFeedCheckpointLocation" -> "checkpointlocation",
"changefeedqueryname" -> "Streaming Query from Cosmos DB Change Feed Interval Count")

// Start reading change feed as a stream
var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(sourceConfigMap).load()

// Start streaming query to console sink
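// substr(0, 0) yields an empty string for every document, so grouping on "countcol" produces a single running count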
val query = streamData.withColumn("countcol", streamData.col("id").substr(0, 0)).groupBy("countcol").count().writeStream.outputMode("complete").format("console").start()

For complete code samples, see azure-cosmosdb-spark/lambda/samples, including:

  • Streaming Query from Cosmos DB Change Feed scala script
  • Streaming Tags Query from Cosmos DB Change Feed scala script

The output of this is a spark-shell console, which continuously runs a structured streaming job that performs an interval count against the Twitter data from the Azure Cosmos DB change feed. The following image shows the output of the stream job and the interval counts.

Stream output showing interval counts against the Twitter data from the Azure Cosmos DB change feed

For more information, see the Azure Cosmos DB change feed documentation.

Batch and serving layers

Since the new data is loaded into Azure Cosmos DB (where the change feed is being used for the speed layer), this is where the master dataset (an immutable, append-only set of raw data) resides. From this point onwards, use HDInsight (Apache Spark) to perform the pre-compute functions from the batch layer to the serving layer, as shown in the following image:

Diagram highlighting the batch layer and serving layer of the lambda architecture

What's important in these layers:

  1. All data is pushed only into Azure Cosmos DB (to avoid multi-casting issues).
  2. The batch layer has a master dataset (immutable, append-only set of raw data) stored in Azure Cosmos DB. Using HDI Spark, you can pre-compute your aggregations to be stored in your computed batch views.
  3. The serving layer is an Azure Cosmos database with collections for the master dataset and computed batch views.
  4. The speed layer is discussed later in this article.
  5. All queries can be answered by merging results from the batch views and real-time views, or pinging them individually.

Code example: Pre-computing batch views

To showcase how to execute pre-calculated views against your master dataset from Apache Spark to Azure Cosmos DB, use the following code snippets from the notebooks Lambda Architecture Rearchitected - Batch Layer and Lambda Architecture Rearchitected - Batch to Serving Layer. In this scenario, use the Twitter data stored in Azure Cosmos DB.

Let's start by creating the configuration connection to the Twitter data within Azure Cosmos DB using the PySpark code below.

# Configuration to connect to Azure Cosmos DB
tweetsConfig = {
  "Endpoint" : "[Endpoint URL]",
  "Masterkey" : "[Master Key]",
  "Database" : "[Database]",
  "Collection" : "[Collection]", 
  "preferredRegions" : "[Preferred Regions]",
  "SamplingRatio" : "1.0",
  "schema_samplesize" : "200000",
  "query_custom" : "[Cosmos DB SQL Query]"
}

# Create DataFrame
tweets = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(**tweetsConfig).load()

# Create Temp View (to run Spark SQL statements)
tweets.createOrReplaceTempView("tweets")

Next, let's run the following Spark SQL statement to determine the top 10 hashtags of the set of tweets. For this Spark SQL query, we run it in a Jupyter notebook, with the output bar chart shown directly after the code snippet.

%%sql
select hashtags.text, count(distinct id) as tweets
from (
  select 
    explode(hashtags) as hashtags,
    id
  from tweets
) a
group by hashtags.text
order by tweets desc
limit 10

Chart showing the number of tweets by hashtag

Now that you have your query, let's use the Spark Connector to save the output data back into a different collection. In this example, Scala is used to showcase the connection. As in the previous example, create the configuration connection to save the Apache Spark DataFrame to a different Azure Cosmos container.

val writeConfigMap = Map(
    "Endpoint" -> "[Endpoint URL]",
    "Masterkey" -> "[Master Key]",
    "Database" -> "[Database]",
    "Collection" -> "[New Collection]", 
    "preferredRegions" -> "[Preferred Regions]",
    "SamplingRatio" -> "1.0",
    "schema_samplesize" -> "200000"
)

// Configuration to write
val writeConfig = Config(writeConfigMap)

After specifying the SaveMode (indicating whether to Overwrite or Append documents), create a tweets_bytags DataFrame similar to the Spark SQL query in the previous example. With the tweets_bytags DataFrame created, you can save it using the write method with the previously specified writeConfig.

// Import SaveMode so you can Overwrite, Append, ErrorIfExists, Ignore
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

// Create new DataFrame of tweets tags
val tweets_bytags = spark.sql("select hashtags.text as hashtags, count(distinct id) as tweets from ( select explode(hashtags) as hashtags, id from tweets ) a group by hashtags.text order by tweets desc")

// Save to Cosmos DB (using Overwrite in this case)
tweets_bytags.write.mode(SaveMode.Overwrite).cosmosDB(writeConfig)

This last statement has now saved your Spark DataFrame into a new Azure Cosmos container; from a lambda architecture perspective, this is your batch view within the serving layer.
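
To verify the serving layer, you can read the computed batch view back through the same connector. A minimal sketch, assuming the writeConfigMap from the earlier snippet is in scope and the view keeps the (hashtags, tweets) schema produced above:

// Read the computed batch view back from the serving layer
val batchViewDF = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(writeConfigMap).load()

// Show the top hashtags from the pre-computed view
batchViewDF.select("hashtags", "tweets").orderBy(org.apache.spark.sql.functions.desc("tweets")).show(10)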

Resources

For complete code samples, see azure-cosmosdb-spark/lambda/samples, including:

  • Lambda Architecture Rearchitected - Batch Layer: HTML | ipynb
  • Lambda Architecture Rearchitected - Batch to Serving Layer: HTML | ipynb

Speed layer

As previously noted, using the Azure Cosmos DB Change Feed Library allows you to simplify the operations between the batch and speed layers. In this architecture, use Apache Spark (via HDInsight) to perform the structured streaming queries against the data. You may also want to temporarily persist the results of your structured streaming queries so other systems can access this data.

Diagram highlighting the speed layer of the lambda architecture

To do this, create a separate Azure Cosmos container to save the results of your structured streaming queries. This allows other systems to access this information, not just Apache Spark. In addition, the Cosmos DB Time-to-Live (TTL) feature lets you configure documents to be automatically deleted after a set duration; a hedged TTL sketch follows the snippet below. For more information on the Azure Cosmos DB TTL feature, see Expire data in Azure Cosmos containers automatically with time to live.

// Import Libraries
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark.config.Config
import org.codehaus.jackson.map.ObjectMapper
import com.microsoft.azure.cosmosdb.spark.streaming._
import java.time._

// Configure connection to Azure Cosmos DB Change Feed
val sourceCollectionName = "[SOURCE COLLECTION NAME]"
val sinkCollectionName = "[SINK COLLECTION NAME]"

val configMap = Map(
"Endpoint" -> "[COSMOSDB ENDPOINT]",
"Masterkey" -> "[COSMOSDB MASTER KEY]",
"Database" -> "[DATABASE NAME]",
"Collection" -> sourceCollectionName,
"ChangeFeedCheckpointLocation" -> "changefeedcheckpointlocation")

val sourceConfigMap = configMap.+(("changefeedqueryname", "Structured Stream replication streaming test"))

// Start to read the stream
var streamData = spark.readStream.format(classOf[CosmosDBSourceProvider].getName).options(sourceConfigMap).load()
val sinkConfigMap = configMap.-("Collection").+(("Collection", sinkCollectionName))

// Start the stream writer to new collection
val streamingQueryWriter = streamData.writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(sinkConfigMap).option("checkpointLocation", "streamingcheckpointlocation")
var streamingQuery = streamingQueryWriter.start()
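
If the computed real-time data is only needed for a bounded window, TTL keeps the sink collection from growing without bound. A minimal sketch, assuming the sink container has TTL enabled (for example, DefaultTimeToLive set to -1); the per-document ttl property, in seconds, then controls expiry:

// Add a "ttl" column so each document written to the sink expires automatically
import org.apache.spark.sql.functions.lit

val expiringStreamData = streamData.withColumn("ttl", lit(86400)) // expire after 24 hours
val expiringQuery = expiringStreamData.writeStream.format(classOf[CosmosDBSinkProvider].getName).outputMode("append").options(sinkConfigMap).option("checkpointLocation", "ttlstreamingcheckpointlocation").start()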

Lambda architecture: Rearchitected

As noted in the previous sections, you can simplify the original lambda architecture by using the following components:

  • Azure Cosmos DB
  • The Azure Cosmos DB Change Feed Library to avoid the need to multi-cast your data between the batch and speed layers
  • Apache Spark on HDInsight
  • The Spark Connector for Azure Cosmos DB

Diagram showing the rearchitected lambda architecture using Azure Cosmos DB, Spark, and the Azure Cosmos DB Change Feed API

With this design, you only need two managed services, Azure Cosmos DB and HDInsight. Together, they address the batch, serving, and speed layers of the lambda architecture. This simplifies not only the operations but also the data flow.

  1. All data is pushed into Azure Cosmos DB for processing.
  2. The batch layer has a master dataset (immutable, append-only set of raw data) and pre-computes the batch views.
  3. The serving layer has batch views of data for fast queries.
  4. The speed layer compensates for processing time (to the serving layer) and deals with recent data only.
  5. All queries can be answered by merging results from batch views and real-time views, as shown in the sketch after this list.
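
To make the last principle concrete, here is a minimal sketch of merging the two views with the Spark connector. It assumes the computed batch view and the computed real-time view collections share the (hashtags, tweets) schema produced by the earlier examples; the collection names are placeholders:

// Configurations for the two views (placeholder names)
val batchViewConfig = Map(
    "Endpoint" -> "[Endpoint URL]",
    "Masterkey" -> "[Master Key]",
    "Database" -> "[Database]",
    "Collection" -> "[Computed Batch Collection]")
val realTimeViewConfig = batchViewConfig.updated("Collection", "[Computed RT Collection]")

// Read both views through the Spark connector
val batchViews = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(batchViewConfig).load()
val realTimeViews = spark.read.format("com.microsoft.azure.cosmosdb.spark").options(realTimeViewConfig).load()

// Merge: union the two views and re-aggregate per hashtag
import org.apache.spark.sql.functions.{sum, desc}
val merged = batchViews.select("hashtags", "tweets")
    .union(realTimeViews.select("hashtags", "tweets"))
    .groupBy("hashtags")
    .agg(sum("tweets").as("tweets"))
    .orderBy(desc("tweets"))
merged.show(10)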

Resources

  • New data: The stream feed from Twitter to CosmosDB, which is the mechanism to push new data into Azure Cosmos DB.
  • Batch layer: The batch layer is composed of the master dataset (an immutable, append-only set of raw data) and the ability to pre-compute batch views of the data that are pushed into the serving layer.
    • The Lambda Architecture Rearchitected - Batch Layer notebook ipynb | html queries the master dataset to compute a set of batch views.
  • Serving layer: The serving layer is composed of pre-computed data resulting in batch views (for example, aggregations, specific slicers, etc.) for fast queries.
    • The Lambda Architecture Rearchitected - Batch to Serving Layer notebook ipynb | html pushes the batch data to the serving layer; that is, Spark queries a batch collection of tweets, processes it, and stores it into another collection (a computed batch).
  • Speed layer: The speed layer is composed of Spark, which utilizes the Azure Cosmos DB change feed to read and act on changes immediately. The data can also be saved to a computed RT collection so that other systems can query the processed real-time data rather than running a real-time query themselves.
    • The Streaming Query from Cosmos DB Change Feed scala script executes a streaming query from the Azure Cosmos DB change feed to compute an interval count from the spark-shell.
    • The Streaming Tags Query from Cosmos DB Change Feed scala script executes a streaming query from the Azure Cosmos DB change feed to compute an interval count of tags from the spark-shell.

Next steps

azure-cosmosdb-spark GitHub 存储库下载 Spark 到 Azure Cosmos DB 的连接器(如果尚未下载),并浏览该存储库中的其他资源:If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository and explore the additional resources in the repo:

You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide and the Apache Spark on Azure HDInsight article.