Manage and analyze Network Security Group flow logs using Network Watcher and Grafana

Network Security Group (NSG) flow logs provide information that can be used to understand ingress and egress IP traffic on network interfaces. These flow logs show outbound and inbound flows on a per-NSG-rule basis, the NIC the flow applies to, 5-tuple information about the flow (source/destination IP, source/destination port, protocol), and whether the traffic was allowed or denied.
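
For reference, a single flow log record (version 2 of the schema, which the Logstash filter later in this article parses) looks roughly like the following. The subscription ID, MAC address, rule name, and tuple values shown here are illustrative placeholders:

{
  "records": [
    {
      "time": "2018-11-13T12:00:35.3899262Z",
      "systemId": "a0fca5ce-022c-47b1-9735-89943b42f2fa",
      "category": "NetworkSecurityGroupFlowEvent",
      "resourceId": "/SUBSCRIPTIONS/00000000-0000-0000-0000-000000000000/RESOURCEGROUPS/MYRG/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/MYNSG",
      "operationName": "NetworkSecurityGroupFlowEvents",
      "properties": {
        "Version": 2,
        "flows": [
          {
            "rule": "DefaultRule_AllowInternetOutBound",
            "flows": [
              {
                "mac": "000D3AF87856",
                "flowTuples": [
                  "1542110377,10.0.0.4,13.67.143.118,44931,443,T,O,A,B,,,,"
                ]
              }
            ]
          }
        ]
      }
    }
  ]
}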

You can have many NSGs in your network with flow logging enabled, and this volume of logging data can make it cumbersome to parse your logs and gain insights from them. This article provides a solution to centrally manage these NSG flow logs using Grafana, an open source graphing tool; ElasticSearch, a distributed search and analytics engine; and Logstash, an open source server-side data processing pipeline.

Scenario

NSG flow logs are enabled using Network Watcher and are stored in Azure Blob storage. A Logstash plugin is used to connect to and process the flow logs from Blob storage and send them to ElasticSearch. Once the flow logs are stored in ElasticSearch, they can be analyzed and visualized in customized dashboards in Grafana.

Diagram: NSG, Network Watcher, Grafana

Installation steps

Enable Network Security Group flow logging

For this scenario, you must have Network Security Group flow logging enabled on at least one Network Security Group in your account. For instructions on enabling Network Security Group flow logs, see the following article: Introduction to flow logging for Network Security Groups.
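
As an alternative to the portal steps in that article, flow logging can also be turned on from the Azure CLI. The following is a sketch only; the resource group, NSG, storage account, and location names are assumptions you would replace with your own:

# Ensure Network Watcher is enabled in the target region (chinaeast is an example)
az network watcher configure --resource-group myResourceGroup --locations chinaeast --enabled true

# Enable flow logging for an existing NSG, writing the logs to a storage account
az network watcher flow-log configure --resource-group myResourceGroup --nsg myNsg --storage-account mystorageaccount --enabled true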

Setup considerations

In this example, Grafana, ElasticSearch, and Logstash are configured on an Ubuntu 16.04 LTS server deployed in Azure. This minimal setup runs all three components on the same VM and should only be used for testing and non-critical workloads. Logstash, ElasticSearch, and Grafana can all be architected to scale independently across many instances; for more information, see the documentation for each of these components.

Install Logstash

You use Logstash to flatten the JSON-formatted flow logs to the flow tuple level.

  1. To install Logstash, run the following commands:

    curl -L -O https://artifacts.elastic.co/downloads/logstash/logstash-5.2.0.deb
    sudo dpkg -i logstash-5.2.0.deb
    
  2. Configure Logstash to parse the flow logs and send them to ElasticSearch. Create a logstash.conf file using:

    sudo touch /etc/logstash/conf.d/logstash.conf
    
  3. Add the following content to the file. Change the storage account name and access key to reflect your storage account details:

    input {
      azureblob
      {
        storage_account_name => "mystorageaccount"
        storage_access_key => "VGhpcyBpcyBhIGZha2Uga2V5Lg=="
        container => "insights-logs-networksecuritygroupflowevent"
        codec => "json"
        # See https://docs.azure.cn/network-watcher/network-watcher-read-nsg-flow-logs
        # Typical numbers could be 21/9 or 12/2, depending on the NSG log file type
        file_head_bytes => 12
        file_tail_bytes => 2
        # Enable / tweak these settings when event is too big for codec to handle.
        # break_json_down_policy => "with_head_tail"
        # break_json_batch_count => 2
      }
    }
    filter {
      split { field => "[records]" }
      split { field => "[records][properties][flows]"}
      split { field => "[records][properties][flows][flows]"}
      split { field => "[records][properties][flows][flows][flowTuples]"}
    
      mutate {
        split => { "[records][resourceId]" => "/"}
        add_field => { "Subscription" => "%{[records][resourceId][2]}"
          "ResourceGroup" => "%{[records][resourceId][4]}"
          "NetworkSecurityGroup" => "%{[records][resourceId][8]}" 
        }
        convert => {"Subscription" => "string"}
        convert => {"ResourceGroup" => "string"}
        convert => {"NetworkSecurityGroup" => "string"}
        split => { "[records][properties][flows][flows][flowTuples]" => "," }
        add_field => {
          "unixtimestamp" => "%{[records][properties][flows][flows][flowTuples][0]}"
          "srcIp" => "%{[records][properties][flows][flows][flowTuples][1]}"
          "destIp" => "%{[records][properties][flows][flows][flowTuples][2]}"
          "srcPort" => "%{[records][properties][flows][flows][flowTuples][3]}"
          "destPort" => "%{[records][properties][flows][flows][flowTuples][4]}"
          "protocol" => "%{[records][properties][flows][flows][flowTuples][5]}"
          "trafficflow" => "%{[records][properties][flows][flows][flowTuples][6]}"
          "traffic" => "%{[records][properties][flows][flows][flowTuples][7]}"
          "flowstate" => "%{[records][properties][flows][flows][flowTuples][8]}"
          "packetsSourceToDest" => "%{[records][properties][flows][flows][flowTuples][9]}"
          "bytesSentSourceToDest" => "%{[records][properties][flows][flows][flowTuples][10]}"
          "packetsDestToSource" => "%{[records][properties][flows][flows][flowTuples][11]}"
          "bytesSentDestToSource" => "%{[records][properties][flows][flows][flowTuples][12]}"
        }
        add_field => {
          "time" => "%{[records][time]}"
          "systemId" => "%{[records][systemId]}"
          "category" => "%{[records][category]}"
          "resourceId" => "%{[records][resourceId]}"
          "operationName" => "%{[records][operationName}}"
          "Version" => "%{[records][properties][Version}}"
          "rule" => "%{[records][properties][flows][rule]}"
          "mac" => "%{[records][properties][flows][flows][mac]}"
        }
        convert => {"unixtimestamp" => "integer"}
        convert => {"srcPort" => "integer"}
        convert => {"destPort" => "integer"}
        add_field => { "message" => "%{Message}" }        
      }
    
      date {
        match => ["unixtimestamp" , "UNIX"]
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => "localhost"
        index => "nsg-flow-logs"
      }
    }
    

The Logstash config file provided is composed of three parts: the input, the filter, and the output. The input section designates the input source of the logs that Logstash will process; in this case, we use an "azureblob" input plugin (installed in the next steps) that allows us to access the NSG flow log JSON files stored in Blob storage.

The filter section then flattens each flow log file so that each individual flow tuple and its associated properties become a separate Logstash event.
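
To make that concrete, after the split and add_field operations above, one resulting event carries a single tuple broken out into fields roughly along these lines (values illustrative, taken from the sample record shown earlier):

srcIp       => 10.0.0.4
destIp      => 13.67.143.118
srcPort     => 44931
destPort    => 443
protocol    => T    (T = TCP, U = UDP)
trafficflow => O    (I = inbound, O = outbound)
traffic     => A    (A = allowed, D = denied)
rule        => DefaultRule_AllowInternetOutBound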

Finally, the output section forwards each Logstash event to the ElasticSearch server. Feel free to modify the Logstash config file to suit your specific needs.

Install the Logstash input plugin for Azure Blob storage

This Logstash plugin enables you to directly access the flow logs from their designated Blob storage account. To install the plugin, run the following commands from the default Logstash installation directory (in this case, /usr/share/logstash/bin):

cd /usr/share/logstash/bin
sudo ./logstash-plugin install logstash-input-azureblob

For more information about this plugin, see Logstash input plugin for Azure Storage Blobs.
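
To confirm the plugin was installed and then start Logstash so it begins reading from the container, you can run something like the following (the systemd unit name assumes the .deb install from earlier):

# Verify the plugin is present
sudo ./logstash-plugin list | grep azureblob

# Start Logstash with the pipeline defined in /etc/logstash/conf.d
sudo systemctl start logstash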

Install ElasticSearch

You can use the following script to install ElasticSearch. For information about installing ElasticSearch, see Elastic Stack.

apt-get install apt-transport-https openjdk-8-jre-headless uuid-runtime pwgen -y
wget -qO - https://packages.elastic.co/GPG-KEY-elasticsearch | apt-key add -
echo "deb https://packages.elastic.co/elasticsearch/5.x/debian stable main" | tee -a /etc/apt/sources.list.d/elasticsearch-5.x.list
apt-get update && apt-get install elasticsearch
sed -i s/#cluster.name:.*/cluster.name:\ grafana/ /etc/elasticsearch/elasticsearch.yml
systemctl daemon-reload
systemctl enable elasticsearch.service
systemctl start elasticsearch.service
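
Once the service is up, you can verify that ElasticSearch is reachable; a healthy single-node install responds on port 9200 with a small JSON document containing the node name and version:

curl http://localhost:9200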

Install Grafana

To install and run Grafana, run the following commands:

wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana_4.5.1_amd64.deb
sudo apt-get install -y adduser libfontconfig
sudo dpkg -i grafana_4.5.1_amd64.deb
sudo service grafana-server start

For additional installation information, see Installing on Debian / Ubuntu.
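
Grafana listens on port 3000 by default; browse to http://localhost:3000 and sign in (the default credentials for a fresh install are admin/admin, which you should change). From the shell, you can check that the service started:

sudo service grafana-server status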

Add the ElasticSearch server as a data source

Next, you need to add the ElasticSearch index containing the flow logs as a data source. You can add a data source by selecting Add data source and completing the form with the relevant information. A sample of this configuration can be found in the following screenshot:

Screenshot: Add data source
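
If you are following the single-VM setup from this article, the form would be completed roughly as follows. The Index name must match the index set in the Logstash output section, and @timestamp is the field populated by the date filter:

Name:             nsg-flow-logs
Type:             Elasticsearch
Url:              http://localhost:9200
Access:           proxy
Index name:       nsg-flow-logs
Time field name:  @timestamp
Version:          5.x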

Create a dashboard

Now that you have successfully configured Grafana to read from the ElasticSearch index containing NSG flow logs, you can create and personalize dashboards. To create a new dashboard, select Create your first dashboard. The following sample graph configuration shows flows segmented by NSG rule:

Screenshot: dashboard graph
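
As an illustration of what sits behind such a panel, a Grafana graph against this data source could use a Lucene query with a terms aggregation on the rule field; the settings below are a sketch, and the .keyword suffix assumes the default ElasticSearch 5.x string mappings:

Query:     category:"NetworkSecurityGroupFlowEvent"
Metric:    Count
Group by:  Terms: rule.keyword
Then by:   Date Histogram: @timestamp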

The following screenshot depicts a graph and chart showing the top flows and their frequency, as well as flows by NSG rule and flows by decision. Grafana is highly customizable, so it's advisable to create dashboards that suit your specific monitoring needs. The following example shows a typical dashboard:

Screenshot showing a sample graph configuration with flows segmented by NSG rule.

Conclusion

By integrating Network Watcher with ElasticSearch and Grafana, you now have a convenient, centralized way to manage and visualize NSG flow logs as well as other data. Grafana has a number of other powerful graphing features that can be used to further manage flow logs and better understand your network traffic. Now that you have a Grafana instance set up and connected to Azure, feel free to explore the other functionality it offers.

Next steps