Ingest data from Logstash to Azure Data Explorer

Logstash is an open source, server-side data processing pipeline that ingests data from many sources simultaneously, transforms the data, and then sends the data to your favorite "stash". In this article, you'll send that data to Azure Data Explorer, a fast and highly scalable data exploration service for log and telemetry data. You'll first create a table and data mapping in a test cluster, then direct Logstash to send data into the table and validate the results.

Prerequisites

Create a table

After you have a cluster and a database, it's time to create a table.

  1. Run the following command in your database query window to create a table:

    .create table logs (timestamp: datetime, message: string)
    
  2. Run the following command to confirm that the new table logs has been created and that it's empty:

    logs
    | count
    

Create a mapping

Azure Data Explorer uses mapping to transform the incoming data into the target table schema. The following command creates a new mapping named basicmsg that extracts properties from the incoming JSON as noted by each path and outputs them to the corresponding column.

Run the following command in the query window:

.create table logs ingestion json mapping 'basicmsg' '[{"column":"timestamp","path":"$.@timestamp"},{"column":"message","path":"$.message"}]'
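Conceptually, the mapping behaves like a per-column JSON lookup. The following sketch (plain Python, with a hypothetical sample event) illustrates how basicmsg would pull values out of an incoming event; it's an illustration of the concept, not the service's actual implementation:

```python
import json

# Hypothetical incoming Logstash event (field names mirror the mapping's paths)
event = '{"@timestamp": "2024-01-01T00:00:00Z", "message": "Test Message 123", "host": "example"}'

# basicmsg: "$.@timestamp" -> timestamp column, "$.message" -> message column
mapping = {"timestamp": "@timestamp", "message": "message"}

doc = json.loads(event)
# Keep only the mapped properties; everything else in the event is ignored
row = {column: doc[path] for column, path in mapping.items()}
print(row)  # {'timestamp': '2024-01-01T00:00:00Z', 'message': 'Test Message 123'}
```

Fields not listed in the mapping (such as host above) never reach the table.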

Install the Logstash output plugin

The Logstash output plugin communicates with Azure Data Explorer and sends the data to the service. Run the following command inside the Logstash root directory to install the plugin:

bin/logstash-plugin install logstash-output-kusto

Configure Logstash to generate a sample dataset

Logstash can generate sample events that can be used to test an end-to-end pipeline. If you're already using Logstash and have access to your own event stream, skip to the next section.

Note

If you're using your own data, change the table and mapping objects defined in the previous steps.

  1. Edit a new text file that will contain the required pipeline settings (using vi):

    vi test.conf
    
  2. Paste the following settings, which tell Logstash to generate 1000 test events:

    input {
        stdin { }
        generator {
            message => "Test Message 123"
            count => 1000
        }
    }
    

This configuration also includes the stdin input plugin, which lets you write more messages yourself (be sure to press Enter to submit them into the pipeline).
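Each generated event carries the configured message plus a timestamp. The following sketch approximates the stream the generator input produces (an assumption for illustration; real Logstash events also carry extra metadata fields such as host and sequence):

```python
import json
from datetime import datetime, timezone

# Sketch of the generator input's output: `count` events, each with the
# configured message and an event timestamp (the two fields basicmsg maps).
def generate_events(message="Test Message 123", count=1000):
    for _ in range(count):
        yield json.dumps({
            "@timestamp": datetime.now(timezone.utc).isoformat(),
            "message": message,
        })

events = list(generate_events())
print(len(events))  # 1000
```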

Configure Logstash to send data to Azure Data Explorer

Paste the following settings into the same config file used in the previous step. Replace all the placeholders with the relevant values for your setup. For more information, see Creating an AAD Application.

output {
    kusto {
            path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm-ss}.txt"
            ingest_url => "https://ingest-<cluster name>.kusto.chinacloudapi.cn/"
            app_id => "<application id>"
            app_key => "<application key/secret>"
            app_tenant => "<tenant id>"
            database => "<database name>"
            table => "<target table>" # logs as defined above
            mapping => "<mapping name>" # basicmsg as defined above
    }
}
| Parameter name | Description |
| --- | --- |
| path | The Logstash plugin writes events to temporary files before sending them to Azure Data Explorer. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the Azure Data Explorer service. |
| ingest_url | The Kusto endpoint for ingestion-related communication. |
| app_id, app_key, app_tenant | Credentials required to connect to Azure Data Explorer. Be sure to use an application with ingest privileges. |
| database | Database name to place events. |
| table | Target table name to place events. |
| mapping | Mapping is used to map an incoming event JSON string into the correct row format (defines which property goes into which column). |

Run Logstash

You're now ready to run Logstash and test your settings.

  1. In the Logstash root directory, run the following command:

    bin/logstash -f test.conf
    

    You should see information printed to the screen, followed by the 1000 messages generated by the sample configuration. At this point, you can also enter more messages manually.

  2. After a few minutes, run the following Data Explorer query to see the messages in the table you defined:

    logs
    | order by timestamp desc
    
  3. Press Ctrl+C to exit Logstash.

Clean up resources

Run the following command in your database to clean up the logs table:

.drop table logs

Next steps