series_rolling_fl()

适用于:✅Azure 数据资源管理器

函数 series_rolling_fl() 是一个用户定义的函数 (UDF),它对一个序列应用滚动聚合。 它采用包含多个序列(动态数值数组)的表,并对每个序列应用滚动聚合函数。

先决条件

  • 必须在群集上启用 Python 插件。 这是函数中使用的内联 Python 所必需的。

语法

T | invoke series_rolling_fl(y_series, y_rolling_series, n, aggr, aggr_params, center)

详细了解语法约定

参数

客户 类型​​ 必需 说明
y_series string ✔️ 列的名称,其中包含要拟合的序列。
y_rolling_series string ✔️ 存储滚动聚合序列的列的名称。
n int ✔️ 滚动窗口的宽度。
aggr string ✔️ 要使用的聚合函数的名称。 请参阅聚合函数
aggr_params string 聚合函数的可选参数。
center bool 指示滚动窗口是在当前点前后对称应用还是从当前点向后应用。 默认情况下,对于流数据的计算,center 为 false

聚合函数

此函数支持从序列计算标量的 numpyscipy.stats 中的任何聚合函数。 以下列表并不详尽:

函数定义

可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句不能独立运行。 它必须后跟一个表格表达式语句。 若要运行 series_rolling_fl() 的工作示例,请参阅示例

let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
    let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
    let code = ```if 1:
        y_series = kargs["y_series"]
        y_rolling_series = kargs["y_rolling_series"]
        n = kargs["n"]
        aggr = kargs["aggr"]
        aggr_params = kargs["aggr_params"]
        center = kargs["center"]
        result = df
        in_s = df[y_series]
        func = getattr(np, aggr, None)
        if not func:
            import scipy.stats
            func = getattr(scipy.stats, aggr)
        if func:
            result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
    ```;
    tbl
    | evaluate python(typeof(*), code, kwargs)
};
// Write your query to use the function here.

示例

以下示例使用 invoke 运算符运行函数。

计算 9 个元素的滚动中值

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
    let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
    let code = ```if 1:
        y_series = kargs["y_series"]
        y_rolling_series = kargs["y_rolling_series"]
        n = kargs["n"]
        aggr = kargs["aggr"]
        aggr_params = kargs["aggr_params"]
        center = kargs["center"]
        result = df
        in_s = df[y_series]
        func = getattr(np, aggr, None)
        if not func:
            import scipy.stats
            func = getattr(scipy.stats, aggr)
        if func:
            result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
    ```;
    tbl
    | evaluate python(typeof(*), code, kwargs)
};
//
//  Calculate rolling median of 9 elements
//
demo_make_series1
| make-series num=count() on TimeStamp step 1h by OsVer
| extend rolling_med = dynamic(null)
| invoke series_rolling_fl('num', 'rolling_med', 9, 'median')
| render timechart

输出

描述了 9 个元素的滚动中值的图形。

计算 15 个元素的滚动最小值、最大值和第 75 个百分位

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
    let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
    let code = ```if 1:
        y_series = kargs["y_series"]
        y_rolling_series = kargs["y_rolling_series"]
        n = kargs["n"]
        aggr = kargs["aggr"]
        aggr_params = kargs["aggr_params"]
        center = kargs["center"]
        result = df
        in_s = df[y_series]
        func = getattr(np, aggr, None)
        if not func:
            import scipy.stats
            func = getattr(scipy.stats, aggr)
        if func:
            result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
    ```;
    tbl
    | evaluate python(typeof(*), code, kwargs)
};
//
//  Calculate rolling min, max & 75th percentile of 15 elements
//
demo_make_series1
| make-series num=count() on TimeStamp step 1h by OsVer
| extend rolling_min = dynamic(null), rolling_max = dynamic(null), rolling_pct = dynamic(null)
| invoke series_rolling_fl('num', 'rolling_min', 15, 'min', dynamic([null]))
| invoke series_rolling_fl('num', 'rolling_max', 15, 'max', dynamic([null]))
| invoke series_rolling_fl('num', 'rolling_pct', 15, 'percentile', dynamic([75]))
| render timechart

输出

描述了 15 个元素的滚动最小值、最大值和第 75 个百分位的图形。

计算滚动剪裁平均值

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let series_rolling_fl = (tbl:(*), y_series:string, y_rolling_series:string, n:int, aggr:string, aggr_params:dynamic=dynamic([null]), center:bool=true)
{
    let kwargs = bag_pack('y_series', y_series, 'y_rolling_series', y_rolling_series, 'n', n, 'aggr', aggr, 'aggr_params', aggr_params, 'center', center);
    let code = ```if 1:
        y_series = kargs["y_series"]
        y_rolling_series = kargs["y_rolling_series"]
        n = kargs["n"]
        aggr = kargs["aggr"]
        aggr_params = kargs["aggr_params"]
        center = kargs["center"]
        result = df
        in_s = df[y_series]
        func = getattr(np, aggr, None)
        if not func:
            import scipy.stats
            func = getattr(scipy.stats, aggr)
        if func:
            result[y_rolling_series] = list(pd.Series(in_s[i]).rolling(n, center=center, min_periods=1).apply(func, args=tuple(aggr_params)).values for i in range(len(in_s)))
    ```;
    tbl
    | evaluate python(typeof(*), code, kwargs)
};
range x from 1 to 100 step 1
| extend y=iff(x % 13 == 0, 2.0, iff(x % 23 == 0, -2.0, rand()))
| summarize x=make_list(x), y=make_list(y)
| extend yr = dynamic(null)
| invoke series_rolling_fl('y', 'yr', 7, 'tmean', pack_array(pack_array(-2, 2), pack_array(false, false))) //  trimmed mean: ignoring values outside [-2,2] inclusive
| render linechart

输出

描述了滚动剪裁平均值的图形。