Process configurable threshold-based rules in Azure Stream Analytics

This article describes how to use reference data to build an alerting solution that uses configurable threshold-based rules in Azure Stream Analytics.

Scenario: Alerting based on adjustable rule thresholds

You may need to produce an alert as output when incoming streamed events reach a certain value, or when an aggregated value based on the incoming streamed events exceeds a certain threshold. It is simple to set up a Stream Analytics query that compares a value to a static threshold that is fixed and predetermined. A fixed threshold can be hard-coded into the streaming query syntax using simple numerical comparisons (greater than, less than, and equality).

In some cases, the threshold values need to be more easily configurable, without editing the query syntax each time a threshold value changes. In other cases, the same query may need to process many devices or users, each with different threshold values for each kind of device.

This pattern can be used to dynamically configure thresholds, to selectively choose which kind of device a threshold applies to by filtering the input data, and to selectively choose which fields to include in the output.

Use a reference data input to a Stream Analytics job as a lookup of the alert thresholds:

  • Store the threshold values in the reference data, one value per key.
  • Join the streaming data input events to the reference data on the key column.
  • Use the keyed value from the reference data as the threshold value.
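
The three steps above can be sketched outside Stream Analytics in plain Python. This is only an illustration of the lookup pattern, not Stream Analytics code: the `thresholds` dictionary, the `find_alerts` helper, and the sample device IDs and values are all hypothetical.

```python
# Hypothetical reference data: one threshold value per key (here, a device ID).
thresholds = {"device-a": 90, "device-b": 75}


def find_alerts(events, thresholds):
    """Return the events whose value meets or exceeds the threshold for their key."""
    alerts = []
    for event in events:
        # "Join" the event to the reference data on the key column.
        threshold = thresholds.get(event["deviceId"])
        if threshold is None:
            continue  # No rule for this key, so the event drops out (inner join).
        # Use the keyed value from the reference data as the threshold.
        if event["value"] >= threshold:
            alerts.append(event)
    return alerts


# Made-up sample events for the sketch.
events = [
    {"deviceId": "device-a", "value": 95},
    {"deviceId": "device-b", "value": 70},
    {"deviceId": "device-c", "value": 99},
]
print(find_alerts(events, thresholds))
```

Changing a threshold then means editing the lookup data only; the comparison logic stays untouched, which is the point of the pattern.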

Example data and query

In the example, alerts are generated when the aggregate of data streaming in from devices in a minute-long window matches the stipulated values in the rule supplied as reference data.

In the query, for each deviceId, and each metricName under the deviceId, you can configure from 0 to 5 dimensions to GROUP BY. Only the events that have the corresponding filter values are grouped. Once grouped, windowed aggregates of Min, Max, and Avg are calculated over a 60-second tumbling window. The aggregated values are then filtered against the threshold configured in the reference data to generate the alert output event.
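
As a rough illustration of the windowing step, the following Python sketch buckets events into 60-second tumbling windows and computes Min, Max, and Avg per group. It is not how Stream Analytics is implemented; the helper names (`window_start`, `aggregate`, `utc`), the `node-1` key, and the sample values are assumptions made for the sketch. The sketch labels each window by its start time, whereas Stream Analytics reports the window end in System.Timestamp.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 60


def window_start(ts):
    """Return the epoch second at which the tumbling window containing ts begins."""
    epoch = int(ts.timestamp())
    return epoch - epoch % WINDOW_SECONDS


def aggregate(events):
    """Group (timestamp, key, value) tuples by (window, key) and compute min/max/avg."""
    groups = defaultdict(list)
    for ts, key, value in events:
        groups[(window_start(ts), key)].append(value)
    return {
        group: {"min": min(vals), "max": max(vals), "avg": sum(vals) / len(vals)}
        for group, vals in groups.items()
    }


def utc(hour, minute, second):
    """Hypothetical helper: build a UTC timestamp on 2018-04-30."""
    return datetime(2018, 4, 30, hour, minute, second, tzinfo=timezone.utc)


# Made-up events: two fall in the 14:50 window, one in the 14:51 window.
events = [
    (utc(14, 50, 23), "node-1", 98),
    (utc(14, 50, 24), "node-1", 95),
    (utc(14, 51, 5), "node-1", 70),
]
result = aggregate(events)
for group, stats in sorted(result.items()):
    print(group, stats)
```

Because tumbling windows do not overlap, each event contributes to exactly one window per group.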

As an example, assume there is a Stream Analytics job that has a reference data input named rules and a streaming data input named metrics.

Reference data

This example reference data shows how a threshold-based rule could be represented. A JSON file holds the reference data and is saved in Azure Blob storage, and that blob storage container is used as a reference data input named rules. You can overwrite this JSON file and replace the rule configuration over time, without stopping or starting the streaming job.

  • The example rule represents an adjustable alert that fires when CPU exceeds 90 percent (that is, when the average is greater than or equal to 90). The value field is configurable as needed.
  • Notice that the rule has an operator field, set here to AVGGREATEROREQUAL, which the query interprets dynamically later on.
  • The rule filters the data on filter key 2 with value C1. The other filter fields are empty strings, indicating that the input stream is not filtered by those event fields. You could set up additional CPU rules to filter on other matching fields as needed.
  • Not all columns are included in the output alert event. In this case, includeDim key number 2 is set to TRUE, meaning that dimension number 2 of the event data in the stream is included in the qualifying output events. The other fields are not included in the alert output, but the field list can be adjusted.
{
    "ruleId": 1234, 
    "deviceId" : "978648", 
    "metricName": "CPU", 
    "alertName": "hot node AVG CPU over 90",
    "operator" : "AVGGREATEROREQUAL",
    "value": 90, 
    "includeDim": {
        "0": "FALSE", 
        "1": "FALSE", 
        "2": "TRUE", 
        "3": "FALSE", 
        "4": "FALSE"
    },
    "filter": {
        "0": "", 
        "1": "",
        "2": "C1", 
        "3": "", 
        "4": ""
    }    
}
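
To show what "dynamically interpreted" means for the operator field, here is a minimal Python sketch that picks a comparison from the rule at run time. The `COMPARISONS` table and the `rule_fires` function are hypothetical names for illustration; the two operator strings are the ones handled by the query in this article.

```python
import operator as op

# Hypothetical mapping from the rule's operator string to a comparison
# applied to the windowed average.
COMPARISONS = {
    "AVGGREATEROREQUAL": op.ge,  # average >= value
    "AVGEQUALORLESS": op.le,     # average <= value
}


def rule_fires(rule, avg_value):
    """Return True when the windowed average satisfies the rule."""
    compare = COMPARISONS.get(rule["operator"])
    if compare is None:
        return False  # Unrecognized operator: no condition matches, so no alert.
    return compare(avg_value, rule["value"])


# Only the two fields needed here are copied from the sample rule.
rule = {"operator": "AVGGREATEROREQUAL", "value": 90}
print(rule_fires(rule, 96.5))
print(rule_fires(rule, 80))
```

Supporting a new operator in this sketch means adding one entry to the table; in the query, it means adding one more condition to the HAVING clause.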

Example streaming query

This example Stream Analytics query joins the rules reference data from the example above to an input stream of data named metrics.

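-- Step 1: join each metrics event to its matching rule, apply the rule's filters,
-- and keep only the dimensions that the rule asks to include.
WITH transformedInput AS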
(
    SELECT
        dim0 = CASE rules.includeDim.[0] WHEN 'TRUE' THEN metrics.custom.dimensions.[0].value ELSE NULL END,
        dim1 = CASE rules.includeDim.[1] WHEN 'TRUE' THEN metrics.custom.dimensions.[1].value ELSE NULL END,
        dim2 = CASE rules.includeDim.[2] WHEN 'TRUE' THEN metrics.custom.dimensions.[2].value ELSE NULL END,
        dim3 = CASE rules.includeDim.[3] WHEN 'TRUE' THEN metrics.custom.dimensions.[3].value ELSE NULL END,
        dim4 = CASE rules.includeDim.[4] WHEN 'TRUE' THEN metrics.custom.dimensions.[4].value ELSE NULL END,
        metric = metrics.metric.value,
        metricName = metrics.metric.name,
        deviceId = rules.deviceId, 
        ruleId = rules.ruleId, 
        alertName = rules.alertName,
        ruleOperator = rules.operator, 
        ruleValue = rules.value
    FROM 
        metrics
        TIMESTAMP BY eventTime
    JOIN 
        rules
        ON metrics.deviceId = rules.deviceId AND metrics.metric.name = rules.metricName
    WHERE
        (rules.filter.[0] = '' OR metrics.custom.filters.[0].value = rules.filter.[0]) AND 
        (rules.filter.[1] = '' OR metrics.custom.filters.[1].value = rules.filter.[1]) AND
        (rules.filter.[2] = '' OR metrics.custom.filters.[2].value = rules.filter.[2]) AND
        (rules.filter.[3] = '' OR metrics.custom.filters.[3].value = rules.filter.[3]) AND
        (rules.filter.[4] = '' OR metrics.custom.filters.[4].value = rules.filter.[4])
)

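-- Step 2: aggregate per rule and dimension set over 60-second tumbling windows,
-- then keep only the groups that satisfy the rule's operator and threshold.
SELECT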
    System.Timestamp as time, 
    transformedInput.deviceId as deviceId,
    transformedInput.ruleId as ruleId,
    transformedInput.metricName as metric,
    transformedInput.alertName as alert,
    AVG(metric) as avg,
    MIN(metric) as min, 
    MAX(metric) as max, 
    dim0, dim1, dim2, dim3, dim4
FROM
    transformedInput
GROUP BY
    transformedInput.deviceId,
    transformedInput.ruleId,
    transformedInput.metricName,
    transformedInput.alertName,
    dim0, dim1, dim2, dim3, dim4,
    ruleOperator, 
    ruleValue, 
    TumblingWindow(second, 60)
HAVING
    (
        (ruleOperator = 'AVGGREATEROREQUAL' AND avg(metric) >= ruleValue) OR
        (ruleOperator = 'AVGEQUALORLESS' AND avg(metric) <= ruleValue) 
    )
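
The WHERE clause and the dim0 to dim4 CASE expressions in the first step can be mirrored in Python to make their behavior easier to follow. This is a sketch under assumptions: `passes_filters` and `project_dimensions` are hypothetical helper names, the rule and event are trimmed copies of the samples in this article, and a missing key is treated as None in the same way the query yields NULL.

```python
def passes_filters(rule, event):
    """True when every non-empty rule filter equals the event's filter value."""
    event_filters = event["custom"]["filters"]
    for key, wanted in rule["filter"].items():
        if wanted == "":
            continue  # Empty string: do not filter on this field.
        actual = event_filters.get(key, {}).get("value")
        if actual != wanted:
            return False
    return True


def project_dimensions(rule, event):
    """Return dim0..dim4, keeping a value only where includeDim is 'TRUE'."""
    event_dims = event["custom"]["dimensions"]
    return {
        f"dim{key}": (event_dims.get(key, {}).get("value") if flag == "TRUE" else None)
        for key, flag in rule["includeDim"].items()
    }


# Trimmed copy of the sample rule.
rule = {
    "includeDim": {"0": "FALSE", "1": "FALSE", "2": "TRUE", "3": "FALSE", "4": "FALSE"},
    "filter": {"0": "", "1": "", "2": "C1", "3": "", "4": ""},
}
# Trimmed copy of the first sample event.
event = {
    "custom": {
        "dimensions": {
            "0": {"name": "NodeType", "value": "N1"},
            "1": {"name": "Cluster", "value": "C1"},
            "2": {"name": "NodeName", "value": "N024"},
        },
        "filters": {
            "0": {"name": "application", "value": "A1"},
            "1": {"name": "deviceType", "value": "T1"},
            "2": {"name": "cluster", "value": "C1"},
            "3": {"name": "nodeType", "value": "N1"},
        },
    }
}
print(passes_filters(rule, event))
print(project_dimensions(rule, event))
```

Dimensions that are not included come out as None for every event, so they never split a group in the second step; only the included dimension (NodeName here) does.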

Example streaming input event data

This example JSON data represents the metrics input data that is used in the above streaming query.

  • Three example events are listed within a 1-minute timespan, at T14:50.
  • All three have the same deviceId value, 978648.
  • The CPU metric values vary in each event: 98, 95, and 80 respectively. Only the first two example events exceed the threshold set in the CPU alert rule.
  • The includeDim field in the alert rule was set for key number 2. The corresponding key 2 field in the example events is named NodeName. The three example events have values N024, N024, and N014 respectively. In the output, you see only the node N024, because that is the only data that matches the alert criteria for high CPU. N014 does not meet the high CPU threshold.
  • The alert rule is configured with a filter only on key number 2, which corresponds to the cluster field in the sample events. The three example events all have value C1 and match the filter criteria.
{
    "eventTime": "2018-04-30T14:50:23.1324132Z",
    "deviceId": "978648",
    "custom": {
        "dimensions": {
            "0": {
                "name": "NodeType",
                "value": "N1"
            },
            "1": {
                "name": "Cluster",
                "value": "C1"
            },
            "2": {
                "name": "NodeName",
                "value": "N024"
            }
        },
        "filters": {
            "0": {
                "name": "application",
                "value": "A1"
            },
            "1": {
                "name": "deviceType",
                "value": "T1"
            },
            "2": {
                "name": "cluster",
                "value": "C1"
            },
            "3": {
                "name": "nodeType",
                "value": "N1"
            }
        }
    },
    "metric": {
        "name": "CPU",
        "value": 98,
        "count": 1.0,
        "min": 98,
        "max": 98,
        "stdDev": 0.0
    }
}
{
    "eventTime": "2018-04-30T14:50:24.1324138Z",
    "deviceId": "978648",
    "custom": {
        "dimensions": {
            "0": {
                "name": "NodeType",
                "value": "N2"
            },
            "1": {
                "name": "Cluster",
                "value": "C1"
            },
            "2": {
                "name": "NodeName",
                "value": "N024"
            }
        },
        "filters": {
            "0": {
                "name": "application",
                "value": "A1"
            },
            "1": {
                "name": "deviceType",
                "value": "T1"
            },
            "2": {
                "name": "cluster",
                "value": "C1"
            },
            "3": {
                "name": "nodeType",
                "value": "N2"
            }
        }
    },
    "metric": {
        "name": "CPU",
        "value": 95,
        "count": 1,
        "min": 95,
        "max": 95,
        "stdDev": 0
    }
}
{
    "eventTime": "2018-04-30T14:50:37.1324130Z",
    "deviceId": "978648",
    "custom": {
        "dimensions": {
            "0": {
                "name": "NodeType",
                "value": "N3"
            },
            "1": {
                "name": "Cluster",
                "value": "C1"
            },
            "2": {
                "name": "NodeName",
                "value": "N014"
            }
        },
        "filters": {
            "0": {
                "name": "application",
                "value": "A1"
            },
            "1": {
                "name": "deviceType",
                "value": "T1"
            },
            "2": {
                "name": "cluster",
                "value": "C1"
            },
            "3": {
                "name": "nodeType",
                "value": "N3"
            }
        }
    },
    "metric": {
        "name": "CPU",
        "value": 80,
        "count": 1,
        "min": 80,
        "max": 80,
        "stdDev": 0
    }
}
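
The arithmetic behind the expected alert can be checked with a few lines of Python. This is only a worked check on the three sample values, grouped by NodeName as the sample rule's includeDim setting implies; the variable names are illustrative.

```python
from collections import defaultdict

# (NodeName, CPU value) pairs taken from the three sample events above.
samples = [("N024", 98), ("N024", 95), ("N014", 80)]
THRESHOLD = 90  # The "value" field of the sample rule (operator AVGGREATEROREQUAL).

by_node = defaultdict(list)
for node, cpu in samples:
    by_node[node].append(cpu)

averages = {node: sum(values) / len(values) for node, values in by_node.items()}
alerting = [node for node, avg in averages.items() if avg >= THRESHOLD]

print(averages)  # {'N024': 96.5, 'N014': 80.0}
print(alerting)  # ['N024']
```

N024 averages 96.5 across its two events and meets the threshold, while N014 averages 80 and does not, so a single alert for N024 is expected.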

Example output

This example output JSON data shows that a single alert event was produced based on the CPU threshold rule defined in the reference data. The output event contains the name of the alert as well as the aggregates (average, min, max) of the fields considered. The output event data includes field key number 2, NodeName, with value N024 due to the rule configuration. (The JSON was altered to show line breaks for readability.)

{"time":"2018-05-01T02:03:00.0000000Z","deviceid":"978648","ruleid":1234,"metric":"CPU",
"alert":"hot node AVG CPU over 90","avg":96.5,"min":95.0,"max":98.0,
"dim0":null,"dim1":null,"dim2":"N024","dim3":null,"dim4":null}