Scale an Azure Stream Analytics job to increase throughput

This article shows you how to tune a Stream Analytics query to increase the throughput of Stream Analytics jobs. You can use the following guide to scale your job to handle a higher load and take advantage of more system resources (such as more bandwidth, more CPU, and more memory). As a prerequisite, you may need to read the following articles:

Case 1 - Your query is inherently fully parallelizable across input partitions

If your query is inherently fully parallelizable across input partitions, you can follow these steps:

  1. Author your query to be embarrassingly parallel by using the PARTITION BY keyword (see the sketch after this list). For more detail, see the Embarrassingly parallel jobs section on this page.
  2. Depending on the output types used in your query, some outputs may either not be parallelizable or need further configuration to be embarrassingly parallel. For example, Power BI output is not parallelizable; outputs are always merged before being sent to the output sink. Blob, Table, ADLS, Service Bus, and Azure Function outputs are automatically parallelized. SQL and SQL DW outputs have an option for parallelization. Event Hub needs the PartitionKey configuration set to match the PARTITION BY field (usually PartitionId). For Event Hub, also pay extra attention to matching the number of partitions for all inputs and all outputs, to avoid cross-over between partitions.
  3. Run your query with 6 SU (the full capacity of a single computing node) to measure the maximum achievable throughput and, if you are using GROUP BY, how many groups (cardinality) the job can handle. General symptoms of the job hitting system resource limits are the following:
    • The SU % utilization metric is over 80%. This indicates that memory usage is high. The factors contributing to the increase of this metric are described here.
    • The output timestamp is falling behind with respect to wall-clock time. Depending on your query logic, the output timestamp may have a logical offset from the wall-clock time, but they should progress at roughly the same rate. If the output timestamp falls further and further behind, it's an indicator that the system is overworked. This can result from downstream output sink throttling or from high CPU utilization. We don't provide a CPU utilization metric at this time, so it can be difficult to differentiate the two.
      • If the issue is due to sink throttling, you may need to increase the number of output partitions (and also input partitions, to keep the job fully parallelizable), or increase the amount of resources for the sink (for example, the number of Request Units for Cosmos DB).
    • In the job diagram, there is a per-partition backlog event metric for each input. If the backlog event metric keeps increasing, it's also an indicator that system resources are constrained (either because of output sink throttling or because of high CPU).
  4. Once you have determined the limit of what a 6 SU job can reach, you can extrapolate the job's processing capacity linearly as you add more SUs, assuming you don't have any data skew that makes certain partitions "hot."
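
To make step 1 concrete, here is a minimal sketch of an embarrassingly parallel query. The input, output, and field names (Input1, Output1, TollBoothId, PartitionId) are illustrative assumptions; the key points are that the query is partitioned on the input's partition column and that the partition column is carried through the GROUP BY, so each input partition can be processed independently.

-- Each input partition is aggregated independently. If Output1 is an
-- Event Hub, set its PartitionKey to PartitionId so the output stays
-- parallel end to end (see step 2 above).
SELECT COUNT(*) AS Count, TollBoothId, PartitionId
INTO Output1
FROM Input1 PARTITION BY PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId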

Note

Choose the right number of Streaming Units: Because Stream Analytics creates a processing node for each 6 SU added, it's best to make the number of nodes a divisor of the number of input partitions, so the partitions can be evenly distributed across the nodes. For example, suppose you have measured that your 6 SU job can achieve a 4 MB/s processing rate, and your input partition count is 4. You can choose to run your job with 12 SU to achieve a roughly 8 MB/s processing rate, or with 24 SU to achieve 16 MB/s. You can then decide when to increase the SU number for the job, and to what value, as a function of your input rate.

Case 2 - Your query is not embarrassingly parallel

If your query is not embarrassingly parallel, you can follow these steps.

  1. Start with a query that has no PARTITION BY, to avoid partitioning complexity, and run it with 6 SU to measure the maximum load, as in Case 1.
  2. If you can achieve your anticipated load in terms of throughput, you are done. Alternatively, you may choose to measure the same job running at 3 SU and 1 SU, to find the minimum number of SUs that works for your scenario.
  3. If you can't achieve the desired throughput, try to break your query into multiple steps (if it doesn't have multiple steps already), and allocate up to 6 SU for each step in the query. For example, if you have 3 steps, allocate 18 SU in the "Scale" option.
  4. When running such a job, Stream Analytics puts each step on its own node with dedicated 6 SU resources.
  5. If you still haven't achieved your load target, you can attempt to use PARTITION BY, starting from the steps closest to the input. For a GROUP BY operator that may not be naturally partitionable, you can use the local/global aggregate pattern: perform a partitioned GROUP BY followed by a non-partitioned GROUP BY. For example, suppose you want to count how many cars go through each toll booth every 3 minutes, and the volume of data is beyond what 6 SU can handle.

Query:

WITH Step1 AS (
    -- Local aggregate: count per toll booth within each input partition
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 PARTITION BY PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
-- Global aggregate: sum the per-partition counts for each toll booth
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId

In the query above, you count cars per toll booth per partition, and then add the counts from all partitions together.

Once partitioned, allocate up to 6 SU for each partition of the step; 6 SU per partition is the maximum, so each partition can be placed on its own processing node.

Note

If your query cannot be partitioned, adding additional SU in a multi-step query may not always improve throughput. One way to gain performance is to reduce the volume on the initial steps by using the local/global aggregate pattern, as described in step 5 above.

Case 3 - You are running lots of independent queries in a job

For certain ISV use cases, where it's more cost-efficient to process data from multiple tenants in a single job by using separate inputs and outputs for each tenant, you may end up running quite a few (for example, 20) independent queries in a single job. The assumption is that each such subquery's load is relatively small. In this case, you can follow these steps.

  1. Do not use PARTITION BY in the query.
  2. Reduce the input partition count to the lowest possible value of 2 if you are using Event Hub.
  3. Run the query with 6 SU. With the expected load for each subquery, add as many such subqueries as possible until the job hits system resource limits. Refer to Case 1 for the symptoms when this happens.
  4. Once you hit the subquery limit measured above, start adding subqueries to a new job. The number of jobs to run as a function of the number of independent queries should be fairly linear, assuming you don't have any load skew. You can then forecast how many 6 SU jobs you need to run as a function of the number of tenants you would like to serve.
  5. When using a reference data join with such queries, union the inputs together before joining with the same reference data, then split out the events if necessary; a sketch follows this list. Otherwise, each reference data join keeps a copy of the reference data in memory, likely blowing up memory usage unnecessarily.
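
As a sketch of step 5, the following hypothetical query unions two tenant inputs before a single reference data join, then splits the joined events back out to per-tenant outputs. All names here (Input1, Input2, RefData, Output1, Output2, DeviceId, DeviceName) are illustrative assumptions, and the tenant inputs are assumed to share the same schema.

WITH Combined AS (
    -- Tag each event with its tenant so it can be split out later
    SELECT 1 AS TenantId, DeviceId FROM Input1
    UNION
    SELECT 2 AS TenantId, DeviceId FROM Input2
),
Joined AS (
    -- One reference data join for all tenants, instead of one per tenant
    SELECT c.TenantId, c.DeviceId, r.DeviceName
    FROM Combined c
    JOIN RefData r ON c.DeviceId = r.DeviceId
)
SELECT DeviceId, DeviceName INTO Output1 FROM Joined WHERE TenantId = 1
SELECT DeviceId, DeviceName INTO Output2 FROM Joined WHERE TenantId = 2

Because the reference data is joined only once, a single copy is kept in memory regardless of how many tenant inputs you union in.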

Note

How many tenants should you put in each job? This query pattern often has a large number of subqueries and results in a very large and complex topology. The controller of the job may not be able to handle such a large topology. As a rule of thumb, stay under 40 tenants for a 1 SU job, and 60 tenants for 3 SU and 6 SU jobs. When you exceed the capacity of the controller, the job will not start successfully.

Get help

For further assistance, try our Microsoft Q&A question page for Azure Stream Analytics.

Next steps