基于某个高基数键(例如操作 ID 或会话 ID)在两个大型数据集之间进行联接,并通过对左右两侧 datetime
列之间的“时间距离”添加限制来进一步限制需要与每个左侧 ($left) 记录向匹配的右侧 ($right) 记录通常是有用的。
上述作不同于通常的联接作,因为对于匹配左右数据集之间的高基数键的 equi-join
部分,系统还可以应用距离函数,并使用它大大加快联接速度。
注意
距离函数的行为与等式不同(也就是说,当 dist(x,y) 和 dist(y,z) 都为 true 时,它并不遵循 dist(x,z) 也为 true 的规则。) 这有时称为“对角联接”。
用于标识没有时间窗口的事件序列的示例
为了在相对较小的时间范围内标识事件序列,此示例使用具有以下架构的表 T
:
-
SessionId
:一个string
类型的列,其中包含相关 ID。 -
EventType
:一个string
类型的列,用于标识记录的事件类型。 -
Timestamp
:一个datetime
类型的列,用于指示记录描述的事件是何时发生的。
SessionId(会话ID) | 事件类型 | 时间戳 |
---|---|---|
0 | 一个 | 2017-10-01T00:00:00Z |
0 | B | 2017-10-01T00:01:00Z |
1 | B | 2017-10-01T00:02:00Z |
1 | 一个 | 2017-10-01T00:03:00Z |
3 | 一个 | 2017-10-01T00:04:00Z |
3 | B | 2017-10-01T00:10:00Z |
以下查询创建数据集,然后标识事件类型 A
后跟 B
时间范围内事件类型 1min
的所有会话 ID。
let T = datatable(SessionId:string, EventType:string, Timestamp:datetime)
[
'0', 'A', datetime(2017-10-01 00:00:00),
'0', 'B', datetime(2017-10-01 00:01:00),
'1', 'B', datetime(2017-10-01 00:02:00),
'1', 'A', datetime(2017-10-01 00:03:00),
'3', 'A', datetime(2017-10-01 00:04:00),
'3', 'B', datetime(2017-10-01 00:10:00),
];
T
| where EventType == 'A'
| project SessionId, Start=Timestamp
| join kind=inner
(
T
| where EventType == 'B'
| project SessionId, End=Timestamp
) on SessionId
| where (End - Start) between (0min .. 1min)
| project SessionId, Start, End
输出
SessionId(会话ID) | 开始 | 结束 |
---|---|---|
0 | 2017-10-01 00:00:00.0000000 | 2017-10-01 00:01:00.0000000 |
使用时间窗口优化的示例
若要优化此查询,可以重写它以考虑时间范围。 THe 时间范围表示为联接键。 重写查询,将 datetime
值“离散化”为大小为时间范围大小一半的 Bucket。 使用 equi-join
比较存储桶 ID。
查询查找同一会话(SessionId)中的事件对,其中“A”事件后跟 1 分钟内的“B”事件。 它投影会话 ID、“A”事件的开始时间和“B”事件的结束时间。
let T = datatable(SessionId:string, EventType:string, Timestamp:datetime)
[
'0', 'A', datetime(2017-10-01 00:00:00),
'0', 'B', datetime(2017-10-01 00:01:00),
'1', 'B', datetime(2017-10-01 00:02:00),
'1', 'A', datetime(2017-10-01 00:03:00),
'3', 'A', datetime(2017-10-01 00:04:00),
'3', 'B', datetime(2017-10-01 00:10:00),
];
let lookupWindow = 1min;
let lookupBin = lookupWindow / 2.0;
T
| where EventType == 'A'
| project SessionId, Start=Timestamp, TimeKey = bin(Timestamp, lookupBin)
| join kind=inner
(
T
| where EventType == 'B'
| project SessionId, End=Timestamp,
TimeKey = range(bin(Timestamp-lookupWindow, lookupBin),
bin(Timestamp, lookupBin),
lookupBin)
| mv-expand TimeKey to typeof(datetime)
) on SessionId, TimeKey
| where (End - Start) between (0min .. lookupWindow)
| project SessionId, Start, End
输出
SessionId(会话ID) | 开始 | 结束 |
---|---|---|
0 | 2017-10-01 00:00:00.0000000 | 2017-10-01 00:01:00.0000000 |
500 万数据查询
下一个查询模拟 500 万条记录和大约 1M 会话 ID 的广泛数据集,并使用时间窗口技术运行查询。
let T = range x from 1 to 5000000 step 1
| extend SessionId = rand(1000000), EventType = rand(3), Time=datetime(2017-01-01)+(x * 10ms)
| extend EventType = case(EventType < 1, "A",
EventType < 2, "B",
"C");
let lookupWindow = 1min;
let lookupBin = lookupWindow / 2.0;
T
| where EventType == 'A'
| project SessionId, Start=Time, TimeKey = bin(Time, lookupBin)
| join kind=inner
(
T
| where EventType == 'B'
| project SessionId, End=Time,
TimeKey = range(bin(Time-lookupWindow, lookupBin),
bin(Time, lookupBin),
lookupBin)
| mv-expand TimeKey to typeof(datetime)
) on SessionId, TimeKey
| where (End - Start) between (0min .. lookupWindow)
| project SessionId, Start, End
| count
输出
计数 |
---|
3344 |