Shuffle query

Shuffle query is a semantic-preserving transformation for a set of operators that support the shuffle strategy. Depending on the actual data, this query can yield considerably better performance.

The operators that support shuffling in Kusto are join, summarize, and make-series.

Set the shuffle query strategy using the query parameter hint.strategy = shuffle or hint.shufflekey = <key>.

Define a data partitioning policy on your table.

shufflekey 设置为表的哈希分区键以提高性能,因为需要在群集节点之间移动的数据量将减少。Set shufflekey as the table's hash partition key for better performance, as the amount of data required to move across cluster nodes is reduced.

Syntax

T | where Event=="Start" | project ActivityId, Started=Timestamp
| join hint.strategy = shuffle (T | where Event=="End" | project ActivityId, Ended=Timestamp)
  on ActivityId
| extend Duration=Ended - Started
| summarize avg(Duration)
T
| summarize hint.strategy = shuffle count(), avg(price) by supplier
T
| make-series hint.shufflekey = Fruit PriceAvg=avg(Price) default=0  on Purchase from datetime(2016-09-10) to datetime(2016-09-13) step 1d by Supplier, Fruit

This strategy shares the load across all cluster nodes, where each node processes one partition of the data. The shuffle query strategy is useful when the key (the join, summarize, or make-series key) has high cardinality and the regular query strategy hits query limits.
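The mechanics of a shuffled aggregation can be sketched in Python (a simplified model, not Kusto's implementation): rows are routed to a partition by a hash of the group-by key, so each partition can be aggregated independently with no cross-node merge.

```python
from collections import defaultdict
import hashlib

def bucket(key, n):
    # Toy partition hash; NOT Kusto's actual function.
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % n

NODES = 3
rows = [("apple", 3), ("pear", 2), ("apple", 5), ("plum", 1), ("pear", 4)]

# Shuffle phase: every row is routed to the partition that owns its key.
partitions = defaultdict(list)
for key, price in rows:
    partitions[bucket(key, NODES)].append((key, price))

# Each partition is aggregated independently; no cross-partition merge
# is needed, because all rows for a given key landed in one partition.
result = {}
for node_rows in partitions.values():
    totals = defaultdict(lambda: [0, 0])
    for key, price in node_rows:
        totals[key][0] += price
        totals[key][1] += 1
    for key, (total, count) in totals.items():
        result[key] = total / count

print(result)  # per-key averages, e.g. avg(price) by key
```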

Difference between hint.strategy=shuffle and hint.shufflekey = key

hint.strategy=shuffle means that the shuffled operator will be shuffled by all of its keys. For example, in this query:

T | where Event=="Start" | project ActivityId, Started=Timestamp
| join hint.strategy = shuffle (T | where Event=="End" | project ActivityId, Ended=Timestamp)
  on ActivityId, ProcessId
| extend Duration=Ended - Started
| summarize avg(Duration)

The hash function that shuffles the data uses both keys, ActivityId and ProcessId.

The query above is equivalent to:

T | where Event=="Start" | project ActivityId, Started=Timestamp
| join hint.shufflekey = ActivityId hint.shufflekey = ProcessId (T | where Event=="End" | project ActivityId, Ended=Timestamp)
  on ActivityId, ProcessId
| extend Duration=Ended - Started
| summarize avg(Duration)

If the compound key is unique enough, but each individual key is not, use this hint to shuffle the data by all the keys of the shuffled operator. When the shuffled operator has other shuffle-able operators, such as summarize or join, the query becomes more complex and hint.strategy=shuffle won't be applied.

For example:

T
| where Event=="Start"
| project ActivityId, Started=Timestamp, numeric_column
| summarize count(), numeric_column = any(numeric_column) by ActivityId
| join
    hint.strategy = shuffle (T
    | where Event=="End"
    | project ActivityId, Ended=Timestamp, numeric_column
)
on ActivityId, numeric_column
| extend Duration=Ended - Started
| summarize avg(Duration)

If you apply hint.strategy=shuffle (instead of letting the strategy be ignored during query planning) and shuffle the data by the compound key [ActivityId, numeric_column], the result won't be correct. The summarize operator on the left side of the join groups by a subset of the join keys, in this case ActivityId. Thus, summarize groups by the key ActivityId, while the data is partitioned by the compound key [ActivityId, numeric_column]. Shuffling by the compound key [ActivityId, numeric_column] doesn't necessarily mean that shuffling by the key ActivityId is valid, and the results may be incorrect.

This example assumes that the hash function used for a compound key is binary_xor(hash(key1, 100), hash(key2, 100)):


datatable(ActivityId:string, NumericColumn:long)
[
"activity1", 2,
"activity1", 1,
]
| extend hash_by_key = binary_xor(hash(ActivityId, 100), hash(NumericColumn, 100))

ActivityId  NumericColumn  hash_by_key
activity1   2              56
activity1   1              65

The compound key for the two records maps to different partitions (56 and 65), but both records have the same value of ActivityId. The summarize operator on the left side of the join expects similar values of the ActivityId column to be in the same partition. This query will produce incorrect results.
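The same failure mode can be reproduced with a toy hash in Python (toy_hash is a made-up stand-in for illustration, not Kusto's actual hash() function): partitioning by the compound key separates two records that share the same ActivityId.

```python
def toy_hash(value, mod=100):
    # Made-up hash for illustration; NOT Kusto's hash().
    return sum(ord(c) for c in str(value)) % mod

def compound_partition(activity_id, numeric_column):
    # Mirrors binary_xor(hash(key1, 100), hash(key2, 100)) from the text.
    return toy_hash(activity_id) ^ toy_hash(numeric_column)

records = [("activity1", 2), ("activity1", 1)]

single = [toy_hash(a) for a, _ in records]
compound = [compound_partition(a, n) for a, n in records]

print(single)    # [26, 26] -- same partition for both records
print(compound)  # [40, 43] -- different partitions despite equal ActivityId
```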

You can solve this issue by using hint.shufflekey to specify the shuffle key of the join as hint.shufflekey = ActivityId. This key is common to all the shuffle-able operators. The shuffling is safe in this case, because both join and summarize shuffle by the same key. Thus, all similar values will be in the same partition and the results are correct:

T
| where Event=="Start"
| project ActivityId, Started=Timestamp, numeric_column
| summarize count(), numeric_column = any(numeric_column) by ActivityId
| join
    hint.shufflekey = ActivityId (T
    | where Event=="End"
    | project ActivityId, Ended=Timestamp, numeric_column
)
on ActivityId, numeric_column
| extend Duration=Ended - Started
| summarize avg(Duration)

In a shuffle query, the default number of partitions is the number of cluster nodes. This number can be overridden by using the syntax hint.num_partitions = total_partitions, which controls the number of partitions.

This hint is useful when the cluster has a small number of nodes, where the default number of partitions will also be small, and the query still fails or takes a long time to execute.

Note

Having many partitions may consume more cluster resources and degrade performance. Instead, choose the number of partitions carefully, starting with hint.strategy = shuffle, and increase the number of partitions gradually.

Examples

The following example shows how shuffle summarize improves performance considerably.

The source table has 150M records, and the cardinality of the group-by key is 10M, spread over 10 cluster nodes.

Running the regular summarize strategy, the query ends after 1:08 and the memory usage peak is ~3 GB:

orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count
Count
1086

Using the shuffle summarize strategy, the query ends after ~7 seconds and the memory usage peak is 0.43 GB:

orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count
Count
1086

The following example shows the improvement on a cluster that has two cluster nodes, where the table has 60M records and the cardinality of the group-by key is 2M.

Running the query without hint.num_partitions uses only two partitions (the number of cluster nodes), and the following query takes ~1:10 minutes:

lineitem    
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

Setting the number of partitions to 10, the query ends after 23 seconds:

lineitem    
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

The following example shows how shuffle join improves performance considerably.

The examples were sampled on a cluster with 10 nodes, where the data is spread over all of these nodes.

The left table has 15M records, where the cardinality of the join key is ~14M. The right side of the join has 150M records, and the cardinality of the join key is 10M. Running the regular join strategy, the query ends after ~28 seconds and the memory usage peak is 1.43 GB:

customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

Using the shuffle join strategy, the query ends after ~4 seconds and the memory usage peak is 0.3 GB:

customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

Trying the same queries on a larger dataset, where the left side of the join has 150M records and the cardinality of the key is 148M: the right side of the join has 1.5B records, and the cardinality of the key is ~100M.

The query with the default join strategy hits Kusto limits and times out after 4 minutes. Using the shuffle join strategy, the query ends after ~34 seconds and the memory usage peak is 1.23 GB.

The following example shows the improvement on a cluster that has two cluster nodes, where the table has 60M records and the cardinality of the join key is 2M. Running the query without hint.num_partitions uses only two partitions (the number of cluster nodes), and the following query takes ~1:10 minutes:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey part
on $left.l_partkey == $right.p_partkey
| consume

Setting the number of partitions to 10, the query ends after 23 seconds:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey hint.num_partitions = 10 part
on $left.l_partkey == $right.p_partkey
| consume