使用开源工具可视化 Azure 网络观察程序 NSG 流日志

注意

本文引用了 CentOS,这是一个接近生命周期结束 (EOL) 状态的 Linux 发行版。 请相应地考虑你的使用和规划。 有关详细信息,请参阅 CentOS 生命周期结束指导

可以通过网络安全组流日志提供的信息了解网络安全组的入口和出口 IP 流量。 这些流日志基于每个规则显示出站和入站流、流所适用的 NIC、有关流的 5 元组信息(源/目标 IP、源/目标端口、协议),以及是允许还是拒绝流量。

这些流日志可能难以手动分析并难以从中获取见解。 不过,可以使用多个开源工具将相关数据可视化。 本文提供了使用 Elastic Stack 将这些日志可视化的解决方案,允许你在 Kibana 仪表板上快速完成流日志的索引和可视化操作。

方案

在本文中,我们设置了一个解决方案,允许你使用 Elastic Stack 来可视化网络安全组流日志。 Logstash 输入插件直接从配置为包含流日志的存储 blob 中获取流日志。 然后,使用 Elastic Stack 为流日志编制索引并将这些日志用于创建 Kibana 仪表板,实现信息可视化。

图为方便用户使用 Elastic Stack 可视化网络安全组流日志的方案。

步骤

启用网络安全组流日志记录

就本方案来说,必须在帐户的至少一个网络安全组上启用网络安全组流日志记录。 有关启用网络安全流日志的说明,请参阅以下文章:Introduction to flow logging for Network Security Groups(网络安全组流日志记录简介)。

设置 Elastic Stack

通过将 NSG 流日志与 Elastic Stack 相连接,可以创建一个 Kibana 仪表板,以便搜索、可视化、分析日志并从中获得见解。

安装 Elasticsearch

以下说明用于在 Ubuntu Azure VM 中安装 Elasticsearch。 有关如何在 RHEL/CentOS 发行版中安装弹性搜索的说明,请参阅使用 RPM 安装 Elasticsearch

  1. Elastic Stack 5.0 及更高版本需要 Java 8。 运行命令 java -version 可以检查版本。 如果尚未安装 Java,请参阅支持 Azure 的 JDK 上的文档。

  2. 下载适用于系统的正确二进制程序包:

    curl -L -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.2.0.deb
    sudo dpkg -i elasticsearch-5.2.0.deb
    sudo /etc/init.d/elasticsearch start
    

    Elasticsearch Installation(Elasticsearch 安装)中介绍了其他安装方法

  3. 使用以下命令验证 Elasticsearch 是否正在运行:

    curl http://127.0.0.1:9200
    

    应该看到类似于以下示例的响应:

    {
    "name" : "Angela Del Toro",
    "cluster_name" : "elasticsearch",
    "version" : {
        "number" : "5.2.0",
        "build_hash" : "8ff36d139e16f8720f2947ef62c8167a888992fe",
        "build_timestamp" : "2016-01-27T13:32:39Z",
        "build_snapshot" : false,
        "lucene_version" : "6.1.0"
    },
    "tagline" : "You Know, for Search"
    }
    

有关安装弹性搜索的其他说明,请参阅安装说明

安装 Logstash

以下说明用于在 Ubuntu 中安装 Logstash。 有关如何在 RHEL/CentOS 中安装此包的说明,请参阅从包存储库安装 - yum一文。

  1. 若要安装 Logstash,请运行以下命令:

    curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-5.2.0.deb
    sudo dpkg -i logstash-5.2.0.deb
    
  2. 接下来,需要配置 Logstash,以访问和分析流日志。 使用以下命令创建 logstash.conf 文件:

    sudo touch /etc/logstash/conf.d/logstash.conf
    
  3. 将以下内容添加到文件:

    input {
      azureblob
        {
            storage_account_name => "mystorageaccount"
            storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
            container => "insights-logs-networksecuritygroupflowevent"
            codec => "json"
            # Refer https://docs.azure.cn/network-watcher/network-watcher-read-nsg-flow-logs
            # Typical numbers could be 21/9 or 12/2 depends on the nsg log file types
            file_head_bytes => 12
            file_tail_bytes => 2
            # Enable / tweak these settings when event is too big for codec to handle.
            # break_json_down_policy => "with_head_tail"
            # break_json_batch_count => 2
        }
      }
    
      filter {
        split { field => "[records]" }
        split { field => "[records][properties][flows]"}
        split { field => "[records][properties][flows][flows]"}
        split { field => "[records][properties][flows][flows][flowTuples]"}
    
     mutate{
      split => { "[records][resourceId]" => "/"}
      add_field => {"Subscription" => "%{[records][resourceId][2]}"
                    "ResourceGroup" => "%{[records][resourceId][4]}"
                    "NetworkSecurityGroup" => "%{[records][resourceId][8]}"}
      convert => {"Subscription" => "string"}
      convert => {"ResourceGroup" => "string"}
      convert => {"NetworkSecurityGroup" => "string"}
      split => { "[records][properties][flows][flows][flowTuples]" => ","}
      add_field => {
                  "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
                  "srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
                  "destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
                  "srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
                  "destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
                  "protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
                  "trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
                  "traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
                  "flowstate" => "%{[records][properties][flows][flows][flowTuples][8]}"
                   "packetsSourceToDest" => "%{[records][properties][flows][flows][flowTuples][9]}"
                   "bytesSentSourceToDest" => "%{[records][properties][flows][flows][flowTuples][10]}"
                   "packetsDestToSource" => "%{[records][properties][flows][flows][flowTuples][11]}"
                   "bytesSentDestToSource" => "%{[records][properties][flows][flows][flowTuples][12]}"
                   }
      convert => {"unixtimestamp" => "integer"}
      convert => {"srcPort" => "integer"}
      convert => {"destPort" => "integer"}
     }
    
     date{
       match => ["unixtimestamp" , "UNIX"]
     }
    }
    output {
     stdout { codec => rubydebug }
     elasticsearch {
       hosts => "localhost"
       index => "nsg-flow-logs"
     }
    }
    

有关安装 Logstash 的其他说明,请参阅正式文档

安装适用于 Azure Blob 存储的 Logstash 输入插件

使用此 Logstash 插件可以直接从指定的存储帐户访问流日志。 要安装此插件,请从默认的 Logstash 安装目录运行以下命令:

sudo /usr/share/logstash/bin/logstash-plugin install logstash-input-azureblob

若要启动 Logstash,请运行以下命令:

sudo /etc/init.d/logstash start

有关此插件的详细信息,请参阅文档

安装 Kibana

有关如何在 RHEL/CentOS 系统中安装 Kibana 的说明,请参阅使用 RPM 安装 Kibana。 有关如何使用存储库包在 Ubuntu/Debian 系统中安装 Kibana 的说明,请参阅从 APT 存储库安装 Kibana

然后,以下说明在 Ubuntu 中进行测试,并可以用于不同的 Linux 发行版中,因为它们不特定于 Ubuntu。

  1. 运行以下命令以安装 Kibana:

    curl -L -O https://artifacts.elastic.co/downloads/kibana/kibana-5.2.0-linux-x86_64.tar.gz
    tar xzvf kibana-5.2.0-linux-x86_64.tar.gz
    
  2. 若要运行 Kibana,请使用以下命令:

    cd kibana-5.2.0-linux-x86_64/
    ./bin/kibana
    
  3. 若要查看 Kibana Web 界面,请导航到 http://localhost:5601

  4. 对于本方案,用于流日志的索引模式为“nsg-flow-logs”。 可以更改 logstash.conf 文件“output”节中的索引模式。

  5. 如果想要远程查看 Kibana 仪表板,请创建允许访问端口 5601 的入站 NSG 规则。

创建 Kibana 仪表板

下图显示了一个示例仪表板,用于查看警报中的趋势和详细信息:

图 1

下载仪表板文件可视化效果文件已保存的搜索文件

在 Kibana 的“Management”(管理)选项卡下,导航到“Saved Objects”(已保存的对象)并导入所有三个文件。 然后,可从“仪表板”选项卡打开并加载示例仪表板。

还可以创建自己的可视化效果,以及根据感兴趣的指标定制的仪表板。 阅读 Kibana 的正式文档,详细了解如何创建 Kibana 可视化效果。

可视化 NSG 流日志

示例仪表板提供了流日志的多种可视化效果:

  1. 按一段时间的决策/方向显示的流 - 时间系列图,显示一段时间内流的数目。 可以编辑这些可视化效果的时间单位和跨度。 “按决策显示的流”显示所做的允许或拒绝决策所占的比例,而“按方向显示的流”则显示入站和出站流量的比例。 使用这些可视化效果,可以检查一段时间内的流量趋势,查看是否存在峰值或异常模式。

    屏幕截图显示了一个样本面板,其中包含按一段时间的决策和方向显示的流。

  2. 按目标端口/源端口显示的流 - 饼图,显示流向各自端口的流的明细。 可以通过此视图查看最常用的端口。 如果点击饼图中的特定端口,仪表板的其余部分会对流进行筛选,仅保留该端口的流。

    屏幕截图显示了一个样本面板,其中包含按目标和源端口显示的流。

  3. 流的数目和最早的日志时间 - 此指标显示记录的流的数目,以及捕获的最早日志的日期。

    屏幕截图显示了一个样本面板,其中包含流数和最早的日志时间。

  4. 按 NSG 和规则显示的流 - 条形图,显示每个 NSG 中流的分布情况,以及每个 NSG 中规则的分布情况。 可以了解产生最多流量的 NSG 和规则。

    屏幕截图显示了一个样本面板,其中包含按 NSG 和规则显示的流。

  5. 排名前 10 的源/目标 IP - 条形图,显示排名前 10 的源 IP 和目标 IP。 可以调整这些图表,增加或减少排名靠前的 IP 的显示数目。 可以通过此图了解最常出现的 IP,以及针对每个 IP 所做的流量决策(允许或拒绝)。

    屏幕截图显示了一个样本面板,其中包含按排名前十的来源和目标 IP 地址显示的流。

  6. 流元组 - 此表显示包含在每个流元组中的信息及其相应的 NSG 和规则。

    屏幕截图显示了表中的流元组。

用户可以使用仪表板顶部的查询栏,根据流的任何参数(例如订阅 ID、资源组、规则或者任何其他感兴趣的变量)对仪表板的内容进行筛选。 有关 Kibana 的查询和筛选器的详细信息,请参阅正式文档

结论

通过将网络安全组流日志与 Elastic Stack 组合使用,我们提出了一种用于可视化网络流量的方式,该方式功能强大并且可以自定义。 这些仪表板允许快速获取和共享针对网络流量的见解,并且可以通过筛选来调查任何潜在的异常问题。 使用 Kibana 时,用户可以根据安全、审核和符合性需要对这些仪表板进行定制,打造特定的可视化效果。

后续步骤

访问 Visualize NSG flows logs with Power BI(使用 Power BI 可视化 NSG 流日志),了解如何使用 Power BI 可视化 NSG 流日志