time_window_rolling_avg_fl()

适用于:✅Azure 数据资源管理器Azure MonitorMicrosoft Sentinel

函数 time_window_rolling_avg_fl() 是一个用户定义的函数 (UDF),它计算所需值在固定持续时间段内的移动平均。

可以使用 series_fir() 计算常规时序(即具有固定间隔)在固定时段内的移动平均,因为可以将固定时段转换为宽度固定且系数相同的筛选器。 但是,针对非常规时序的计算更复杂,因为窗口中的实际样本数会有所不同。 仍可使用功能强大的扫描实现此目的。

仅在更改时(而非以固定间隔)发出指标值的用例需要此类滚动窗口计算。 例如,在 IoT 中,边缘设备仅在更改时将指标发送到云,从而优化通信带宽。

语法

T | invoke time_window_rolling_avg_fl(t_col, y_col, key_col, dt [, direction ])

详细了解语法约定

参数

客户 类型​​ 必需 说明
t_col string ✔️ 包含记录时间戳的列的名称。
y_col string ✔️ 包含记录指标值的列的名称。
key_col string ✔️ 包含记录分区键的列的名称。
dt timespan ✔️ 滚动窗口的持续时间。
direction int 聚合方向。 可能的值为 +1 或 -1。 滚动窗口分别从当前时间开始向前/向后设置。 默认值为 -1,因为向后滚动窗口是适用于流式处理方案的唯一可能方法。

函数定义

可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句不能独立运行。 它必须后跟一个表格表达式语句。 若要运行 time_window_rolling_avg_fl() 的工作示例,请参阅示例

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
// Write your query to use the function here.

示例

以下示例使用 invoke 运算符运行函数。

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let time_window_rolling_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, dt:timespan, direction:int=int(-1))
{
    let tbl_ex = tbl | extend timestamp = column_ifexists(t_col, datetime(null)), value = column_ifexists(y_col, 0.0), key = column_ifexists(key_col, '');
    tbl_ex 
    | partition hint.strategy=shuffle by key 
    (
        extend timestamp=pack_array(timestamp, timestamp - direction*dt), delta = pack_array(-direction, direction)
        | mv-expand timestamp to typeof(datetime), delta to typeof(long)
        | sort by timestamp asc, delta desc    
        | scan declare (cum_sum:double=0.0, cum_count:long=0) with 
        (
            step s: true => cum_count = s.cum_count + delta, 
                            cum_sum = s.cum_sum + delta * value; 
        )
        | extend avg_value = iff(direction == 1, prev(cum_sum)/prev(cum_count), cum_sum/cum_count)
        | where delta == -direction 
        | project timestamp, value, avg_value, key
    )
};
let tbl = datatable(ts:datetime,  val:real, key:string) [
    datetime(8:00), 1, 'Device1',
    datetime(8:01), 2, 'Device1',
    datetime(8:05), 3, 'Device1',
    datetime(8:05), 10, 'Device2',
    datetime(8:09), 20, 'Device2',
    datetime(8:40), 4, 'Device1',
    datetime(9:00), 5, 'Device1',
    datetime(9:01), 6, 'Device1',
    datetime(9:05), 30, 'Device2',
    datetime(9:50), 7, 'Device1'
];
tbl
| invoke time_window_rolling_avg_fl('ts', 'val', 'key', 10m)

输出

timestamp avg_value key
2021-11-29 08:05:00.0000000 10 10 Device2
2021-11-29 08:09:00.0000000 20 15 Device2
2021-11-29 09:05:00.0000000 30 30 Device2
2021-11-29 08:00:00.0000000 1 1 Device1
2021-11-29 08:01:00.0000000 2 1.5 Device1
2021-11-29 08:05:00.0000000 3 2 Device1
2021-11-29 08:40:00.0000000 4 4 Device1
2021-11-29 09:00:00.0000000 5 5 Device1
2021-11-29 09:01:00.0000000 6 5.5 Device1
2021-11-29 09:50:00.0000000 7 7 Device1

8:05 的第一个值 (10) 仅包含一个值,该值处于向后 10 分钟的窗口,第二个值 (15) 是 8:09 和 8:05 的两个样本的平均值,以此类推。