shuffle 查询

shuffle 查询是一种保留语义的转换,可以结合支持 shuffle 策略的一组运算符使用。 根据涉及的数据,使用 shuffle 策略进行查询可以提高性能。 如果 shuffle 键(join 键、summarize 键、make-series 键或 partition 键)的基数较高,并且常规运算符查询达到查询限制,则使用 shuffle 查询策略效果更好。

可以在 shuffle 命令中使用以下运算符:

若要使用 shuffle 查询策略,请添加表达式 hint.strategy = shufflehint.shufflekey = <key>。 使用 hint.strategy=shuffle 时,所有键会导致运算符数据随机排布。 当复合键唯一,但每个键不够唯一时,请使用此表达式,以便使用 shuffle 运算符的所有键来随机排布数据。

使用 shuffle 策略对数据进行分区时,数据负载将在所有群集节点之间分担。 每个节点处理数据的一个分区。 默认分区数等于群集节点数。

可以使用 hint.num_partitions = total_partitions 语法来重写分区数,该语法将控制分区数。 如果群集具有少量的群集节点且默认分区数较小,而查询失败或需要较长的执行时间,则此语法非常有用。

注意

使用大量分区可能会消耗更多的群集资源并降低性能。 从 hint.strategy = shuffle 着手,谨慎选择分区数,然后逐渐开始增加分区。

在某些情况下,会忽略 hint.strategy = shuffle,且不会在 shuffle 策略中运行查询。 下列情况下测试输入无效:

  • join 运算符在左侧或右侧有另一个与 shuffle 兼容的运算符(joinsummarizemake-seriespartition)。
  • summarize 运算符出现在查询中另一个与 shuffle 兼容的运算符(joinsummarizemake-seriespartition)之后。

语法

With hint.strategy = shuffle

T|DataExpression|joinhint.strategy = shuffle(DataExpression)

T|summarizehint.strategy = shuffleDataExpression

T|Query| partition hint.strategy = shuffle(SubQuery)

使用 hint.shufflekey = hint.shufflekey

T|DataExpression|joinhint.shufflekey = key(DataExpression)

T|summarizehint.shufflekey = keyDataExpression

T|make-serieshint.shufflekey = keyDataExpression

T|Query| partition hint.shufflekey = key(SubQuery)

详细了解语法约定

参数

客户 类型​​ 必需 说明
T string 要由运算符处理其数据的表格源。
DataExpression string 隐式或显式表格转换表达式。
查询 string 对 T 记录运行的转换表达式。
string 使用 join 键、summarize 键、make-series 键或 partition 键。
SubQuery string 转换表达式。

注意

必须根据所选语法指定 DataExpression 或 Query。

示例

将 summarize 与 shuffle 一起使用

使用 summarize 运算符的 shuffle 策略查询会在所有群集节点上分担负载,其中的每个节点会处理一个数据分区。

StormEvents
| summarize hint.strategy = shuffle count(), avg(InjuriesIndirect) by State
| count 

输出

Count
67

将 join 与 shuffle 一起使用

StormEvents
| where State has "West"
| where EventType has "Flood"
| join hint.strategy=shuffle 
    (
    StormEvents
    | where EventType has "Hail"
    | project EpisodeId, State, DamageProperty
    )
    on State
| count

输出

Count
103

将 make-series 与 shuffle 一起使用

StormEvents
| where State has "North"
| make-series hint.shufflekey = State sum(DamageProperty) default = 0 on StartTime in range(datetime(2007-01-01 00:00:00.0000000), datetime(2007-01-31 23:59:00.0000000), 15d) by State

输出

状态 sum_DamageProperty StartTime
NORTH DAKOTA [60000,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
NORTH CAROLINA [20000,0,1000] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]
ATLANTIC NORTH [0,0,0] ["2006-12-31T00:00:00.0000000Z","2007-01-15T00:00:00.0000000Z","2007-01-30T00:00:00.0000000Z"]

将 partition 与 shuffle 一起使用

StormEvents
| partition hint.strategy=shuffle by EpisodeId
(
    top 3 by DamageProperty
    | project EpisodeId, State, DamageProperty
)
| count

输出

Count
22345

Compare hint.strategy=shuffle and hint.shufflekey=key

使用 hint.strategy=shuffle 时,所有键会导致随机运算符随机排布。 在以下示例中,查询使用 EpisodeIdEventId 作为键将数据随机排布:

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId
| count

输出

Count
14

下面的查询使用 hint.shufflekey = key。 以上查询相当于此查询。

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| join kind = inner hint.shufflekey = EpisodeId hint.shufflekey = EventId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

输出

Count
14

使用多个键随机排布数据

在某些情况下,将忽略 hint.strategy=shuffle,且不会在无序策略中运行查询。 例如,在以下示例中,join 的左侧是 summarize,因此使用 hint.strategy=shuffle 不会将 shuffle 策略应用于查询:

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.strategy=shuffle (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

输出

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

若要克服此问题并在 shuffle 策略中运行,请选择 summarizejoin 运算通用的键。 在本例中,此键为 EpisodeId。 使用提示 hint.shufflekeyjoin 的随机排布键指定为 hint.shufflekey = EpisodeId

StormEvents
| where StartTime > datetime(2007-01-01 00:00:00.0000000)
| summarize count() by EpisodeId, EventId
| join kind = inner hint.shufflekey=EpisodeId (StormEvents | where DamageCrops > 62000000) on EpisodeId, EventId

输出

EpisodeId EventId ... EpisodeId1 EventId1 ...
1030 4407 ... 1030 4407 ...
1030 13721 ... 1030 13721 ...
2477 12530 ... 2477 12530 ...
2103 10237 ... 2103 10237 ...
2103 10239 ... 2103 10239 ...
... ... ... ... ... ...

将 summarize 与 shuffle 一起使用以提高性能

此示例中,将 summarize 运算符与 shuffle 策略一起使用可以提高性能。 源表有 150M 记录,分组依据键的基数是 10M,它分布在 10 个群集节点上。

如果使用 summarize 运算符而不使用 shuffle 策略,查询将在 1 分 8 秒之后结束,内存使用量峰值大约为 3 GB:

orders
| summarize arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

输出

计数
1086

shuffle 策略与 summarize 一起使用时,查询将在大约 7 秒之后结束,内存使用量峰值为 0.43 GB:

orders
| summarize hint.strategy = shuffle arg_max(o_orderdate, o_totalprice) by o_custkey 
| where o_totalprice < 1000
| count

输出

计数
1086

以下示例演示了具有两个群集节点的群集上的性能,其中某个表包含 6000 万条记录,group by 键的基数为 200 万。

在不使用 hint.num_partitions 的情况下运行查询将只使用两个分区(作为群集节点数),以下查询将需要大约 1 分 10 秒的时间:

lineitem 
| summarize hint.strategy = shuffle dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

如果将分区数设置为 10,则查询将在 23 秒后结束:

lineitem 
| summarize hint.strategy = shuffle hint.num_partitions = 10 dcount(l_comment), dcount(l_shipdate) by l_partkey 
| consume

将 join 与 shuffle 一起使用以提高性能

以下示例演示如何将 shuffle 策略与 join 运算符一起使用以提高性能。

示例是在包含 10 个节点的群集上采样的,数据分散在所有这些节点上。

查询的左侧源表包含 1500 万条记录,join 键的基数大约为 1400 万。 查询的右侧源包含 1.5 亿条记录,join 键的基数为 1000 万。 查询在大约 28 秒之后结束,内存使用量峰值为 1.43 GB:

customer
| join
    orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

shuffle 策略与 join 运算符一起使用时,查询将在大约 4 秒之后结束,内存使用量峰值为 0.3 GB:

customer
| join
    hint.strategy = shuffle orders
on $left.c_custkey == $right.o_custkey
| summarize sum(c_acctbal) by c_nationkey

在另一个示例中,我们尝试对符合以下条件的较大数据集运行相同的查询:

  • join 的左侧源为 1.5 亿,键的基数为 1.48 亿。
  • join 的右侧源为 15 亿,键的基数大约为 1 亿。

仅使用 join 运算符的查询在 4 分钟之后达到限制并超时。 但是,将 shuffle 策略与 join 运算符一起使用时,查询将在大约 34 秒之后结束,内存使用量峰值为 1.23 GB。

以下示例演示了对具有两个群集节点的群集的改进,表中包含 6000 万条记录,join 键的基数为 200 万。 在不使用 hint.num_partitions 的情况下运行查询将只使用两个分区(作为群集节点数),以下查询将需要大约 1 分 10 秒的时间:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey   part
on $left.l_partkey == $right.p_partkey
| consume

如果将分区数设置为 10,则查询将在 23 秒之后结束:

lineitem
| summarize dcount(l_comment), dcount(l_shipdate) by l_partkey
| join
    hint.shufflekey = l_partkey  hint.num_partitions = 10    part
on $left.l_partkey == $right.p_partkey
| consume