log_reduce_predict_fl()
适用于:✅Azure 数据资源管理器
函数 log_reduce_predict_fl()
会解析半结构化文本列(例如日志行)。对于每一行,它会与预训练模型中的相应模式匹配,否则会在未找到匹配模式的情况下报告异常。 该函数的输出类似于 log_reduce_fl(),但模式是从 log_reduce_train_fl() 生成的预训练模型中检索的。
先决条件
- 必须在群集上启用 Python 插件。 这是函数中使用的内联 Python 所必需的。
语法
T |
invoke
log_reduce_predict_fl(
models_tbl,
model_name,
reduce_col [,
anomaly_str ])
详细了解语法约定。
参数
客户 | 类型 | 必需 | 说明 |
---|---|---|---|
models_tbl | 表 | ✔️ | 一个表,其中包含由 log_reduce_train_fl() 生成的模型。 表的架构应为 (name:string, timestamp:datetime, model:string)。 |
model_name | string |
✔️ | 将从 models_tbl 检索的模型的名称。 如果表中包含几个与模型名称匹配的模型,则使用最新的模型。 |
reduce_col | string |
✔️ | 函数所应用到的字符串列的名称。 |
anomaly_str | string |
此字符串是在模型中没有匹配模式的行的输出。 默认值为“ANOMALY”。 |
函数定义
可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:
使用以下 let 语句定义函数。 不需要任何权限。
let log_reduce_predict_fl=(tbl:(*), models_tbl: (name:string, timestamp: datetime, model:string),
model_name:string, reduce_col:string, anomaly_str: string = 'ANOMALY')
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = bag_pack('logs_col', reduce_col, 'output_patterns_col', 'LogReduce','output_parameters_col', '',
'model', model_str, 'anomaly_str', anomaly_str, 'output_type', 'summary');
let code = ```if 1:
from log_cluster import log_reduce_predict
result = log_reduce_predict.log_reduce_predict(df, kargs)
```;
tbl
| evaluate hint.distribution=per_node python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
// Write your query to use the function here.
示例
以下示例使用 invoke 运算符运行函数。
若要使用查询定义的函数,请在嵌入的函数定义后调用它。
let log_reduce_predict_fl=(tbl:(*), models_tbl: (name:string, timestamp: datetime, model:string),
model_name:string, reduce_col:string, anomaly_str: string = 'ANOMALY')
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = bag_pack('logs_col', reduce_col, 'output_patterns_col', 'LogReduce','output_parameters_col', '',
'model', model_str, 'anomaly_str', anomaly_str, 'output_type', 'summary');
let code = ```if 1:
from log_cluster import log_reduce_predict
result = log_reduce_predict.log_reduce_predict(df, kargs)
```;
tbl
| evaluate hint.distribution=per_node python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
HDFS_log_100k
| take 1000
| invoke log_reduce_predict_fl(models_tbl=ML_Models, model_name="HDFS_100K", reduce_col="data")
输出
计数 | LogReduce | 示例 |
---|---|---|
239 | 081110 | <NUM><NUM> INFO dfs.DataNode$DataXceiver: 正在接收块 blk_<NUM> src: <IP> dest: <IP> 081110 215858 15494 INFO dfs.DataNode$DataXceiver: 正在接收块 blk_-7037346755429293022 src: /10.251.43.21:45933 dest: /10.251.43.21:50010 |
231 | 081110 | <NUM><NUM> INFO dfs.DataNode$PacketResponder: 从 <IP> 接收到大小为 <NUM> 的块 blk_<NUM> 081110 215858 15485 INFO dfs.DataNode$PacketResponder: 从 /10.251.43.21 接收到大小为 67108864 的块 blk_5080254298708411681 |
230 | 081110 | <NUM><NUM> INFO dfs.DataNode$PacketResponder: PacketResponder 正在终止块 <NUM> 的 <NUM> 081110 215858 15496 INFO dfs.DataNode$PacketResponder: 正在终止块 blk_-7746692545918257727 的 PacketResponder 2 |
218 | 081110 | <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: <IP> 添加到大小为 <NUM> 081110 215858 27 的 blk_<NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.11.85:50010 添加到大小为 67108864 的 blk_5080254298708411681 |
79 | 081110 | <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: <>. <> 081110 215858 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /user/root/rand3/_temporary/task_200811101024_0005_m_001805_0/part-01805. blk-7037346755429293022 |
3 | 081110 | <NUM><NUM> INFO dfs.DataBlockScanner: <*> 验证成功 081110 215859 13 INFO dfs.DataBlockScanner: blk_-7244926816084627474 验证成功 |