series_rolling_fl()
适用于:✅Azure 数据资源管理器
函数 series_rolling_fl()
是一个用户定义的函数 (UDF),它对一个序列应用滚动聚合。 它采用包含多个序列(动态数值数组)的表,并对每个序列应用滚动聚合函数。
先决条件
- 必须在群集上启用 Python 插件。 这是函数中使用的内联 Python 所必需的。
语法
T | invoke series_rolling_fl(
y_series,
y_rolling_series,
n,
aggr,
aggr_params,
center)
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
y_series | string |
✔️ | 列的名称,其中包含要拟合的序列。 |
y_rolling_series | string |
✔️ | 存储滚动聚合序列的列的名称。 |
n | int |
✔️ | 滚动窗口的宽度。 |
aggr | string |
✔️ | 要使用的聚合函数的名称。 请参阅聚合函数。 |
aggr_params | string |
聚合函数的可选参数。 | |
center | bool |
指示滚动窗口是在当前点前后对称应用还是从当前点向后应用。 默认情况下,对于流数据的计算,center 为 false 。 |
聚合函数
此函数支持从序列计算标量的 numpy 或 scipy.stats 中的任何聚合函数。 以下列表并不详尽:
sum
mean
min
max
ptp (max-min)
percentile
median
std
var
gmean
(几何平均值)hmean
(调和平均值)mode
(最常见的值)moment
(第 n 个时刻)tmean
(剪裁平均值)tmin
tmax
tstd
iqr
(四分位差)
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
let code = ```if 1:
y_series = kargs["y_series"]
y_rolling_series = kargs["y_rolling_series"]
n = kargs["n"]
aggr = kargs["aggr"]
aggr_params = kargs["aggr_params"]
center = kargs["center"]
result = df
in_s = df[y_series]
func = getattr(np, aggr, None)
if not func:
import scipy.stats
func = getattr(scipy.stats, aggr)
if func:
result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
```;
tbl
| evaluate python(typeof(*), code, kwargs)
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
计算 9 个元素的滚动中值
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
let code = ```if 1:
y_series = kargs["y_series"]
y_rolling_series = kargs["y_rolling_series"]
n = kargs["n"]
aggr = kargs["aggr"]
aggr_params = kargs["aggr_params"]
center = kargs["center"]
result = df
in_s = df[y_series]
func = getattr(np, aggr, None)
if not func:
import scipy.stats
func = getattr(scipy.stats, aggr)
if func:
result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
```;
tbl
| evaluate python(typeof(*), code, kwargs)
};
//
// Calculate rolling median of 9 elements
//
demo_make_series1
| make-series num=count() on TimeStamp step 1h by OsVer
| extend rolling_med = dynamic(null)
| invoke series_rolling_fl('num', 'rolling_med', 9, 'median')
| render timechart
输出
计算 15 个元素的滚动最小值、最大值和第 75 个百分位
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
let code = ```if 1:
y_series = kargs["y_series"]
y_rolling_series = kargs["y_rolling_series"]
n = kargs["n"]
aggr = kargs["aggr"]
aggr_params = kargs["aggr_params"]
center = kargs["center"]
result = df
in_s = df[y_series]
func = getattr(np, aggr, None)
if not func:
import scipy.stats
func = getattr(scipy.stats, aggr)
if func:
result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
```;
tbl
| evaluate python(typeof(*), code, kwargs)
};
//
// Calculate rolling min, max & 75th percentile of 15 elements
//
demo_make_series1
| make-series num=count() on TimeStamp step 1h by OsVer
| extend rolling_min = dynamic(null), rolling_max = dynamic(null), rolling_pct = dynamic(null)
| invoke series_rolling_fl('num', 'rolling_min', 15, 'min', dynamic([null]))
| invoke series_rolling_fl('num', 'rolling_max', 15, 'max', dynamic([null]))
| invoke series_rolling_fl('num', 'rolling_pct', 15, 'percentile', dynamic([75]))
| render timechart
输出
计算滚动剪裁平均值
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
let code = ```if 1:
y_series = kargs["y_series"]
y_rolling_series = kargs["y_rolling_series"]
n = kargs["n"]
aggr = kargs["aggr"]
aggr_params = kargs["aggr_params"]
center = kargs["center"]
result = df
in_s = df[y_series]
func = getattr(np, aggr, None)
if not func:
import scipy.stats
func = getattr(scipy.stats, aggr)
if func:
result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
```;
tbl
| evaluate python(typeof(*), code, kwargs)
};
range x from 1 to 100 step 1
| extend y=iff(x % 13 == 0, 2.0, iff(x % 23 == 0, -2.0, rand()))
| summarize x=make_list(x), y=make_list(y)
| extend yr = dynamic(null)
| invoke series_rolling_fl('y', 'yr', 7, 'tmean', pack_array(pack_array(-2, 2), pack_array(false, false))) // trimmed mean: ignoring values outside [-2,2] inclusive
| render linechart
输出