series_uv_anomalies_fl()
适用于:✅Azure 数据资源管理器
函数 series_uv_anomalies_fl()
是一个用户定义的函数 (UDF),它通过调用单变量异常情况检测 API(Azure 认知服务的一部分)来检测时序中的异常。 函数接受一组有限的时间序列(作为数字动态数组)和所需的异常情况检测敏感级别。 每个时序都会转换为所需的 JSON 格式,然后发布到异常检测器服务终结点。 服务响应包含高/低/全部异常情况的动态数组、建模基线时序、其正常高/低边界(高于或低于高/低边界的值属于异常情况)和检测到的周期性。
注意
请考虑使用原生函数 series_decompose_anomalies(),其可伸缩性更强且运行速度更快。
先决条件
- Azure 订阅。 创建 Azure 帐户。
- 群集和数据库创建群集和数据库或具有编辑权限和数据的 KQL 数据库。
- 必须在群集上启用 Python 插件。 这是函数中使用的内联 Python 所必需的。
- 创建异常检测器资源并获取其密钥以访问服务。
- 在群集上启用 http_request 插件/http_request_post 插件以访问异常情况检测服务终结点。
- 修改类型
webapi
的标注策略以访问异常情况检测服务终结点。
在以下函数示例中,请将标头的 uri 中的 YOUR-AD-RESOURCE-NAME
和 Ocp-Apim-Subscription-Key
中的 YOUR-KEY
替换为异常检测器资源名称和密钥。
语法
T | invoke series_uv_anomalies_fl(
y_series [,
sensitivity [,
tsid]])
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
y_series | string |
✔️ | 输入表列的名称,其中包含要进行异常情况检测的序列的值。 |
sensitivity | integer | [0-100] 范围内的整数,用于指定异常情况检测敏感度。 0 是敏感度最低的检测,而 100 是敏感度最高的检测,表示即使只是与预期基线有很小的偏差,也会被标记为异常情况。 默认值:85 | |
tsid | string |
输入表列的名称,其中包含时序 ID。 分析单个时序时可以省略。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let series_uv_anomalies_fl=(tbl:(*), y_series:string, sensitivity:int=85, tsid:string='_tsid')
{
let uri = 'https://YOUR-AD-RESOURCE-NAME.cognitiveservices.azure.com/anomalydetector/v1.0/timeseries/entire/detect';
let headers=dynamic({'Ocp-Apim-Subscription-Key': h'YOUR-KEY'});
let kwargs = bag_pack('y_series', y_series, 'sensitivity', sensitivity);
let code = ```if 1:
import json
y_series = kargs["y_series"]
sensitivity = kargs["sensitivity"]
json_str = []
for i in range(len(df)):
row = df.iloc[i, :]
ts = [{'value':row[y_series][j]} for j in range(len(row[y_series]))]
json_data = {'series': ts, "sensitivity":sensitivity} # auto-detect period, or we can force 'period': 84. We can also add 'maxAnomalyRatio':0.25 for maximum 25% anomalies
json_str = json_str + [json.dumps(json_data)]
result = df
result['json_str'] = json_str
```;
tbl
| evaluate python(typeof(*, json_str:string), code, kwargs)
| extend _tsid = column_ifexists(tsid, 1)
| partition by _tsid (
project json_str
| evaluate http_request_post(uri, headers, dynamic(null))
| project period=ResponseBody.period, baseline_ama=ResponseBody.expectedValues, ad_ama=series_add(0, ResponseBody.isAnomaly), pos_ad_ama=series_add(0, ResponseBody.isPositiveAnomaly)
, neg_ad_ama=series_add(0, ResponseBody.isNegativeAnomaly), upper_ama=series_add(ResponseBody.expectedValues, ResponseBody.upperMargins), lower_ama=series_subtract(ResponseBody.expectedValues, ResponseBody.lowerMargins)
| extend _tsid=toscalar(_tsid)
)
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
使用 series_uv_anomalies_fl()
来检测异常
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let series_uv_anomalies_fl=(tbl:(*), y_series:string, sensitivity:int=85, tsid:string='_tsid')
{
let uri = 'https://YOUR-AD-RESOURCE-NAME.cognitiveservices.azure.com/anomalydetector/v1.0/timeseries/entire/detect';
let headers=dynamic({'Ocp-Apim-Subscription-Key': h'YOUR-KEY'});
let kwargs = bag_pack('y_series', y_series, 'sensitivity', sensitivity);
let code = ```if 1:
import json
y_series = kargs["y_series"]
sensitivity = kargs["sensitivity"]
json_str = []
for i in range(len(df)):
row = df.iloc[i, :]
ts = [{'value':row[y_series][j]} for j in range(len(row[y_series]))]
json_data = {'series': ts, "sensitivity":sensitivity} # auto-detect period, or we can force 'period': 84. We can also add 'maxAnomalyRatio':0.25 for maximum 25% anomalies
json_str = json_str + [json.dumps(json_data)]
result = df
result['json_str'] = json_str
```;
tbl
| evaluate python(typeof(*, json_str:string), code, kwargs)
| extend _tsid = column_ifexists(tsid, 1)
| partition by _tsid (
project json_str
| evaluate http_request_post(uri, headers, dynamic(null))
| project period=ResponseBody.period, baseline_ama=ResponseBody.expectedValues, ad_ama=series_add(0, ResponseBody.isAnomaly), pos_ad_ama=series_add(0, ResponseBody.isPositiveAnomaly)
, neg_ad_ama=series_add(0, ResponseBody.isNegativeAnomaly), upper_ama=series_add(ResponseBody.expectedValues, ResponseBody.upperMargins), lower_ama=series_subtract(ResponseBody.expectedValues, ResponseBody.lowerMargins)
| extend _tsid=toscalar(_tsid)
)
};
let etime=datetime(2017-03-02);
let stime=datetime(2017-01-01);
let dt=1h;
let ts = requests
| make-series value=avg(value) on timestamp from stime to etime step dt
| extend _tsid='TS1';
ts
| invoke series_uv_anomalies_fl('value')
| lookup ts on _tsid
| render anomalychart with(xcolumn=timestamp, ycolumns=value, anomalycolumns=ad_ama)
输出
比较 series_uv_anomalies_fl()
和原生 series_decompose_anomalies()
以下示例将单变量异常情况检测 API 与三个时序的原生 series_decompose_anomalies()
函数进行比较,并假定 series_uv_anomalies_fl()
函数已在数据库中定义:
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let series_uv_anomalies_fl=(tbl:(*), y_series:string, sensitivity:int=85, tsid:string='_tsid')
{
let uri = 'https://YOUR-AD-RESOURCE-NAME.cognitiveservices.azure.com/anomalydetector/v1.0/timeseries/entire/detect';
let headers=dynamic({'Ocp-Apim-Subscription-Key': h'YOUR-KEY'});
let kwargs = bag_pack('y_series', y_series, 'sensitivity', sensitivity);
let code = ```if 1:
import json
y_series = kargs["y_series"]
sensitivity = kargs["sensitivity"]
json_str = []
for i in range(len(df)):
row = df.iloc[i, :]
ts = [{'value':row[y_series][j]} for j in range(len(row[y_series]))]
json_data = {'series': ts, "sensitivity":sensitivity} # auto-detect period, or we can force 'period': 84. We can also add 'maxAnomalyRatio':0.25 for maximum 25% anomalies
json_str = json_str + [json.dumps(json_data)]
result = df
result['json_str'] = json_str
```;
tbl
| evaluate python(typeof(*, json_str:string), code, kwargs)
| extend _tsid = column_ifexists(tsid, 1)
| partition by _tsid (
project json_str
| evaluate http_request_post(uri, headers, dynamic(null))
| project period=ResponseBody.period, baseline_ama=ResponseBody.expectedValues, ad_ama=series_add(0, ResponseBody.isAnomaly), pos_ad_ama=series_add(0, ResponseBody.isPositiveAnomaly)
, neg_ad_ama=series_add(0, ResponseBody.isNegativeAnomaly), upper_ama=series_add(ResponseBody.expectedValues, ResponseBody.upperMargins), lower_ama=series_subtract(ResponseBody.expectedValues, ResponseBody.lowerMargins)
| extend _tsid=toscalar(_tsid)
)
};
let ts = demo_make_series2
| summarize TimeStamp=make_list(TimeStamp), num=make_list(num) by sid;
ts
| invoke series_uv_anomalies_fl('num', 'sid', 90)
| join ts on $left._tsid == $right.sid
| project-away _tsid
| extend (ad_adx, score_adx, baseline_adx)=series_decompose_anomalies(num, 1.5, -1, 'linefit')
| project-reorder num, *
| render anomalychart with(series=sid, xcolumn=TimeStamp, ycolumns=num, baseline_adx, baseline_ama, lower_ama, upper_ama, anomalycolumns=ad_adx, ad_ama)
输出
下图显示用单变量异常情况检测 API 在 TS1 上检测到的异常情况。 也可以在图表筛选器框中选择“TS2”或“TS3”。
下图显示了 TS1 上的原生函数检测到的异常。