time_window_rolling_avg_fl()
函数 time_window_rolling_avg_fl()
是一个用户定义的函数 (UDF),它计算所需值在固定持续时间段内的移动平均。
可以使用 series_fir() 计算常规时序(即具有固定间隔)在固定时段内的移动平均,因为可以将固定时段转换为宽度固定且系数相同的筛选器。 但是,针对非常规时序的计算更复杂,因为窗口中的实际样本数会有所不同。 仍可使用功能强大的扫描实现此目的。
仅在更改时(而非以固定间隔)发出指标值的用例需要此类滚动窗口计算。 例如,在 IoT 中,边缘设备仅在更改时将指标发送到云,从而优化通信带宽。
语法
T | invoke time_window_rolling_avg_fl(
t_col,
y_col,
key_col,
dt [,
direction ])
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
t_col | string |
✔️ | 包含记录时间戳的列的名称。 |
y_col | string |
✔️ | 包含记录指标值的列的名称。 |
key_col | string |
✔️ | 包含记录分区键的列的名称。 |
dt | timespan |
✔️ | 滚动窗口的持续时间。 |
direction | int |
聚合方向。 可能的值为 +1 或 -1。 滚动窗口分别从当前时间开始向前/向后设置。 默认值为 -1,因为向后滚动窗口是适用于流式处理方案的唯一可能方法。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
tbl_ex
| partition hint.strategy=shuffle by key
(
extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
| mv-expand timestamp to typeof(datetime), delta to typeof(long)
| sort by timestamp asc, delta desc
| scan declare (cum_sum:double=0.0, cum_count:long=0) with
(
step s: true => cum_count = s.cum_count + delta,
cum_sum = s.cum_sum + delta * value;
)
| extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
| where delta == -direction
| project timestamp, value, avg_value, key
)
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
tbl_ex
| partition hint.strategy=shuffle by key
(
extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
| mv-expand timestamp to typeof(datetime), delta to typeof(long)
| sort by timestamp asc, delta desc
| scan declare (cum_sum:double=0.0, cum_count:long=0) with
(
step s: true => cum_count = s.cum_count + delta,
cum_sum = s.cum_sum + delta * value;
)
| extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
| where delta == -direction
| project timestamp, value, avg_value, key
)
};
let tbl = datatable(ts:datetime, val:real, key:string) [
datetime(8:00), 1, 'Device1',
datetime(8:01), 2, 'Device1',
datetime(8:05), 3, 'Device1',
datetime(8:05), 10, 'Device2',
datetime(8:09), 20, 'Device2',
datetime(8:40), 4, 'Device1',
datetime(9:00), 5, 'Device1',
datetime(9:01), 6, 'Device1',
datetime(9:05), 30, 'Device2',
datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)
输出
timestamp | 值 | avg_value | key |
---|---|---|---|
2021-11-29 08:05:00.0000000 | 10 | 10 | Device2 |
2021-11-29 08:09:00.0000000 | 20 | 15 | Device2 |
2021-11-29 09:05:00.0000000 | 30 | 30 | Device2 |
2021-11-29 08:00:00.0000000 | 1 | 1 | Device1 |
2021-11-29 08:01:00.0000000 | 2 | 1.5 | Device1 |
2021-11-29 08:05:00.0000000 | 3 | 2 | Device1 |
2021-11-29 08:40:00.0000000 | 4 | 4 | Device1 |
2021-11-29 09:00:00.0000000 | 5 | 5 | Device1 |
2021-11-29 09:01:00.0000000 | 6 | 5.5 | Device1 |
2021-11-29 09:50:00.0000000 | 7 | 7 | Device1 |
8:05 的第一个值 (10) 仅包含一个值,该值处于向后 10 分钟的窗口,第二个值 (15) 是 8:09 和 8:05 的两个样本的平均值,以此类推。