log_reduce_fl()

适用于:✅Azure 数据资源管理器

函数 log_reduce_fl() 在半结构化文本列(例如日志行)中查找通用模式,并根据提取的模式对这些行进行聚类。 它输出一个摘要表,其中包含已找到的按相应频率自上而下排序的模式。

先决条件

  • 必须在群集上启用 Python 插件。 这是函数中使用的内联 Python 所必需的。

语法

T | invoke log_reduce_fl(reduce_col [, use_logram [, use_drain [, custom_regexes [, custom_regexes_policy [, delimiters [, similarity_th [, tree_depth [, trigram_th [, bigram_th ]]]]]]]]])

详细了解语法约定

参数

以下参数说明是摘要。 有关详细信息,请参阅有关算法的详细信息部分。

名称 类型​​ 必需 说明
reduce_col string ✔️ 函数所应用到的字符串列的名称。
use_logram bool 启用或禁用 Logram 算法。 默认值为 true
use_drain bool 启用或禁用 Drain 算法。 默认值为 true
custom_regexes dynamic 一个动态数组,其中包含要在每个输入行中搜索的正则表达式和替换符号对,这些符号将替换为各自匹配的符号。 默认值是 dynamic([])。 默认正则表达式表替换数字、IP 地址和 GUID。
custom_regexes_policy string “prepend”、“append”或“replace”。 控制 custom_regexes 是在前面附加、在后面追加还是替换默认项。 默认值为“prepend”。
delimiters dynamic 包含分隔符字符串的动态数组。 默认值为 dynamic([" "]),即,将空格定义为唯一一个字符分隔符。
similarity_th real Drain 算法使用的相似性阈值。 增大 similarity_th 会产生更精细的数据库。 默认值为 0.5。 如果禁用了 Drain,则此参数不起作用。
tree_depth int 增大 tree_depth 可以改进 Drain 算法的运行时间,但可能会降低其准确度。 默认值为 4。 如果禁用了 Drain,则此参数不起作用。
trigram_th int 减小 trigram_th 会增大 Logram 用通配符替换标记的可能性。 默认值为 10。 如果禁用了 Logram,则此参数不起作用。
bigram_th int 减小 bigram_th 会增大 Logram 用通配符替换标记的可能性。 默认值为 15。 如果禁用了 Logram,则此参数不起作用。

有关算法的详细信息

该函数对行运行多个阶段以化简为通用模式。 以下列表解释了阶段:

  • 正则表达式替换:在此阶段,每个行独立地与一组正则表达式匹配,每个匹配的表达式由一个替换符号替换。 默认正则表达式将 IP 地址、数字和 GUID 替换为 /<IP>、<GUID> 和 /<NUM>。 用户可以通过修改 custom_regexes 和 custom_regexes_policy 将更多正则表达式附加到这些正则表达式之前/之后,或者将其替换为新正则表达式或空列表。 例如,若要将整数替换为 <WNUM>,请设置 custom_regexes=pack_array('/^\d+$/', '<WNUM>');若要取消正则表达式替换,请设置 custom_regexes_policy='replace'。 对于每一行,该函数将保留要作为泛型替换标记的参数输出的原始表达式列表(在替换之前)。

  • 标记化:与上一步骤类似,每一行将独立处理并根据一组分隔符划分为标记。 例如,若要定义以逗号、句点或分号划分为标记,请设置 delimiters=pack_array(',', '.', ';')。

  • 应用 Logram 算法:此阶段是可选的,待处理的 use_logram 为 true。 建议在需要较大规模以及参数可能出现在日志条目的第一个标记中时使用 Logram。 当日志条目较短时禁用 OTOH,因为在这种情况下,算法往往会过于频繁地用通配符替换令牌。 Logram 算法考虑标记的 3 元组和 2 元组。 如果标记的 3 元组在日志行中很常见(出现超过 trigram_th 次),则有可能所有三个标记都是模式的一部分。 如果 3 元组比较罕见,则它有可能包含一个应该用通配符替换的变量。 对于罕见的 3 元组,我们会考虑 3 元组中包含的 2 元组的出现频率。 如果 2 元组很常见(出现超过 bigram_th 次),则剩余标记可能是参数,而不是模式的一部分。
    Logram 算法很容易并行化。 它要求对日志语料库完成两个阶段:第一个阶段统计每个 3 元组和 2 元组的频率,第二个阶段将前面所述的逻辑应用于每个条目。 若要并行化算法,我们只需对日志条目进行分区,并统一不同工作器的频率计数。

  • 应用 Drain 算法:此阶段是可选的,待处理的 use_drain 为 true。 Drain 是一种基于截断深度前缀树的日志分析算法。 日志消息根据其长度拆分,对于每个长度,日志消息的前 tree_depth 个标记用于生成前缀树。 如果找不到前缀标记的匹配项,则创建新分支。 如果找到前缀的匹配项,我们将在树叶中包含的模式中搜索最相似的模式。 模式相似性是按照所有标记中匹配的非通配符标记比率来衡量的。 如果最相似模式的相似性高于相似性阈值(参数 similarity_th),则日志条目与模式匹配。 对于该模式,该函数将用通配符替换所有不匹配的标记。 如果最相似模式的相似性低于相似性阈值,则创建包含日志条目的新模式。
    我们根据各种日志测试将默认的 tree_depth 设置为 4。 增大此深度可以改善运行时间,但可能会降低模式准确度;减小此深度可以提高准确度,但会减慢速度,因为每个节点执行的相似性测试要多得多。
    通常,Drain 可以有效地通用化和减少模式(不过很难并行化)。 但是,由于它依赖于前缀树,因此它在前几个标记包含参数的日志条目中可能不是最佳选择。 在大多数情况下,可以通过首先应用 Logram 来解决此问题。

函数定义

可以通过将函数的代码嵌入为查询定义的函数,或将其创建为数据库中的存储函数来定义函数,如下所示:

使用以下 let 语句定义函数。 不需要任何权限。

重要

let 语句不能独立运行。 它必须后跟一个表格表达式语句。 若要运行 log_reduce_fl() 的工作示例,请参阅示例

let log_reduce_fl=(tbl:(*), reduce_col:string,
              use_logram:bool=True, use_drain:bool=True, custom_regexes: dynamic = dynamic([]), custom_regexes_policy: string = 'prepend',
              delimiters:dynamic = dynamic(' '), similarity_th:double=0.5, tree_depth:int = 4, trigram_th:int=10, bigram_th:int=15)
{
    let default_regex_table = pack_array('(/|)([0-9]+\\.){3}[0-9]+(:[0-9]+|)(:|)', '<IP>', 
                                         '([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})', '<GUID>', 
                                         '(?<=[^A-Za-z0-9])(\\-?\\+?\\d+)(?=[^A-Za-z0-9])|[0-9]+$', '<NUM>');
    let kwargs = bag_pack('reduced_column', reduce_col, 'delimiters', delimiters,'output_column', 'LogReduce', 'parameters_column', '', 
                          'trigram_th', trigram_th, 'bigram_th', bigram_th, 'default_regexes', default_regex_table, 
                          'custom_regexes', custom_regexes, 'custom_regexes_policy', custom_regexes_policy, 'tree_depth', tree_depth, 'similarity_th', similarity_th, 
                          'use_drain', use_drain, 'use_logram', use_logram, 'save_regex_tuples_in_output', True, 'regex_tuples_column', 'RegexesColumn', 
                          'output_type', 'summary');
    let code = ```if 1:
        from log_cluster import log_reduce
        result = log_reduce.log_reduce(df, kargs)
    ```;
    tbl
    | extend LogReduce=''
    | evaluate python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
// Write your query to use the function here.

示例

以下示例使用 invoke 运算符运行函数。 此示例使用 Apache Hadoop 分布式文件系统日志

若要使用查询定义的函数,请在嵌入的函数定义后调用它。

let log_reduce_fl=(tbl:(*), reduce_col:string,
              use_logram:bool=True, use_drain:bool=True, custom_regexes: dynamic = dynamic([]), custom_regexes_policy: string = 'prepend',
              delimiters:dynamic = dynamic(' '), similarity_th:double=0.5, tree_depth:int = 4, trigram_th:int=10, bigram_th:int=15)
{
    let default_regex_table = pack_array('(/|)([0-9]+\\.){3}[0-9]+(:[0-9]+|)(:|)', '<IP>', 
                                         '([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})', '<GUID>', 
                                         '(?<=[^A-Za-z0-9])(\\-?\\+?\\d+)(?=[^A-Za-z0-9])|[0-9]+$', '<NUM>');
    let kwargs = bag_pack('reduced_column', reduce_col, 'delimiters', delimiters,'output_column', 'LogReduce', 'parameters_column', '', 
                          'trigram_th', trigram_th, 'bigram_th', bigram_th, 'default_regexes', default_regex_table, 
                          'custom_regexes', custom_regexes, 'custom_regexes_policy', custom_regexes_policy, 'tree_depth', tree_depth, 'similarity_th', similarity_th, 
                          'use_drain', use_drain, 'use_logram', use_logram, 'save_regex_tuples_in_output', True, 'regex_tuples_column', 'RegexesColumn', 
                          'output_type', 'summary');
    let code = ```if 1:
        from log_cluster import log_reduce
        result = log_reduce.log_reduce(df, kargs)
    ```;
    tbl
    | extend LogReduce=''
    | evaluate python(typeof(Count:int, LogReduce:string, example:string), code, kwargs)
};
//
// Finding common patterns in HDFS logs, a commonly used benchmark for log parsing
//
HDFS_log
| take 100000
| invoke log_reduce_fl(reduce_col="data")

输出

计数 LogReduce 示例
55356 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_<NUM> 添加到 <IP> 为 081110 220623 26 的 invalidSet INFO dfs.FSNamesystem: BLOCK* NameSystem.delete: blk_1239016582509138045 添加到 10.251.123.195:50010 的 invalidSet
10278 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: <IP> 添加到大小为 <NUM> 081110 215858 27 的 blk_<NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: blockMap updated: 10.250.11.85:50010 添加到大小为 67108864 的 blk_5080254298708411681
10256 081110 <NUM><NUM> INFO dfs.DataNode$PacketResponder: PacketResponder 正在终止块 <NUM> 的 <NUM> 081110 215858 15496 INFO dfs.DataNode$PacketResponder: 正在终止块 blk_-7746692545918257727 的 PacketResponder 2
10256 081110 <NUM><NUM> INFO dfs.DataNode$PacketResponder: 从 <IP> 接收到大小为 <NUM> 的块 blk_<NUM> 081110 215858 15485 INFO dfs.DataNode$PacketResponder: 从 /10.251.43.21 接收到大小为 67108864 的块 blk_5080254298708411681
9140 081110 <NUM><NUM> INFO dfs.DataNode$DataXceiver: 正在接收块 blk_<NUM> src: <IP> dest: <IP> 081110 215858 15494 INFO dfs.DataNode$DataXceiver: 正在接收块 blk_-7037346755429293022 src: /10.251.43.21:45933 dest: /10.251.43.21:50010
3047 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.allocateBlock: /user/root/rand3/temporary/task<NUM><NUM>m<NUM>_<NUM>/part-<NUM>. <> 081110 215858 26 INFO dfs.FSNamesystem: BLOCK NameSystem.allocateBlock: /user/root/rand3/_temporary/task_200811101024_0005_m_001805_0/part-01805. blk-7037346755429293022
1402 081110 <NUM><NUM> INFO <>: <> block blk_<NUM><> <> 081110 215957 15556 INFO dfs.DataNode$DataTransfer: 10.250.15.198:50010:Transmitted block blk_-3782569120714539446 to /10.251.203.129:50010
177 081110 <NUM><NUM> INFO <>: <><> <><*> 081110 215859 13 INFO dfs.DataBlockScanner: blk_-7244926816084627474 验证成功
36 081110 <NUM><NUM> INFO <>: <><> <> for block <*> 081110 215924 15636 INFO dfs.DataNode$BlockReceiver: 正在接收块 blk_3991288654265301939 的空数据包
12 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* <> <><> <><> <><> <> 081110 215953 19 INFO dfs.FSNamesystem: BLOCK* 要求 10.250.15.198:50010 将 blk_-3782569120714539446 复制到数据节点 10.251.203.129:50010
12 081110 <NUM><NUM> INFO <>: <><> <><> <> block blk_<NUM><> <> 081110 215955 18 INFO dfs.DataNode: 10.250.15.198:50010 正在启动将块 blk_-3782569120714539446 传输到 10.251.203.129:50010 的线程
12 081110 <NUM><NUM> INFO dfs.DataNode$DataXceiver: 已接收大小为<NUM> 的块 blk_<NUM> src: <IP> dest: <IP> 081110 215957 15226 INFO dfs.DataNode$DataXceiver: 已接收大小为 14474705 的块 blk_-3782569120714539446 src: /10.250.15.198:51013 dest: /10.250.15.198:50010
6 081110 <NUM><NUM><> dfs.FSNamesystem: BLOCK NameSystem.addStoredBlock: <> <><> <><> <><> <> size <NUM> 081110 215924 27 WARN dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: 在 10.251.202.134:50010 上收到 blk_2522553781740514003 的冗余 addStoredBlock 请求,大小为 67108864
6 081110 <NUM><NUM> INFO dfs.DataNode$DataXceiver: <> <><> <><>: <><> <><> <> 081110 215936 15714 INFO dfs.DataNode$DataXceiver: writeBlock blk_720939897861061328 收到异常 java.io.IOException:无法从流中读取
3 081110 <NUM><NUM> INFO dfs.FSNamesystem: BLOCK* NameSystem.addStoredBlock: <> <><> <><> <><> size <NUM><><> <><> <><> <><>. 081110 220635 28 INFO dfs.FSNamesystem: BLOCK NameSystem.addStoredBlock: 在 10.250.17.177:50010 上收到 blk_-81196479666306310 的 addStoredBlock 请求,大小为 53457811。但它不属于任何文件。
1 081110 <NUM><NUM><> <>: <> <><> <><> <><>. <><> <><> <>. 081110 220631 19 WARN dfs.FSDataset: Unexpected error trying to delete block blk_-2012154052725261337. BlockInfo not found in volumeMap.