Shuffle query
Shuffle query is a semantic-preserving transformation used with a set of operators that support the shuffle strategy. Depending on the actual data, this query can yield considerably better performance.
The operators that support shuffling in Kusto are join, summarize, and make-series.
Set the shuffle query strategy using the query parameter hint.strategy = shuffle or hint.shufflekey = <key>.
Syntax
T | where Event=="Start" | project ActivityId, Started=Timestamp
| join hint.strategy = shuffle (T | where Event=="End" | project ActivityId, Ended=Timestamp)
on ActivityId
| extend Duration=Ended - Started
| summarize avg(Duration)
T
| summarize hint.strategy = shuffle count(), avg(price) by supplier
T
| make-series hint.shufflekey = Fruit PriceAvg=avg(Price) default=0 on Purchase from datetime(2016-09-10) to datetime(2016-09-13) step 1d by Supplier, Fruit
This strategy shares the load across all cluster nodes, where each node processes one partition of the data.
Use the shuffle query strategy when the key (the join key, summarize key, or make-series key) has high cardinality and the regular query strategy hits query limits.
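As a rough mental model (this is an illustrative sketch, not Kusto's actual implementation), a shuffle strategy hash-partitions rows by the aggregation key so that each node can aggregate its own partition independently. The column names below mirror the summarize syntax example and are hypothetical sample data:

```python
from collections import defaultdict

def shuffle_partition(rows, key, num_partitions):
    """Assign each row to a partition by hashing its key column."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions

# Hypothetical sample rows, mirroring "summarize ... by supplier" above.
rows = [{"supplier": s, "price": p}
        for s, p in [("a", 10), ("b", 20), ("a", 30), ("c", 5)]]
parts = shuffle_partition(rows, "supplier", num_partitions=4)

# Every row with the same supplier lands in the same partition, so each
# node can compute count() and avg(price) by supplier on its partition
# without merging per-key state across nodes.
```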
Difference between hint.strategy=shuffle and hint.shufflekey = key
hint.strategy=shuffle means that the shuffled operator will be shuffled by all its keys.
For example, in this query:
T | where Event=="Start" | project ActivityId, Started=Timestamp
| join hint.strategy = shuffle (T | where Event=="End" | project ActivityId, Ended=Timestamp)
on ActivityId, ProcessId
| extend Duration=Ended - Started
| summarize avg(Duration)
The hash function that shuffles the data will use both keys, ActivityId and ProcessId.
The query above is equivalent to:
T | where Event=="Start" | project ActivityId, Started=Timestamp
| join hint.shufflekey = ActivityId hint.shufflekey = ProcessId (T | where Event=="End" | project ActivityId, Ended=Timestamp)
on ActivityId, ProcessId
| extend Duration=Ended - Started
| summarize avg(Duration)
If the compound key is unique enough, but each key on its own is not, use this hint to shuffle the data by all the keys of the shuffled operator.
When the shuffled operator has other shuffle-able operators, like summarize or join, the query becomes more complex and hint.strategy=shuffle won't be applied.
For example:
T
| where Event=="Start"
| project ActivityId, Started=Timestamp, numeric_column
| summarize count(), numeric_column = any(numeric_column) by ActivityId
| join
hint.strategy = shuffle (T
| where Event=="End"
| project ActivityId, Ended=Timestamp, numeric_column
)
on ActivityId, numeric_column
| extend Duration=Ended - Started
| summarize avg(Duration)
If you apply hint.strategy=shuffle (instead of the strategy being ignored during query planning) and shuffle the data by the compound key [ActivityId, numeric_column], the result won't be correct.
The summarize operator is on the left side of the join operator. This operator groups by a subset of the join keys, which in our case is ActivityId. Thus, summarize groups by the key ActivityId, while the data is partitioned by the compound key [ActivityId, numeric_column].
Shuffling by the compound key [ActivityId, numeric_column] doesn't necessarily mean that the shuffling is valid for the key ActivityId, and the results may be incorrect.
This example assumes that the hash function used for the compound key is binary_xor(hash(key1, 100), hash(key2, 100)):
datatable(ActivityId:string, NumericColumn:long)
[
"activity1", 2,
"activity1", 1,
]
| extend hash_by_key = binary_xor(hash(ActivityId, 100) , hash(NumericColumn, 100))
ActivityId | NumericColumn | hash_by_key |
---|---|---|
activity1 | 2 | 56 |
activity1 | 1 | 65 |
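The same failure mode can be simulated outside Kusto. The sketch below uses a hypothetical stand-in hash (Python's built-in hash, not Kusto's hash() function), so the exact partition numbers differ from 56 and 65, but it demonstrates the same point: two records with equal ActivityId can map to different partitions under the compound-key hash.

```python
def fake_hash(value, mod):
    # Hypothetical stand-in for Kusto's hash(value, mod); not the real function.
    return hash(value) % mod

def compound_partition(activity_id, numeric_column, mod=100):
    # Mirrors binary_xor(hash(key1, 100), hash(key2, 100)) from the text.
    return fake_hash(activity_id, mod) ^ fake_hash(numeric_column, mod)

records = [("activity1", 2), ("activity1", 1)]
partitions = [compound_partition(a, n) for a, n in records]

# Same ActivityId, different partitions: a summarize by ActivityId running
# per-partition would see only part of the rows for "activity1".
```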
The compound key for these two records maps to different partitions (56 and 65), yet both records have the same value of ActivityId. The summarize operator on the left side of the join expects equal values of the column ActivityId to be in the same partition, so this query produces incorrect results.
You can solve this issue by using hint.shufflekey to specify the shuffle key on the join as hint.shufflekey = ActivityId. This key is common to all the shuffle-able operators.
The shuffling is safe in this case because both join and summarize shuffle by the same key. Thus, all equal values will be in the same partition and the results are correct:
T
| where Event=="Start"
| project ActivityId, Started=Timestamp, numeric_column
| summarize count(), numeric_column = any(numeric_column) by ActivityId
| join
hint.shufflekey = ActivityId (T
| where Event=="End"
| project ActivityId, Ended=Timestamp, numeric_column
)
on ActivityId, numeric_column
| extend Duration=Ended - Started
| summarize avg(Duration)
In a shuffle query, the default number of partitions is the number of cluster nodes. This number can be overridden by using the syntax hint.num_partitions = total_partitions, which controls the number of partitions.
This hint is useful when the cluster has a small number of nodes, so the default number of partitions is also small, and the query still fails or takes a long time to execute.
Note

Having many partitions may consume more cluster resources and degrade performance. Instead, choose the partition number carefully, starting with hint.strategy = shuffle, and increase the number of partitions gradually.
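To see why more partitions can reduce per-node memory pressure, here is a hypothetical sketch (again, not Kusto internals): with a fixed set of keys, raising the partition count lowers the number of distinct keys the busiest partition must hold.

```python
def busiest_partition_keys(keys, num_partitions):
    # Count keys per partition; return the largest count.
    counts = [0] * num_partitions
    for k in keys:
        counts[hash(k) % num_partitions] += 1
    return max(counts)

# Hypothetical key set standing in for a high-cardinality group-by key.
keys = [f"custkey-{i}" for i in range(10_000)]
few = busiest_partition_keys(keys, 2)    # default: one partition per node
many = busiest_partition_keys(keys, 10)  # as with hint.num_partitions = 10

# With 10 partitions, the busiest partition tracks far fewer keys than
# with 2, trading lower peak memory for more coordination overhead.
```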
Examples
The following example shows how shuffle summarize improves performance considerably.
The source table has 150M records and the cardinality of the group-by key is 10M, spread over 10 cluster nodes.
Running the regular summarize strategy, the query ends after 1:08 and the memory usage peak is ~3 GB:
orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey
| where o_totalprice < 1000
| count
Count |
---|
1086 |
While using the shuffle summarize strategy, the query ends after ~7 seconds and the memory usage peak is 0.43 GB:
orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey
| where o_totalprice < 1000
| count
Count |
---|
1086 |
The following example shows the improvement on a cluster that has two cluster nodes, where the table has 60M records and the cardinality of the group-by key is 2M.
Running the query without hint.num_partitions uses only two partitions (the number of cluster nodes), and the following query takes ~1:10 minutes:
lineitem
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey
| consume
Setting the number of partitions to 10, the query ends after 23 seconds:
lineitem
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey
| consume
The following example shows how shuffle join improves performance considerably.
The examples were sampled on a cluster with 10 nodes, with the data spread over all of them.
The left table has 15M records and the cardinality of the join key is ~14M. The right side of the join has 150M records and the cardinality of the join key is 10M.
Running the regular join strategy, the query ends after ~28 seconds and the memory usage peak is 1.43 GB:
customer
| join
orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey
While using the shuffle join strategy, the query ends after ~4 seconds and the memory usage peak is 0.3 GB:
customer
| join
hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey
Trying the same query on a larger dataset, the left side of the join has 150M records and the cardinality of the key is 148M. The right side of the join has 1.5B records and the cardinality of the key is ~100M.
The query with the default join strategy hits Kusto limits and times out after 4 minutes.
While using the shuffle join strategy, the query ends after ~34 seconds and the memory usage peak is 1.23 GB.
The following example shows the improvement on a cluster that has two cluster nodes, where the table has 60M records and the cardinality of the join key is 2M.
Running the query without hint.num_partitions uses only two partitions (the number of cluster nodes), and the following query takes ~1:10 minutes:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
hint.shufflekey = l_partkey part
on $left.l_partkey == $right.p_partkey
| consume
Setting the number of partitions to 10, the query ends after 23 seconds:
lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
hint.shufflekey = l_partkey hint.num_partitions = 10 part
on $left.l_partkey == $right.p_partkey
| consume