使用 Logstash 和基于 DCR 的 API 将日志流式传输到 Microsoft Sentinel

重要

注意:根据世纪互联发布的公告,所有 Microsoft Sentinel 功能将在中国区域的 Azure 上2026 年 8 月 18 日正式退役。

重要

使用 Logstash 输出插件通过数据收集规则 (DCR)(目前为公共预览版)引入数据。 此功能提供时不附带服务级别协议。 有关详细信息,请参阅适用于 Azure 预览版的补充使用条款

Microsoft Sentinel的 Logstash 输出插件支持通过数据收集规则 (DCR) 进行管道转换和高级配置。 该插件将日志从外部数据源转发到 Log Analytics 或 Microsoft Sentinel 中的自定义表或标准表。

本文介绍如何设置 Logstash 插件,以使用 DCR 将数据流式传输到 Log Analytics 或Microsoft Sentinel,并完全控制输出架构。

使用插件,可以:

  • 控制列名和类型的配置。
  • 在摄取过程中执行转换,例如筛选或增强。
  • 将自定义日志引入自定义表中,或将 Syslog 输入流引入 Log Analytics Syslog 表中。

引入到标准表仅限于支持自定义日志引入的标准表

若要详细了解如何使用 Logstash 数据收集引擎,请参阅 Logstash 入门

体系结构概述

Logstash 体系结构示意图,其中显示了通过日志引入 API 将数据发送到 Log Analytics 的输入、筛选和输出插件阶段。

Logstash 引擎由三个组件组成:

  • 输入插件:以自定义的方式从各种源收集数据。
  • 筛选器插件:根据指定的条件对数据进行处理和规范化。
  • 输出插件:以自定义的方式将收集和处理的数据发送到各种目标。

注意

  • Azure 仅支持此处讨论的由 Microsoft Sentinel 提供的 Logstash 输出插件。 当前插件为 microsoft-sentinel-log-analytics-logstash-output-plugin,v2.1.0。 可以针对有关输出插件的任何问题提交支持票证
  • Azure 不支持 Microsoft Sentinel 的第三方 Logstash 输出插件,也不支持任何其他 Logstash 插件或组件(不管什么类型)。
  • 请参阅插件的 Logstash 版本支持的先决条件。

该插件使用日志引入 API 将 JSON 格式的数据发送到 Log Analytics 工作区。 数据将引入到自定义日志或一个标准表中。

在 Logstash 中部署 Microsoft Sentinel 输出插件

若要设置插件,请执行以下步骤:

  • 查看必备条件
  • 安装插件
  • 创建示例文件
  • 创建所需的DCR相关资源
  • 配置 Logstash 配置文件
  • 重启 Logstash
  • 在 Microsoft Sentinel 中查看传入的日志
  • 监视输出插件审核日志

Logstash 插件先决条件

  • 安装支持的 Logstash 版本。 该插件支持以下 Logstash 版本:

    • 7.0 - 7.17.13
    • 8.0 - 8.9
    • 8.11 - 8.15
    • 8.19.2
    • 9.0.8
    • 9.1.10
    • 9.2.4 - 9.2.5

    注意

    如果使用 Logstash 8,我们建议在管道中禁用 ECS

  • 验证你是否有一个你对其至少拥有参与者权限的 Log Analytics 工作区。

  • 验证你是否有权在该工作区中创建 DCR 对象。

安装插件

RubyGems 上的 Logstash 集合中提供了 Microsoft Sentinel 输出插件。

  • 请按照 Logstash 使用插件文档中的说明安装 microsoft-sentinel-log-analytics-logstash-output-plugin 插件。 若要安装到现有 Logstash 安装,请运行以下命令:

    logstash-plugin install microsoft-sentinel-log-analytics-logstash-output-plugin
    
  • 如果 Logstash 系统无法访问 Internet,请按照 Logstash 脱机插件管理 文档中的说明准备和使用脱机插件包。 (这需要构建另一个具有 Internet 访问的 Logstash 系统。)

创建示例文件

在本部分,你将为以下方案之一创建示例文件:

  • 为自定义日志创建示例文件
  • 创建示例文件以将日志引入 Syslog 表

为自定义日志创建示例文件

在此方案中,你将配置 Logstash 输入插件以将事件发送到 Microsoft Sentinel。 此示例使用生成器输入插件来模拟事件。 你可以使用任何其他输入插件。

在此示例中,Logstash 配置文件如下所示:

input {
      generator {
            lines => [
                 "This is a test log message"
            ]
           count => 10
      }
}

若要创建示例文件,请执行以下步骤:

  1. 将以下输出插件配置复制到 Logstash 配置文件中。

    output {
        microsoft-sentinel-log-analytics-logstash-output-plugin {
          create_sample_file => true
          sample_file_path => "<enter the path to the file in which the sample data will be written>" #for example: "c:\\temp" (for windows) or "/tmp" for Linux. 
        }
    }
    
  2. 确保引用的文件路径已存在,然后启动 Logstash。

    一旦有 10 个要采样的事件或 Logstash 进程正常退出,插件会将 10 条记录写入配置路径中名为 sampleFile<epoch seconds>.json 的示例文件。 例如:C:\temp\sampleFile1648453501.json。 下面是插件创建的示例文件的一部分:

    [
            {
                "host": "logstashMachine",
                "sequence": 0,
                "message": "This is a test log message",
                "ls_timestamp": "2022-03-28T17:45:01.690Z",
                "ls_version": "1"
            },
            {
                "host": "logstashMachine",
                "sequence": 1
        ...
    
        ]    
    

    插件会自动将以下属性添加到每条记录:

    • ls_timestamp:从输入插件收到记录的时间
    • ls_version:Logstash 管道的版本。

    创建 DCR 时,可以删除这些字段。

创建示例文件以将日志引入 Syslog 表

在此方案中,你将配置 Logstash 输入插件以将 syslog 事件发送到 Microsoft Sentinel。

  1. 如果你尚未将 syslog 消息转发到 Logstash 计算机中,可以使用 logger 命令生成消息。 例如(对于 Linux):

    logger -p local4.warn --rfc3164 --tcp -t CEF "0|Microsoft|Device|cef-test|example|data|1|here is some more data for the example" -P 514 -d -n 127.0.0.1
    

    下面是 Logstash 输入插件的示例:

    input {
         syslog {
             port => 514
        }
    }
    
  2. 将以下输出插件配置复制到 Logstash 配置文件中。

    output {
        microsoft-sentinel-log-analytics-logstash-output-plugin {
          create_sample_file => true
          sample_file_path => "<enter the path to the file in which the sample data will be written>" #for example: "c:\\temp" (for windows) or "/tmp" for Linux. 
        }
    }
    
  3. 确保文件路径已存在,然后启动 Logstash。

    一旦有 10 个要采样的事件或 Logstash 进程正常退出,插件会将 10 条记录写入配置路径中名为 sampleFile<epoch seconds>.json 的示例文件。 例如:C:\temp\sampleFile1648453501.json。 下面是插件创建的示例文件的一部分:

    [
            {
                "logsource": "logstashMachine",
                "facility": 20,
                "severity_label": "Warning",
                "severity": 4,
                "timestamp": "Apr  7 08:26:04",
                "program": "CEF:",
                "host": "127.0.0.1",
                "facility_label": "local4",
                "priority": 164,
                "message": "0|Microsoft|Device|cef-test|example|data|1|here is some more data for the example",
                "ls_timestamp": "2022-04-07T08:26:04.000Z",
                "ls_version": "1"
            }
    ]    
    
    

    插件会自动将以下属性添加到每条记录:

    • ls_timestamp:从输入插件收到记录的时间
    • ls_version:Logstash 管道的版本。

    创建 DCR 时,可以删除这些字段。

创建所需的 DCR 资源

若要配置Microsoft Sentinel基于 DCR 的 Logstash 插件,请先创建与 DCR 相关的资源。

在本节中,你将为以下情景之一创建资源,供你的 DCR 使用:

  • 创建要引入到自定义表中的 DCR 资源
  • 创建要引入到标准表中的 DCR 资源

创建要引入到自定义表中的 DCR 资源

若要将数据引入到自定义表中,请执行以下步骤(根据使用 REST API(Azure 门户)将数据发送到 Azure Monitor 日志教程):

  1. 请查看先决条件

  2. 配置应用程序

  3. 添加自定义日志表

  4. 使用在上一部分创建的示例文件分析和筛选示例数据

  5. 从 DCR 收集信息

  6. 分配对 DCR 的权限

    跳过“发送示例数据”步骤。

如果遇到任何问题,请参阅故障排除步骤

创建要引入到标准表中的 DCR 资源

若要将数据引入到 Syslog 或 CommonSecurityLog 等标准表中,请根据使用 REST API(资源管理器模板)将数据发送到 Azure Monitor 日志教程使用一个过程。 虽然本教程介绍的是如何将数据引入到自定义表中,但你可以轻松调整该过程以将数据引入到标准表中。 以下步骤指明了需要在步骤中做出的相关更改。

  1. 请查看先决条件

  2. 收集工作区详细信息

  3. 配置应用程序

    跳过“在 Log Analytics 工作区中创建新表”步骤。 将数据引入标准表时,此步骤并不相关,因为该表已在 Log Analytics 中定义。

  4. 创建 DCR。 在此步骤中:

    • 提供在上一部分创建的示例文件。
    • 使用创建的示例文件来定义 streamDeclarations 属性。 示例文件中的每个字段都应有一个名称相同且类型适当的相应列, (请参阅以下示例) 。
    • 使用标准表而不是自定义表的名称配置 outputStream 属性的值。 与自定义表不同,标准表的名称没有 _CL 后缀。
    • 表名称的前缀应该是 Microsoft- 而不是 Custom-。 在此示例中, outputStream 属性值为 Microsoft-Syslog
  5. 分配对 DCR 的权限

    跳过“发送示例数据”步骤。

如果遇到任何问题,请参阅故障排除步骤

示例:用于将数据引入到 Syslog 表中的 DCR

请记住以下几点:

  • streamDeclarations 名称和类型应与示例文件字段相同,但你不必指定所有这些字段。 例如,在以下 DCR 中,PRI 列中省略了 typels_versionstreamDeclarations 字段。
  • dataflows 属性将输入转换为 Syslog 表格式,并将 outputStream 设置为 Microsoft-Syslog
{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "parameters": {
    "dataCollectionRuleName": {
      "type": "String",
      "metadata": {
        "description": "Specifies the name of the Data Collection Rule to create."
      }
    },
    "location": {
      "defaultValue": "[resourceGroup().location]",
      "type": "String",
      "metadata": {
        "description": "Specifies the location in which to create the Data Collection Rule."
      }
    },
    "workspaceResourceId": {
      "type": "String",
      "metadata": {
        "description": "Specifies the Azure resource ID of the Log Analytics workspace to use."
      }
    }
  },
  "resources": [
    {
      "type": "Microsoft.Insights/dataCollectionRules",
      "apiVersion": "2021-09-01-preview",
      "name": "[parameters('dataCollectionRuleName')]",
      "location": "[parameters('location')]",
      "properties": {
        "streamDeclarations": {
          "Custom-SyslogStream": {
            "columns": [
              { "name": "ls_timestamp", "type": "datetime" },
              { "name": "timestamp", "type": "datetime" },
              { "name": "message", "type": "string" },
              { "name": "facility_label", "type": "string" },
              { "name": "severity_label", "type": "string" },
              { "name": "host", "type": "string" },
              { "name": "logsource", "type": "string" }
            ]
          }
        },
        "destinations": {
          "logAnalytics": [
            {
              "workspaceResourceId": "[parameters('workspaceResourceId')]",
              "name": "clv2ws1"
            }
          ]
        },
        "dataFlows": [
          {
            "streams": ["Custom-SyslogStream"],
            "destinations": ["clv2ws1"],
            "transformKql": "source | project TimeGenerated = ls_timestamp, EventTime = todatetime(timestamp), Computer = logsource, HostName = logsource, HostIP = host, SyslogMessage = message, Facility = facility_label, SeverityLevel = severity_label",
            "outputStream": "Microsoft-Syslog"
          }
        ]
      }
    }
  ],
  "outputs": {
    "dataCollectionRuleId": {
      "type": "String",
      "value": "[resourceId('Microsoft.Insights/dataCollectionRules', parameters('dataCollectionRuleName'))]"
    }
  }
}

配置 Logstash 配置文件

该插件支持两种身份验证方法: 服务主体 (客户端凭据) 和 托管标识 (无密码) 。 选择适合你的环境的方法。

服务主体身份验证

若要配置 Logstash 配置文件以使用服务主体身份验证将日志引入自定义表,请检索以下值:

字段 如何检索
client_app_Id 根据你在本部分中使用的教程,创建 DCR 资源时在步骤 3 中创建的 Application (client) ID 值。
client_app_secret 根据本部分中使用的教程,在步骤 5 中创建 DCR 资源时创建的客户端机密值。
tenant_id 订阅的租户 ID。 可以在“主页”>“Microsoft Entra ID”>“概述”>“基本信息”下找到该租户 ID。
data_collection_endpoint 根据你在本节中使用的教程,创建 DCR 资源时第 3 步中 logsIngestion URI 的值。
dcr_immutable_id 根据你在本节使用的教程创建 DCR 资源时,步骤 6 中的 DCR immutableId 的值。
dcr_stream_name 对于自定义表,如创建 DCR 资源时的步骤 6 中所述,请转到 DCR 的 JSON 视图,并复制 dataFlows>streams 属性。 请参阅以下示例中的 dcr_stream_name。 对于标准表,值为 Custom-SyslogStream

在您检索到所需值之后:

  1. 将上一步中创建的 Logstash 配置文件的 output 节替换为以下示例。
  2. 将以下示例中的占位符字符串替换为检索到的值。
  3. 请确保将 create_sample_file 属性更改为 false
示例:服务主体输出插件的配置
output {
    microsoft-sentinel-log-analytics-logstash-output-plugin {
      azure_cloud => "AzureChinaCloud"
      client_app_Id => "<enter your client_app_id value here>"
      client_app_secret => "<enter your client_app_secret value here>"
      tenant_id => "<enter your tenant id here>"
      data_collection_endpoint => "<enter your logsIngestion URI here>"
      dcr_immutable_id => "<enter your DCR immutableId here>"
      dcr_stream_name => "<enter your stream name here>"
      create_sample_file=> false
      sample_file_path => "c:\\temp"
    }
}

托管标识身份验证(无密码)

managed_identity 设置为 true 时,插件无需客户端密钥即可进行身份验证。 插件在运行时按以下顺序自动检测适当的标识机制:

  1. AKS 工作负荷标识 - 如果环境变量 AZURE_CLIENT_IDAZURE_TENANT_ID并且 AZURE_FEDERATED_TOKEN_FILE 存在(由 AKS 自动设置),插件将执行 OIDC 令牌交换。
  2. Azure Arc - 如果在主机上检测到 Azure Connected Machine 代理(azcmagent),则该插件会使用 Azure Arc 托管标识端点来处理混合环境和本地服务器。
  3. IMDS - 否则,插件则会改为使用用于 Azure VM 和 VMSS 的 Azure 实例元数据服务(IMDS)。

托管标识的必需配置:

字段 说明
managed_identity 布尔值,默认为 false。 将其设置为 true 以启用无密码身份验证。
data_collection_endpoint String. 你的 DCE 的 logsIngestion URI。
dcr_immutable_id String. DCR immutableId。
dcr_stream_name String. 数据流的名称。
managed_identity_object_id 可选。 字符串,默认为空。 用户分配的托管身份的对象 ID。 当 VM 具有多个用户分配的标识时是必需的。 对于系统分配的托管标识,请省略。
示例:系统分配的托管标识
output {
    microsoft-sentinel-log-analytics-logstash-output-plugin {
      azure_cloud => "AzureChinaCloud"
      managed_identity => true
      data_collection_endpoint => "<enter your DCE logsIngestion URI here>"
      dcr_immutable_id => "<enter your DCR immutableId here>"
      dcr_stream_name => "<enter your stream name here>"
    }
}
示例:用户分配的托管标识
output {
    microsoft-sentinel-log-analytics-logstash-output-plugin {
      azure_cloud => "AzureChinaCloud"
      managed_identity => true
      managed_identity_object_id => "<enter the object ID of your user-assigned identity>"
      data_collection_endpoint => "<enter your DCE logsIngestion URI here>"
      dcr_immutable_id => "<enter your DCR immutableId here>"
      dcr_stream_name => "<enter your stream name here>"
    }
}

注意

  • 使用 Azure Arc 时,Logstash 进程必须以属于 himds 组的用户身份运行,才能读取质询令牌。 有关详细信息,请参阅 Azure Arc 托管标识文档
  • 出于安全原因,请勿隐式声明敏感配置值,例如 client_app_secret 在 Logstash 配置文件中。 将敏感信息存储在 Logstash KeyStore 中。
  • 当将空字符串设置为代理设置的值时,会清除所有系统级代理设置。

可选配置

字段 说明 默认值
azure_cloud 用于指定正在使用的Azure云的名称。 可用值为: AzureCloudAzureChinaCloudAzureUSGovernment AzureCloud
key_names 字符串数组。 如果你要将列的子集发送到 Log Analytics,请提供此字段。 无(字段为空)
plugin_flush_interval 定义将两条消息发送到 Log Analytics 的最大间隔时间差(以秒为单位)。 5
retransmission_time 设置发送失败后重新传输消息的持续时间(以秒为单位)。 10
retransmission_delay 发送日志数据失败时,每次重试尝试之间的延迟(以秒为单位)。 增大此值可降低在限流 (HTTP 429) 情况下的请求速率。 2
compress_data 如果此字段为 True,则会在使用 API 之前压缩事件数据。 建议用于高吞吐量管道。 False
proxy 指定要用于所有 API 调用的代理 URL。 无(字段为空)
proxy_aad 指定要用于对 Microsoft Entra ID 进行 API 调用的代理 URL。 覆盖 proxy 设置。 无(字段为空)
proxy_endpoint 指定用于对数据收集终结点进行 API 调用的代理 URL。 覆盖 proxy 设置。 无(字段为空)

重启 Logstash

使用更新的输出插件配置重启 Logstash。 验证数据是否根据 DCR 配置引入到正确的表中。

在 Microsoft Sentinel 中查看传入的日志

若要验证日志数据是否到达工作区,请执行以下步骤:

  1. 验证消息是否发送到输出插件。

  2. 在Microsoft Sentinel导航菜单中,选择“日志”。 在“表格”标题下,展开“自定义日志”类别 。 查找并选择你在配置中指定的、带有 _CL 后缀的表名。

    Microsoft Sentinel“日志”页的截图,其中“自定义日志”类别已展开,并已选中 Logstash 自定义表。

  3. 若要查看表格中的记录,请将表格名称用作架构来查询表。

    Logstash 自定义日志查询的屏幕截图。

监视输出插件审核日志

若要监视 Microsoft Sentinel 输出插件的连接和活动,请启用相应的 Logstash 日志文件。 有关日志文件位置的信息,请参阅 Logstash 目录布局文档。

如果在此日志文件中看不到任何数据,请通过输入和筛选器插件在本地生成和发送一些事件,以确保输出插件正在接收数据。 Microsoft Sentinel仅支持与输出插件相关的问题。

网络安全

定义网络设置并为 Microsoft Sentinel Logstash 输出插件启用网络隔离。

虚拟网络服务标记

Microsoft Sentinel 输出插件支持 Azure 虚拟网络服务标记。 需要 AzureMonitorAzureActiveDirectory 标记。

Azure 虚拟网络服务标记可用于定义对网络安全组Azure 防火墙和用户定义的路由的网络访问控制。 创建安全规则和路由时,请使用服务标记而不是特定 IP 地址。 对于无法使用Azure 虚拟网络服务标记的情况,下面提供了防火墙要求。

防火墙要求

下表列出了无法使用 Azure 虚拟网络服务标记的场景的防火墙要求。

终结点 目的 端口 方向 绕过 HTTPS 检查
由世纪互联运营的 Microsoft Azure https://login.chinacloudapi.cn 授权服务器(Microsoft 标识平台) 端口 443 出站
由世纪互联运营的 Microsoft Azure 将上面的“.com”替换为“.cn” 数据收集终结点 端口 443 出站

插件版本历史记录

2.1.0

  • 修复了事件归一化问题。

2.0.0

  • 将插件从 Ruby 重构为 Java。
  • 添加了 ManagedIdentity 身份验证。
  • 已将基本代码从 GitHub 移动到 Azure DevOps。
  • 闭源代码库。

1.2.0

  • 新增对 Azure VM/VMSS 的托管标识身份验证支持(包括系统分配的标识以及通过 IMDS 分配的用户标识)。
  • 通过 OIDC 令牌交换新增对 AKS 工作负载身份的支持。
  • 为混合服务器和本地服务器添加了Azure Arc 托管标识支持。
  • 根据环境 (工作负载标识 env vars、Arc 代理或 IMDS 回退) ,在运行时自动检测身份验证方法。
  • 将 HTTP 客户端从 excon 迁移到 , rest-client 以提高 JRuby 和 Logstash 插件生态系统兼容性。
  • 将 Azure Active Directory 引用重命名为 Microsoft Entra ID。

1.1.4

  • excon 库版本限制为低于 1.0.0,以确保在使用代理时始终使用该端口。

1.1.3

  • rest-client 库替换用于连接 Azure 的 excon 库。

1.1.1

  • 新增对中国世纪互联运营的 Azure 美国政府 云和 Microsoft Azure 的支持。

1.1.0

  • 允许为 API 连接设置不同的代理值。
  • 将日志引入 API 的版本升级到 2023-01-01。
  • 将插件重命名为 'microsoft-sentinel-log-analytics-logstash-output-plugin'。

1.0.0

  • Microsoft Sentinel 的 Logstash 输出插件的初始版本。 此插件将数据收集规则 (DCR) 与 Azure Monitor 的日志引入 API 结合使用。

已知问题

使用安装在 Lite Ubuntu 的 Docker 映像上的 Logstash 时,可能会出现以下警告:

java.lang.RuntimeException: getprotobyname_r failed

若要解决此错误,请在 Dockerfile 中安装 netbase 包:

USER root
RUN apt install netbase -y

有关详细信息,请参阅 Logstash 7.17.0 中的 JNR 回归 (Docker)

如果环境的事件率较低,请将 plugin_flush_interval 的值增加到 60 或更高。 可以使用DCR 指标来监控数据摄取负载。 有关 plugin_flush_interval的详细信息,请参阅 可选配置 表。

限制

  • 引入到标准表仅限于支持自定义日志引入的标准表

  • streamDeclarations 属性中输入流的列必须以字母开头。 如果在列的开头使用其他字符(例如 @_),操作将会失败。

  • TimeGenerated 日期/时间字段是必填的。 必须在 KQL 转换中包含此字段。

  • 如果遇到其他问题,请查看教程中的故障排除部分