time_weighted_avg_fl()
函数 time_weighted_avg_fl()
是一个用户定义的函数 (UDF),它在输入时间箱中计算给定时间范围内指标的时间加权平均值。此函数类似于 summarize 运算符。 函数按时间箱聚合指标,但它不计算每个箱中指标值的简单 avg(),而是按每个值的持续时间进行加权。 持续时间定义为从当前值的时间戳到下一个值的时间戳。
计算时间加权平均值有两种方法。 此函数将当前采样值向前填充,直至下一个采样值。 或者,time_weighted_avg2_fl() 在连续样本之间线性插值的指标值。
语法
T | invoke time_weighted_avg_fl(
t_col,
y_col,
key_col,
stime,
etime,
dt)
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
t_col | string |
✔️ | 包含记录时间戳的列的名称。 |
y_col | string |
✔️ | 包含记录指标值的列的名称。 |
key_col | string |
✔️ | 包含记录分区键的列的名称。 |
stime | datetime |
✔️ | 聚合窗口的开始时间。 |
etime | datetime |
✔️ | 聚合窗口的结束时间。 |
dt | timespan |
✔️ | 聚合时间箱。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let time_weighted_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, stime:datetime, etime:datetime, dt:timespan)
{
let tbl_ex = tbl | extend _ts = column_ifexists(t_col, datetime(null)), _val = column_ifexists(y_col, 0.0), _key = column_ifexists(key_col, '');
let _etime = etime + dt;
let gridTimes = range _ts from stime to _etime step dt | extend _val=real(null), dummy=1;
let keys = materialize(tbl_ex | summarize by _key | extend dummy=1);
gridTimes
| join kind=fullouter keys on dummy
| project-away dummy, dummy1
| union tbl_ex
| where _ts between (stime.._etime)
| partition hint.strategy=native by _key (
order by _ts asc, _val nulls last
| scan declare(f_value:real=0.0) with (step s: true => f_value = iff(isnull(_val), s.f_value, _val);) // fill forward null values
| extend diff_t=(next(_ts)-_ts)/1m
)
| where isnotnull(diff_t)
| summarize tw_sum=sum(f_value*diff_t), t_sum =sum(diff_t) by bin_at(_ts, dt, stime), _key
| where t_sum > 0 and _ts <= etime
| extend tw_avg = tw_sum/t_sum
| project-away tw_sum, t_sum
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let time_weighted_avg_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, stime:datetime, etime:datetime, dt:timespan)
{
let tbl_ex = tbl | extend _ts = column_ifexists(t_col, datetime(null)), _val = column_ifexists(y_col, 0.0), _key = column_ifexists(key_col, '');
let _etime = etime + dt;
let gridTimes = range _ts from stime to _etime step dt | extend _val=real(null), dummy=1;
let keys = materialize(tbl_ex | summarize by _key | extend dummy=1);
gridTimes
| join kind=fullouter keys on dummy
| project-away dummy, dummy1
| union tbl_ex
| where _ts between (stime.._etime)
| partition hint.strategy=native by _key (
order by _ts asc, _val nulls last
| scan declare(f_value:real=0.0) with (step s: true => f_value = iff(isnull(_val), s.f_value, _val);) // fill forward null values
| extend diff_t=(next(_ts)-_ts)/1m
)
| where isnotnull(diff_t)
| summarize tw_sum=sum(f_value*diff_t), t_sum =sum(diff_t) by bin_at(_ts, dt, stime), _key
| where t_sum > 0 and _ts <= etime
| extend tw_avg = tw_sum/t_sum
| project-away tw_sum, t_sum
};
let tbl = datatable(ts:datetime, val:real, key:string) [
datetime(2021-04-26 00:00), 100, 'Device1',
datetime(2021-04-26 00:45), 300, 'Device1',
datetime(2021-04-26 01:15), 200, 'Device1',
datetime(2021-04-26 00:00), 600, 'Device2',
datetime(2021-04-26 00:30), 400, 'Device2',
datetime(2021-04-26 01:30), 500, 'Device2',
datetime(2021-04-26 01:45), 300, 'Device2'
];
let minmax=materialize(tbl | summarize mint=min(ts), maxt=max(ts));
let stime=toscalar(minmax | project mint);
let etime=toscalar(minmax | project maxt);
let dt = 1h;
tbl
| invoke time_weighted_avg_fl('ts', 'val', 'key', stime, etime, dt)
| project-rename val = tw_avg
| order by _key asc, _ts asc
输出
_ts | _key | val |
---|---|---|
2021-04-26 00:00:00.0000000 | Device1 | 150 |
2021-04-26 01:00:00.0000000 | Device1 | 225 |
2021-04-26 00:00:00.0000000 | Device2 | 500 |
2021-04-26 01:00:00.0000000 | Device2 | 400 |
Device1 的第一个值为 (45m*100 + 15m*300)/60m = 150,第二个值为 (15m*300 + 45m*200)/60m = 225。
Device2 的第一个值为 (30m*600 + 30m*400)/60m = 500,第二个值为 (30m*400 + 15m*500 + 15m*300)/60m = 400。