log_reduce_predict_fl()

适用于:✅Azure 数据资源管理器

函数 log_reduce_predict_fl() 会解析半结构化文本列(例如日志行)。对于每一行,它会与预训练模型中的相应模式匹配,否则会在未找到匹配模式的情况下报告异常。 该函数的输出类似于 log_reduce_fl(),但模式是从 log_reduce_train_fl() 生成的预训练模型中检索的。

先决条件

  • 必须在群集上启用 Python 插件。 这是函数中使用的内联 Python 所必需的。

语法

T | invoke log_reduce_predict_fl(models_tbl, model_name, reduce_col [, anomaly_str ])

详细了解语法约定

参数

客户 类型​​ 必需 说明
models_tbl ✔️ 一个表,其中包含由 log_reduce_train_fl() 生成的模型。 表的架构应为 (name:string, timestamp:datetime, model:string)。
model_name string ✔️ 将从 models_tbl 检索的模型的名称。 如果表中包含几个与模型名称匹配的模型,则使用最新的模型。
reduce_col string ✔️ 函数所应用到的字符串列的名称。
anomaly_str string 此字符串是在模型中没有匹配模式的行的输出。 默认值为“ANOMALY”。

函数定义

可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句不能独立运行。 它必须后跟一个表格表达式语句。 若要运行 log_reduce_fl() 的工作示例,请参阅示例

let log_reduce_predict_fl=(tbl:(*), models_tbl: (name:string, timestamp: datetime, model:string), 
                      model_name:string, reduce_col:string, anomaly_str: string = 'ANOMALY')
{
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = bag_pack('logs_col', reduce_col, 'output_patterns_col', 'LogReduce','output_parameters_col', '', 
                          'model', model_str, 'anomaly_str', anomaly_str, 'output_type', 'summary');
    let code = ```if 1:
        from log_cluster import log_reduce_predict
        result = log_reduce_predict.log_reduce_predict(df, kargs)
    ```;
    tbl
    | evaluate hint.distribution=per_node python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
// Write your query to use the function here.

示例

以下示例使用 invoke 运算符运行函数。

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let log_reduce_predict_fl=(tbl:(*), models_tbl: (name:string, timestamp: datetime, model:string), 
                      model_name:string, reduce_col:string, anomaly_str: string = 'ANOMALY')
{
    let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
    let kwargs = bag_pack('logs_col', reduce_col, 'output_patterns_col', 'LogReduce','output_parameters_col', '', 
                          'model', model_str, 'anomaly_str', anomaly_str, 'output_type', 'summary');
    let code = ```if 1:
        from log_cluster import log_reduce_predict
        result = log_reduce_predict.log_reduce_predict(df, kargs)
    ```;
    tbl
    | evaluate hint.distribution=per_node python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
HDFS_log_100k
| take 1000
| invoke log_reduce_predict_fl(models_tbl=ML_Models, model_name="HDFS_100K", reduce_col="data")

输出

计数 LogReduce 示例
239 081110 <NUM><NUM> INFO dfs.DataNode$DataXceiver: 正在接收块 blk_<NUM> src: <IP> dest: <IP> 081110 215858 15494 INFO dfs.DataNode$DataXceiver: 正在接收块 blk_-7037346755429293022 src: /10.251.43.21:45933 dest: /10.251.43.21:50010
231 081110 <NUM><NUM> INFO dfs.DataNode$PacketResponder: 从 <IP> 接收到大小为 <NUM> 的块 blk_<NUM> 081110 215858 15485 INFO dfs.DataNode$PacketResponder: 从 /10.251.43.21 接收到大小为 67108864 的块 blk_5080254298708411681
230 081110 <NUM><NUM> INFO dfs.DataNode$PacketResponder: PacketResponder 正在终止块 <NUM> 的 <NUM> 081110 215858 15496 INFO dfs.DataNode$PacketResponder: 正在终止块 blk_-7746692545918257727 的 PacketResponder 2
218 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: <IP> 添加到大小为 <NUM> 081110 215858 27 的 blk_<NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.11.85:50010 添加到大小为 67108864 的 blk_5080254298708411681
79 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: <>. <> 081110 215858 26 INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /user/root/rand3/_temporary/task_200811101024_0005_m_001805_0/part-01805. blk-7037346755429293022
3 081110 <NUM><NUM> INFO dfs.DataBlockScanner: <*> 验证成功 081110 215859 13 INFO dfs.DataBlockScanner: blk_-7244926816084627474 验证成功