Azure Stream Analytics JavaScript user-defined aggregates

Azure Stream Analytics supports user-defined aggregates (UDAs) written in JavaScript, which let you implement complex stateful business logic. Within a UDA you have full control of the state data structure, state accumulation, state deaccumulation, and aggregate result computation. This article introduces the two JavaScript UDA interfaces, the steps to create a UDA, and how to use a UDA with window-based operations in a Stream Analytics query.

JavaScript user-defined aggregates

A user-defined aggregate is applied on top of a time window specification: it aggregates over the events in that window and produces a single result value. Stream Analytics currently supports two types of UDA interfaces: AccumulateOnly and AccumulateDeaccumulate. Both types can be used with tumbling, hopping, sliding, and session windows. AccumulateDeaccumulate UDAs perform better than AccumulateOnly UDAs when used with hopping, sliding, and session windows. Choose one of the two types based on the algorithm you use.

AccumulateOnly aggregates

AccumulateOnly aggregates can only accumulate new events into their state; the algorithm does not allow deaccumulation of values. Choose this aggregate type when removing an event's information from the state value is impossible to implement. Following is the JavaScript template for AccumulateOnly aggregates:

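// Sample UDA whose state can only be accumulated.
function main() {
    // Called when a window starts: reset the running state.
    this.init = function () {
        this.state = 0;
    }

    // Called for each event that enters the window.
    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    // Called at the end of the window: return the aggregate value.
    this.computeResult = function () {
        return this.state;
    }
}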
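To make the call order concrete, here is a minimal local sketch that drives the AccumulateOnly template through tumbling windows under Node.js. It is not the Stream Analytics runtime: `runTumbling` is a hypothetical helper, and it assumes the runtime creates the aggregate with `new main()`, calls `init()` at each window start, `accumulate()` once per event, and `computeResult()` at the window end, with each window excluding its start time and including its end time.

```javascript
// AccumulateOnly template from above, copied so this block runs on its own.
function main() {
    this.init = function () {
        this.state = 0;
    };
    this.accumulate = function (value, timestamp) {
        this.state += value;
    };
    this.computeResult = function () {
        return this.state;
    };
}

// Hypothetical local driver (not part of Stream Analytics).
// Assumes tumbling windows of sizeMs, each covering (end - sizeMs, end].
function runTumbling(events, sizeMs) {
    const byWindowEnd = new Map(); // window end (ms) -> events in that window
    for (const e of events) {
        const end = Math.ceil(e.t / sizeMs) * sizeMs;
        if (!byWindowEnd.has(end)) byWindowEnd.set(end, []);
        byWindowEnd.get(end).push(e);
    }
    const results = [];
    const ends = [...byWindowEnd.keys()].sort((a, b) => a - b);
    for (const end of ends) {
        const agg = new main(); // fresh state for every window
        agg.init();
        for (const e of byWindowEnd.get(end)) agg.accumulate(e.value, e.t);
        results.push({ windowEnd: end, result: agg.computeResult() });
    }
    return results;
}

const min = 60 * 1000;
const out = runTumbling([
    { t: 1 * min, value: 2 },
    { t: 4 * min, value: 3 },
    { t: 5 * min, value: 5 }, // on the boundary: belongs to the window ending at 5 min
    { t: 7 * min, value: 10 },
], 5 * min);
console.log(out.map(r => r.result)); // [ 10, 10 ]
```

Creating a fresh aggregate per window mirrors the statement that `init()` runs when a window starts; windows without events produce no output in this sketch.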

AccumulateDeaccumulate aggregates

AccumulateDeaccumulate aggregates allow deaccumulation of a previously accumulated value from the state, for example, removing a key-value pair from a list of event values, or subtracting a value from the state of a sum aggregate. Following is the JavaScript template for AccumulateDeaccumulate aggregates:

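// Sample UDA whose state can be accumulated and deaccumulated.
function main() {
    // Called when a window starts: reset the running state.
    this.init = function () {
        this.state = 0;
    }

    // Called for each event that enters the window.
    this.accumulate = function (value, timestamp) {
        this.state += value;
    }

    // Called for each event that leaves the window: undo accumulate().
    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    }

    // Called when a whole hop of events leaves the window: subtract that hop's state.
    this.deaccumulateState = function (otherState){
        this.state -= otherState.state;
    }

    // Called at the end of the window: return the aggregate value.
    this.computeResult = function () {
        return this.state;
    }
}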
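As an illustration of the key-value case mentioned above, the following hypothetical AccumulateDeaccumulate UDA (not one of this article's samples) counts the distinct values of `value.key` in a window. The `key` field, and the assumption that `otherState` in `deaccumulateState()` exposes the same `counts` field as the aggregate itself, are illustrative only.

```javascript
// Hypothetical UDA: number of distinct value.key values currently in the window.
// State: a map from key to the number of in-window events carrying that key.
function main() {
    this.init = function () {
        this.counts = {};
    };

    this.accumulate = function (value, timestamp) {
        this.counts[value.key] = (this.counts[value.key] || 0) + 1;
    };

    this.deaccumulate = function (value, timestamp) {
        // Remove one occurrence; drop the key-value pair once its count reaches zero.
        var c = (this.counts[value.key] || 0) - 1;
        if (c > 0) {
            this.counts[value.key] = c;
        } else {
            delete this.counts[value.key];
        }
    };

    this.deaccumulateState = function (otherState) {
        // Assumption: otherState has the same `counts` shape as this aggregate.
        Object.keys(otherState.counts).forEach(function (key) {
            var c = (this.counts[key] || 0) - otherState.counts[key];
            if (c > 0) {
                this.counts[key] = c;
            } else {
                delete this.counts[key];
            }
        }, this);
    };

    this.computeResult = function () {
        return Object.keys(this.counts).length;
    };
}

// Quick local check under Node.js, outside Stream Analytics:
const agg = new main();
agg.init();
agg.accumulate({ key: "a" }, 0);
agg.accumulate({ key: "b" }, 0);
agg.accumulate({ key: "a" }, 0);
console.log(agg.computeResult()); // 2
agg.deaccumulate({ key: "a" }, 0);
console.log(agg.computeResult()); // 2 ("a" still occurs once)
agg.deaccumulate({ key: "a" }, 0);
console.log(agg.computeResult()); // 1
```

Keeping a count per key, rather than only a set of keys, is what makes deaccumulation possible here: with a plain set, removing an event would not tell you whether other events with the same key are still in the window.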

UDA - JavaScript function declaration

Each JavaScript UDA is defined by a Function object declaration. Following are the major elements in a UDA definition.

Function alias

The function alias is the UDA identifier. When you call the UDA in a Stream Analytics query, always prefix the alias with "uda.".

Function type

For a UDA, the function type should be JavaScript UDA.

Output type

A specific type supported by the Stream Analytics job, or "Any" if you want to handle the type in your query.

Function name

The name of this Function object. The function name should match the UDA alias.

Method - init()

The init() method initializes the state of the aggregate. This method is called when a window starts.

Method - accumulate()

The accumulate() method calculates the UDA state based on the previous state and the current event values. This method is called when an event enters a time window (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW, or SESSIONWINDOW).

Method - deaccumulate()

The deaccumulate() method recalculates the state based on the previous state and the current event values. This method is called when an event leaves a SLIDINGWINDOW or SESSIONWINDOW.
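The sketch below shows, under stated assumptions, how a sliding window can reuse one aggregate by calling `deaccumulate()` as events expire instead of recomputing the state from scratch. `runSliding` is a hypothetical Node.js helper, the window ending at time t is assumed to cover (t - size, t], and for brevity it emits a result only when an event arrives.

```javascript
// Sum template from above, minus deaccumulateState(), which this driver does not call.
function main() {
    this.init = function () {
        this.state = 0;
    };
    this.accumulate = function (value, timestamp) {
        this.state += value;
    };
    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    };
    this.computeResult = function () {
        return this.state;
    };
}

// Hypothetical local driver (not part of Stream Analytics) for a sliding window of sizeMs.
// Simplification: a result is emitted only when an event arrives.
function runSliding(events, sizeMs) {
    const agg = new main(); // one long-lived aggregate, updated incrementally
    agg.init();
    const inWindow = []; // events currently in the window, oldest first
    const results = [];
    for (const e of events) { // events must be sorted by timestamp t
        // Events at or before e.t - sizeMs have left the window (e.t - sizeMs, e.t].
        while (inWindow.length > 0 && inWindow[0].t <= e.t - sizeMs) {
            const old = inWindow.shift();
            agg.deaccumulate(old.value, old.t);
        }
        agg.accumulate(e.value, e.t);
        inWindow.push(e);
        results.push({ t: e.t, result: agg.computeResult() });
    }
    return results;
}

const min = 60 * 1000;
const slid = runSliding([
    { t: 1 * min, value: 2 },
    { t: 3 * min, value: 3 },
    { t: 7 * min, value: 5 }, // the event at 1 min has left the 5-minute window
], 5 * min);
console.log(slid.map(r => r.result)); // [ 2, 5, 8 ]
```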

Method - deaccumulateState()

The deaccumulateState() method recalculates the state based on the previous state and the state of a hop. This method is called when a set of events leaves a HOPPINGWINDOW.
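Similarly, a hopping window can drop a whole hop at once with `deaccumulateState()`. In this hypothetical Node.js sketch, `runHopping` keeps one aggregate per hop plus one for the full window and passes the expired hop's aggregate as `otherState`. Both that bookkeeping and the (start, end] hop boundaries are assumptions, not documented runtime behavior.

```javascript
// Sum template from above, copied so this block runs on its own.
function main() {
    this.init = function () {
        this.state = 0;
    };
    this.accumulate = function (value, timestamp) {
        this.state += value;
    };
    this.deaccumulate = function (value, timestamp) {
        this.state -= value;
    };
    this.deaccumulateState = function (otherState) {
        this.state -= otherState.state;
    };
    this.computeResult = function () {
        return this.state;
    };
}

// Hypothetical local driver for a hopping window spanning `hopsPerWindow` hops of hopMs.
// Simplification: emits one result per hop, from the first to the last hop with events.
function runHopping(events, hopMs, hopsPerWindow) {
    const byHopEnd = new Map(); // hop end (ms) -> events in (end - hopMs, end]
    for (const e of events) {
        const end = Math.ceil(e.t / hopMs) * hopMs;
        if (!byHopEnd.has(end)) byHopEnd.set(end, []);
        byHopEnd.get(end).push(e);
    }
    const ends = [...byHopEnd.keys()];
    const first = Math.min(...ends); // Infinity when there are no events: loop is skipped
    const last = Math.max(...ends);

    const windowAgg = new main();
    windowAgg.init();
    const liveHops = []; // hop aggregates inside the current window, oldest first
    const results = [];
    for (let end = first; end <= last; end += hopMs) {
        const hopAgg = new main();
        hopAgg.init();
        for (const e of byHopEnd.get(end) || []) {
            hopAgg.accumulate(e.value, e.t);
            windowAgg.accumulate(e.value, e.t);
        }
        liveHops.push(hopAgg);
        if (liveHops.length > hopsPerWindow) {
            windowAgg.deaccumulateState(liveHops.shift()); // oldest hop leaves in one call
        }
        results.push({ windowEnd: end, result: windowAgg.computeResult() });
    }
    return results;
}

const min = 60 * 1000;
// 10-minute window that hops every 5 minutes (2 hops per window).
const hopped = runHopping([
    { t: 2 * min, value: 1 },
    { t: 6 * min, value: 10 },
    { t: 12 * min, value: 100 },
], 5 * min, 2);
console.log(hopped.map(r => r.result)); // [ 1, 11, 110 ]
```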

Method - computeResult()

The computeResult() method returns the aggregate result based on the current state. This method is called at the end of a time window (TUMBLINGWINDOW, HOPPINGWINDOW, SLIDINGWINDOW, or SESSIONWINDOW).

JavaScript UDA supported input and output data types

For JavaScript UDA data types, refer to the "Stream Analytics and JavaScript type conversion" section of "Integrate JavaScript UDFs".

Adding a JavaScript UDA from the Azure portal

Below we walk through the process of creating a UDA from the portal. The example used here computes time-weighted averages.

Now let's create a JavaScript UDA under an existing ASA job by following these steps.

  1. Sign in to the Azure portal and locate your existing Stream Analytics job.

  2. Under JOB TOPOLOGY, click the Functions link.

  3. Click the Add icon to add a new function.

  4. In the New Function view, select JavaScript UDA as the Function Type. A default UDA template appears in the editor.

  5. Enter "TWA" as the UDA alias and change the function implementation to the following:

    // Sample UDA that calculates the time-weighted average of incoming values.
    function main() {
        this.init = function () {
            this.totalValue = 0.0;
            this.totalWeight = 0.0;
        }
    
        // Each event carries a level and a weight (its duration in seconds).
        this.accumulate = function (value, timestamp) {
            this.totalValue += value.level * value.weight;
            this.totalWeight += value.weight;
        }
    
        // Uncomment below for AccumulateDeaccumulate implementation
        /*
        this.deaccumulate = function (value, timestamp) {
            this.totalValue -= value.level * value.weight;
            this.totalWeight -= value.weight;
        }
    
        this.deaccumulateState = function (otherState){
            this.totalValue -= otherState.totalValue;
            this.totalWeight -= otherState.totalWeight;
        }
        */
    
        this.computeResult = function () {
            // Guard against an empty window (zero total weight).
            var result = 0;
            if (this.totalWeight != 0) {
                result = this.totalValue / this.totalWeight;
            }
            return result;
        }
    }
    
  6. After you click the "Save" button, your UDA shows up in the function list.

  7. Click the new function "TWA" to check the function definition.
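Before relying on the function in a job, you can sanity-check the TWA logic outside Azure. This minimal Node.js sketch adapts the step 5 code with the AccumulateDeaccumulate methods enabled and feeds it records shaped like the query's `value` rows (`level`, `weight`); the `aggregate` helper and the sample records are hypothetical.

```javascript
// TWA UDA adapted from step 5, with the AccumulateDeaccumulate methods enabled.
function main() {
    this.init = function () {
        this.totalValue = 0.0;
        this.totalWeight = 0.0;
    };
    this.accumulate = function (value, timestamp) {
        this.totalValue += value.level * value.weight;
        this.totalWeight += value.weight;
    };
    this.deaccumulate = function (value, timestamp) {
        this.totalValue -= value.level * value.weight;
        this.totalWeight -= value.weight;
    };
    this.deaccumulateState = function (otherState) {
        this.totalValue -= otherState.totalValue;
        this.totalWeight -= otherState.totalWeight;
    };
    this.computeResult = function () {
        return this.totalWeight != 0 ? this.totalValue / this.totalWeight : 0;
    };
}

// Hypothetical helper: accumulate a list of records into a fresh aggregate.
function aggregate(records) {
    const agg = new main();
    agg.init();
    for (const r of records) agg.accumulate(r, 0);
    return agg;
}

const a = { level: 80, weight: 1 };
const b = { level: 90, weight: 3 };
const c = { level: 100, weight: 2 };

console.log(aggregate([a, b]).computeResult()); // 87.5
console.log(aggregate([]).computeResult());     // 0 (empty window)

// Removing a hop's state should match aggregating only the remaining records.
const whole = aggregate([a, b, c]);
whole.deaccumulateState(aggregate([a]));
console.log(whole.computeResult() === aggregate([b, c]).computeResult()); // true
```

With integer levels and weights the comparison is exact; with fractional durations, such as those in the test file below, compare with a small tolerance instead.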

Calling a JavaScript UDA in an ASA query

In the Azure portal, open your job, edit the query, and call the TWA() function with the mandatory prefix "uda.". For example:

WITH value AS
(
    SELECT
        NoiseLevelDB as level,
        DurationSecond as weight
    FROM
        [YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
    System.Timestamp as ts,
    uda.TWA(value) as NoiseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)

Testing the query with a UDA

Create a local JSON file with the following content, upload the file to the Stream Analytics job, and test the query above.

[
  {"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
  {"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
  {"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
  {"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
  {"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
  {"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
  {"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
  {"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
  {"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
  {"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
  {"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
  {"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
  {"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]
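To know roughly what output to expect from the test, you can compute the 5-minute time-weighted averages for this file locally. The Node.js sketch below is an approximation, not the Stream Analytics engine: it assumes tumbling windows aligned to whole 5-minute boundaries that exclude their start time and include their end time, and it writes `ts` as a UTC ISO string. The compact `rows` array is the same data as the JSON file above.

```javascript
// TWA aggregate adapted from step 5 (accumulate-only part; enough for tumbling windows).
function main() {
    this.init = function () {
        this.totalValue = 0.0;
        this.totalWeight = 0.0;
    };
    this.accumulate = function (value, timestamp) {
        this.totalValue += value.level * value.weight;
        this.totalWeight += value.weight;
    };
    this.computeResult = function () {
        return this.totalWeight != 0 ? this.totalValue / this.totalWeight : 0;
    };
}

// The test file above as [time of day, NoiseLevelDB, DurationSecond].
const rows = [
    ["05:01", 80, 22.0], ["05:02", 81, 37.8], ["05:02", 85, 26.3],
    ["05:03", 95, 13.7], ["05:03", 88, 10.3], ["05:05", 103, 5.5],
    ["05:06", 99, 23.0], ["05:07", 108, 1.76], ["05:07", 79, 17.9],
    ["05:08", 83, 27.1], ["05:09", 91, 17.1], ["05:09", 115, 7.9],
    ["05:09", 80, 28.3], ["05:10", 55, 18.2], ["05:10", 93, 25.8],
    ["05:11", 83, 11.4], ["05:12", 89, 7.9], ["05:15", 112, 3.7],
    ["05:15", 93, 9.7], ["05:18", 96, 3.7], ["05:20", 108, 0.99],
    ["05:20", 113, 25.1], ["05:22", 110, 5.3],
];

// Mirrors the query's WITH clause: level = NoiseLevelDB, weight = DurationSecond.
const events = rows.map(([hhmm, db, sec]) => ({
    t: Date.parse("2017-06-10T" + hhmm + ":00-07:00"),
    value: { level: db, weight: sec },
}));

// Group into 5-minute tumbling windows, assumed to be (end - 5 min, end].
const size = 5 * 60 * 1000;
const groups = new Map();
for (const e of events) {
    const end = Math.ceil(e.t / size) * size;
    if (!groups.has(end)) groups.set(end, []);
    groups.get(end).push(e);
}

const results = [...groups.keys()].sort((x, y) => x - y).map(end => {
    const agg = new main();
    agg.init();
    for (const e of groups.get(end)) agg.accumulate(e.value, e.t);
    return { ts: new Date(end).toISOString(), twa: agg.computeResult() };
});
console.log(results); // first entry: ts 2017-06-10T12:05:00.000Z, twa ≈ 85.049
```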

Get help

For additional help, try our Microsoft Q&A question page for Azure Stream Analytics.

Next steps