Time window join

It's often useful to join two large data sets on a high-cardinality key, such as an operation ID or a session ID, and to further limit the right-hand-side ($right) records that need to match each left-hand-side ($left) record by restricting the "time distance" between datetime columns on the left and on the right.

Restricting by time distance is useful in a join like the following scenario:

  • Join two large data sets on a high-cardinality key, such as an operation ID or a session ID.
  • Limit the right-hand-side ($right) records that need to match each left-hand-side ($left) record by adding a restriction on the "time distance" between datetime columns on the left and on the right.

This operation differs from the usual Kusto join operation: in addition to the equi-join part, which matches the high-cardinality key between the left and right data sets, the system can apply a distance function and use it to considerably speed up the join.

Note

A distance function doesn't behave like equality. That is, when both dist(x,y) and dist(y,z) are true, it doesn't follow that dist(x,z) is also true. Internally, we sometimes refer to this as "diagonal join".
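
The non-transitivity described in the note can be illustrated with a small sketch. It assumes, purely for illustration, that dist(a,b) means "a and b are at most 1min apart"; the names maxDistance, dist_xy, dist_yz, and dist_xz are hypothetical and not part of Kusto.

```kusto
// Illustrative assumption: dist(a, b) is true when a and b are at most maxDistance apart.
let maxDistance = 1min;
print x = datetime(2017-10-01 00:00:00),
      y = datetime(2017-10-01 00:01:00),
      z = datetime(2017-10-01 00:02:00)
| extend dist_xy = abs(y - x) <= maxDistance,  // x and y are 1min apart
         dist_yz = abs(z - y) <= maxDistance,  // y and z are 1min apart
         dist_xz = abs(z - x) <= maxDistance   // x and z are 2min apart
```

Under this assumption, dist_xy and dist_yz evaluate to true while dist_xz evaluates to false, which is why a distance condition can't be handled as a plain equality key.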

For example, suppose that you want to identify event sequences within a relatively small time window, and that you have a table T with the following schema:

  • SessionId: A column of type string with correlation IDs.
  • EventType: A column of type string that identifies the event type of the record.
  • Timestamp: A column of type datetime that indicates when the event described by the record happened.

let T = datatable(SessionId:string, EventType:string, Timestamp:datetime)
[
    '0', 'A', datetime(2017-10-01 00:00:00),
    '0', 'B', datetime(2017-10-01 00:01:00),
    '1', 'B', datetime(2017-10-01 00:02:00),
    '1', 'A', datetime(2017-10-01 00:03:00),
    '3', 'A', datetime(2017-10-01 00:04:00),
    '3', 'B', datetime(2017-10-01 00:10:00),
];
T

SessionId  EventType  Timestamp
0          A          2017-10-01 00:00:00.0000000
0          B          2017-10-01 00:01:00.0000000
1          B          2017-10-01 00:02:00.0000000
1          A          2017-10-01 00:03:00.0000000
3          A          2017-10-01 00:04:00.0000000
3          B          2017-10-01 00:10:00.0000000

Problem statement

Our query should answer the following question:

Find all the session IDs in which event type A was followed by event type B within a 1min time window.

Note

In the sample data above, the only such session ID is 0.

Semantically, the following query answers this question, albeit inefficiently.

T 
| where EventType == 'A'
| project SessionId, Start=Timestamp
| join kind=inner
    (
    T 
    | where EventType == 'B'
    | project SessionId, End=Timestamp
    ) on SessionId
| where (End - Start) between (0min .. 1min)
| project SessionId, Start, End 

SessionId  Start                        End
0          2017-10-01 00:00:00.0000000  2017-10-01 00:01:00.0000000

To optimize this query, we can rewrite it as described below so that the time window is expressed as a join key.

Rewrite the query to account for the time window

Rewrite the query so that the datetime values are "discretized" into buckets whose size is half the size of the time window. Use Kusto's equi-join to compare those bucket IDs.

let lookupWindow = 1min;
let lookupBin = lookupWindow / 2.0; // lookup bin = equal to 1/2 of the lookup window
T 
| where EventType == 'A'
| project SessionId, Start=Timestamp,
          // TimeKey on the left side of the join is mapped to a discrete time axis for the join purpose
          TimeKey = bin(Timestamp, lookupBin)
| join kind=inner
    (
    T 
    | where EventType == 'B'
    | project SessionId, End=Timestamp,
              // TimeKey on the right side of the join - emulates event 'B' appearing several times
              // as if it was 'replicated'
              TimeKey = range(bin(Timestamp-lookupWindow, lookupBin),
                              bin(Timestamp, lookupBin),
                              lookupBin)
    // 'mv-expand' translates the TimeKey array range into a column
    | mv-expand TimeKey to typeof(datetime)
    ) on SessionId, TimeKey 
| where (End - Start) between (0min .. lookupWindow)
| project SessionId, Start, End 

SessionId  Start                        End
0          2017-10-01 00:00:00.0000000  2017-10-01 00:01:00.0000000
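
To see why the equi-join on TimeKey still finds the pair in session 0, it can help to look at the keys generated for a single right-side record. The following sketch takes the 'B' event of session 0 from the sample data and applies the same range and mv-expand steps in isolation; it is meant as an illustration of the bucketing, not as part of the query itself.

```kusto
let lookupWindow = 1min;
let lookupBin = lookupWindow / 2.0;
// Right-side record: event 'B' of session '0' from the sample data.
print End = datetime(2017-10-01 00:01:00)
// Every bucket that a matching 'A' event could fall into, from (End - lookupWindow) up to End.
| extend TimeKey = range(bin(End - lookupWindow, lookupBin),
                         bin(End, lookupBin),
                         lookupBin)
// One row per bucket, so that each bucket can act as an equi-join key.
| mv-expand TimeKey to typeof(datetime)
```

This produces three rows with TimeKey values 00:00:00, 00:00:30, and 00:01:00 on 2017-10-01. The 'A' event of session 0 occurs at 00:00:00, so its left-side key bin(Timestamp, lookupBin) is also 00:00:00 and it meets the first of these rows in the join. The final where clause is still needed, because sharing a bucket doesn't guarantee that End - Start falls inside the window.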

Runnable query reference (with table inlined)

let T = datatable(SessionId:string, EventType:string, Timestamp:datetime)
[
    '0', 'A', datetime(2017-10-01 00:00:00),
    '0', 'B', datetime(2017-10-01 00:01:00),
    '1', 'B', datetime(2017-10-01 00:02:00),
    '1', 'A', datetime(2017-10-01 00:03:00),
    '3', 'A', datetime(2017-10-01 00:04:00),
    '3', 'B', datetime(2017-10-01 00:10:00),
];
let lookupWindow = 1min;
let lookupBin = lookupWindow / 2.0;
T 
| where EventType == 'A'
| project SessionId, Start=Timestamp, TimeKey = bin(Timestamp, lookupBin)
| join kind=inner
    (
    T 
    | where EventType == 'B'
    | project SessionId, End=Timestamp,
              TimeKey = range(bin(Timestamp-lookupWindow, lookupBin),
                              bin(Timestamp, lookupBin),
                              lookupBin)
    | mv-expand TimeKey to typeof(datetime)
    ) on SessionId, TimeKey 
| where (End - Start) between (0min .. lookupWindow)
| project SessionId, Start, End 

SessionId  Start                        End
0          2017-10-01 00:00:00.0000000  2017-10-01 00:01:00.0000000

50M data query

The next query emulates a data set of 50M records and ~10M IDs and runs the query with the technique described above. Because the data is generated with rand(), the resulting count can differ from run to run.

let T = range x from 1 to 50000000 step 1
| extend SessionId = rand(10000000), EventType = rand(3), Time=datetime(2017-01-01)+(x * 10ms)
| extend EventType = case(EventType <= 1, "A",
                          EventType <= 2, "B",
                          "C");
let lookupWindow = 1min;
let lookupBin = lookupWindow / 2.0;
T 
| where EventType == 'A'
| project SessionId, Start=Time, TimeKey = bin(Time, lookupBin)
| join kind=inner
    (
    T 
    | where EventType == 'B'
    | project SessionId, End=Time, 
              TimeKey = range(bin(Time-lookupWindow, lookupBin), 
                              bin(Time, lookupBin),
                              lookupBin)
    | mv-expand TimeKey to typeof(datetime)
    ) on SessionId, TimeKey 
| where (End - Start) between (0min .. lookupWindow)
| project SessionId, Start, End 
| count 

Count
1276