time_weighted_avg2_fl()
函数 time_weighted_avg2_fl()
是一个用户定义的函数 (UDF),它在输入时间箱中计算给定时间范围内指标的时间加权平均值。此函数类似于 summarize 运算符。 函数按时间箱聚合指标,但它不计算每个箱中指标值的简单 avg(),而是按每个值的持续时间进行加权。 持续时间定义为从当前值的时间戳到下一个值的时间戳。
计算时间加权平均值有两种方法。 此函数对连续样本之间的度量值进行线性插值。 或者,time_weighted_avg_fl() 将当前采样值向前填充到下一个采样值。
语法
T | invoke time_weighted_avg2_fl(
t_col,
y_col,
key_col,
stime,
etime,
dt)
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
t_col | string |
✔️ | 包含记录时间戳的列的名称。 |
y_col | string |
✔️ | 包含记录指标值的列的名称。 |
key_col | string |
✔️ | 包含记录分区键的列的名称。 |
stime | datetime |
✔️ | 聚合窗口的开始时间。 |
etime | datetime |
✔️ | 聚合窗口的结束时间。 |
dt | timespan |
✔️ | 聚合时间箱。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let time_weighted_avg2_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, stime:datetime, etime:datetime, dt:timespan)
{
let tbl_ex = tbl | extend _ts = column_ifexists(t_col, datetime(null)), _val = column_ifexists(y_col, 0.0), _key = column_ifexists(key_col, '');
let _etime = etime + dt;
let gridTimes = range _ts from stime to _etime step dt | extend _val=real(null), dummy=1;
let keys = materialize(tbl_ex | summarize by _key | extend dummy=1);
gridTimes
| join kind=fullouter keys on dummy
| project-away dummy, dummy1
| union tbl_ex
| where _ts between (stime.._etime)
| partition hint.strategy=native by _key (
order by _ts desc, _val nulls last
| scan declare(val1:real=0.0, t1:datetime) with ( // fill backward null values
step s: true => val1=iff(isnull(_val), s.val1, _val), t1=iff(isnull(_val), s.t1, _ts);)
| extend dt1=(t1-_ts)/1m
| order by _ts asc, _val nulls last
| scan declare(val0:real=0.0, t0:datetime) with ( // fill forward null values
step s: true => val0=iff(isnull(_val), s.val0, _val), t0=iff(isnull(_val), s.t0, _ts);)
| extend dt0=(_ts-t0)/1m
| extend _twa_val=iff(dt0+dt1 == 0, _val, ((val0*dt1)+(val1*dt0))/(dt0+dt1))
| scan with ( // fill forward null twa values
step s: true => _twa_val=iff(isnull(_twa_val), s._twa_val, _twa_val);)
| extend diff_t=(next(_ts)-_ts)/1m
)
| where isnotnull(diff_t)
| order by _key asc, _ts asc
| extend next_twa_val=iff(_key == next(_key), next(_twa_val), _twa_val)
| summarize tw_sum=sum((_twa_val+next_twa_val)*diff_t/2.0), t_sum =sum(diff_t) by bin_at(_ts, dt, stime), _key
| where t_sum > 0 and _ts <= etime
| extend tw_avg = tw_sum/t_sum
| project-away tw_sum, t_sum
| order by _key asc, _ts asc
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let time_weighted_avg2_fl=(tbl:(*), t_col:string, y_col:string, key_col:string, stime:datetime, etime:datetime, dt:timespan)
{
let tbl_ex = tbl | extend _ts = column_ifexists(t_col, datetime(null)), _val = column_ifexists(y_col, 0.0), _key = column_ifexists(key_col, '');
let _etime = etime + dt;
let gridTimes = range _ts from stime to _etime step dt | extend _val=real(null), dummy=1;
let keys = materialize(tbl_ex | summarize by _key | extend dummy=1);
gridTimes
| join kind=fullouter keys on dummy
| project-away dummy, dummy1
| union tbl_ex
| where _ts between (stime.._etime)
| partition hint.strategy=native by _key (
order by _ts desc, _val nulls last
| scan declare(val1:real=0.0, t1:datetime) with ( // fill backward null values
step s: true => val1=iff(isnull(_val), s.val1, _val), t1=iff(isnull(_val), s.t1, _ts);)
| extend dt1=(t1-_ts)/1m
| order by _ts asc, _val nulls last
| scan declare(val0:real=0.0, t0:datetime) with ( // fill forward null values
step s: true => val0=iff(isnull(_val), s.val0, _val), t0=iff(isnull(_val), s.t0, _ts);)
| extend dt0=(_ts-t0)/1m
| extend _twa_val=iff(dt0+dt1 == 0, _val, ((val0*dt1)+(val1*dt0))/(dt0+dt1))
| scan with ( // fill forward null twa values
step s: true => _twa_val=iff(isnull(_twa_val), s._twa_val, _twa_val);)
| extend diff_t=(next(_ts)-_ts)/1m
)
| where isnotnull(diff_t)
| order by _key asc, _ts asc
| extend next_twa_val=iff(_key == next(_key), next(_twa_val), _twa_val)
| summarize tw_sum=sum((_twa_val+next_twa_val)*diff_t/2.0), t_sum =sum(diff_t) by bin_at(_ts, dt, stime), _key
| where t_sum > 0 and _ts <= etime
| extend tw_avg = tw_sum/t_sum
| project-away tw_sum, t_sum
| order by _key asc, _ts asc
};
let tbl = datatable(ts:datetime, val:real, key:string) [
datetime(2021-04-26 00:00), 100, 'Device1',
datetime(2021-04-26 00:45), 300, 'Device1',
datetime(2021-04-26 01:15), 200, 'Device1',
datetime(2021-04-26 00:00), 600, 'Device2',
datetime(2021-04-26 00:30), 400, 'Device2',
datetime(2021-04-26 01:30), 500, 'Device2',
datetime(2021-04-26 01:45), 300, 'Device2'
];
let minmax=materialize(tbl | summarize mint=min(ts), maxt=max(ts));
let stime=toscalar(minmax | project mint);
let etime=toscalar(minmax | project maxt);
let dt = 1h;
tbl
| invoke time_weighted_avg2_fl('ts', 'val', 'key', stime, etime, dt)
| project-rename val = tw_avg
| order by _key asc, _ts asc
输出
_ts | _key | val |
---|---|---|
2021-04-26 00:00:00.0000000 | Device1 | 218.75 |
2021-04-26 01:00:00.0000000 | Device1 | 206.25 |
2021-04-26 00:00:00.0000000 | Device2 | 462.5 |
2021-04-26 01:00:00.0000000 | Device2 | 412.5 |
Device1 的第一个值为 (45m*(100+300)/2 + 15m*(300+250)/2)/60m = 218.75,第二个值为 (15m*(250+200)/2 + 45m*200)/60m = 206.25。
Device2 的第一个值为 (30m*(600+400)/2 + 30m*(400+450)/2)/60m = 462.5,第二个值为 (30m*(450+500)/2 + 15m*(500+300)/2 + 15m*300)/60m = 412.5。