将数据从 Telegraf 引入到 Azure 数据资源管理器
重要
此连接器可用于 Microsoft Fabric 中的实时智能。 使用本文中的说明时,请注意以下例外情况:
- 如果需要,请按照创建 KQL 数据库中的说明创建数据库。
- 如果需要,请按照创建空表中的说明创建表。
- 使用复制 URI 中的说明获取查询或引入 URI。
- 运行 KQL 查询集中的查询。
Azure 数据资源管理器支持从 Telegraf 进行数据引入。 Telegraf 是一种开源、轻型、内存占用极小的代理,用于收集、处理和写入遥测数据,包括日志、指标和 IoT 数据。 Telegraf 支持数百个输入和输出插件。 它得到了开源社区的广泛使用和大力支持。 Azure 数据资源管理器输出插件充当 Telegraf 的连接器,并支持将数据从许多类型的输入插件引入 Azure 数据资源管理器。
先决条件
- Azure 订阅。 创建 Azure 帐户。
- Azure 数据资源管理器群集和数据库。 创建群集和数据库。
- Telegraf。 在虚拟机 (VM) 或容器中托管 Telegraf。 Telegraf 可以在部署受监视应用或服务的本地托管,也可以在专用监视计算/容器上远程托管。
支持的身份验证方法
插件支持以下身份验证方法:
具有应用密钥或证书的 Microsoft Entra 应用程序。
- 有关如何在 Microsoft Entra ID 中创建和注册应用的信息,请参阅注册应用程序。
- 有关服务主体的信息,请参阅 Microsoft Entra ID 中的应用程序和服务主体对象。
Microsoft Entra 用户令牌
- 允许插件像用户一样进行身份验证。 我们仅建议使用此方法进行开发。
Azure 托管服务标识 (MSI) 令牌
- 如果在支持的 Azure 环境(例如 Azure 虚拟机)中运行 Telegraf,则这是首选的身份验证方法。
无论使用哪种方法,都必须在 Azure 数据资源管理器中为指定主体分配“数据库用户”角色。 此角色允许插件创建引入数据所需的表。 如果插件已配置 create_tables=false
,则指定的主体必须至少具有“数据库引入者”角色。
配置身份验证方法
插件检查环境变量的特定配置,以确定要使用的身份验证方法。 按指定顺序评估配置,并使用检测到的第一个配置。 如果未检测到有效的配置,插件将无法进行身份验证。
若要为插件配置身份验证,请为所选的身份验证方法设置适当的环境变量:
客户端凭据(Microsoft Entra 应用程序令牌):Microsoft Entra 应用程序 ID 和机密。
AZURE_TENANT_ID
:用于身份验证的 Microsoft Entra 租户 ID。AZURE_CLIENT_ID
:租户中应用注册的客户端 ID。AZURE_CLIENT_SECRET
:专为应用注册生成的客户端密码。
客户端证书(Microsoft Entra 应用程序令牌):Microsoft Entra 应用程序 ID 和 X.509 证书。
AZURE_TENANT_ID
:用于身份验证的 Microsoft Entra 租户 ID。AZURE_CERTIFICATE_PATH
:PEM 或 PFX 格式的证书和私钥对路径,可对应用注册进行身份验证。AZURE_CERTIFICATE_PASSWORD
:为证书设置的密码。
资源所有者密码(Microsoft Entra 用户令牌):Microsoft Entra 用户和密码。 我们不建议使用此授权类型。 如果需要交互式登录,请使用设备登录。
AZURE_TENANT_ID
:用于身份验证的 Microsoft Entra 租户 ID。AZURE_CLIENT_ID
:租户中应用注册的客户端 ID。AZURE_USERNAME
:Microsoft Entra 用户帐户的用户名(也称为 upn)。AZURE_PASSWORD
:Microsoft Entra 用户帐户的密码。 请注意,这不支持已启用 MFA 的帐户。
Azure 托管服务标识:将凭据管理委托给平台。 此方法要求在 Azure 中(例如 VM)运行代码。 所有配置均由 Azure 处理。 此方法仅在使用 Azure 资源管理器时才适用。
配置 Telegraf
Telergraf 是一个配置驱动的代理。 若要开始,必须安装 Telegraf 并配置所需的输入和输出插件。 配置文件的默认位置如下所示:
- 在 Windows 中:C:\Program Files\Telegraf\telegraf.conf
- 在 Linux 中:etc/telegraf/telegraf.conf
若要启用 Azure 数据资源管理器输出插件,必须在自动生成的配置文件中取消注释以下部分:
[[outputs.azure_data_explorer]]
## The URI property of the Azure Data Explorer resource on Azure
## ex: https://myadxresource.chinaeast2.kusto.chinacloudapi.cn
# endpoint_url = ""
## The Azure Data Explorer database that the metrics will be ingested into.
## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion.
## ex: "exampledatabase"
# database = ""
## Timeout for Azure Data Explorer operations, default value is 20 seconds
# timeout = "20s"
## Type of metrics grouping used when ingesting to Azure Data Explorer
## Default value is "TablePerMetric" which means there will be one table for each metric
# metrics_grouping_type = "TablePerMetric"
## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
# table_name = ""
## Creates tables and relevant mapping if set to true(default).
## Skips table and mapping creation if set to false, this is useful for running telegraf with the least possible access permissions i.e. table ingestor role.
# create_tables = true
支持的引入类型
该插件支持托管(流式)和排队(批量)引入。 默认引入类型为排队。
重要
要使用托管引入,必须在群集上启用流式引入。
要配置插件的引入类型,请修改自动生成的配置文件,如下所示:
## Ingestion method to use.
## Available options are
## - managed -- streaming ingestion with fallback to batched ingestion or the "queued" method below
## - queued -- queue up metrics data and process sequentially
# ingestion_type = "queued"
查询引入数据
下面举例说明了使用 SQL 和 syslog 输入插件以及 Azure 数据资源管理器输出插件收集的数据。 对于每个输入方法,有一个示例演示了如何在 Azure 数据资源管理器中使用数据转换和查询。
SQL 输入插件
下表展示了 SQL 输入插件收集的示例指标数据:
name | 标记 | timestamp | fields |
---|---|---|---|
sqlserver_database_io | {"database_name":"azure-sql-db2","file_type":"DATA","host":"adx-vm","logical_filename":"tempdev","measurement_db_type":"AzureSQLDB","physical_filename":"tempdb.mdf","replica_updateability":"READ_WRITE","sql_instance":"adx-sql-server"} | 2021-09-09T13:51:20Z | {"current_size_mb":16,"database_id":2,"file_id":1,"read_bytes":2965504,"read_latency_ms":68,"reads":47,"rg_read_stall_ms":42,"rg_write_stall_ms":0,"space_used_mb":0,"write_bytes":1220608,"write_latency_ms":103,"writes":149} |
sqlserver_waitstats | {"database_name":"azure-sql-db2","host":"adx-vm","measurement_db_type":"AzureSQLDB","replica_updateability":"READ_WRITE","sql_instance":"adx-sql-server","wait_category":"Worker Thread","wait_type":"THREADPOOL"} | 2021-09-09T13:51:20Z | {"max_wait_time_ms":15,"resource_wait_ms":4469,"signal_wait_time_ms":0,"wait_time_ms":4469,"waiting_tasks_count":1464} |
由于收集的指标对象是一种复杂类型,因此“字段”和“标记”列存储为动态数据类型。 可通过多种方式查询此数据,例如:
直接查询 JSON 属性:可以采用原始格式查询 JSON 数据,而无需对其进行分析。
示例 1
Tablename | where name == "sqlserver_azure_db_resource_stats" and todouble(fields.avg_cpu_percent) > 7
示例 2
Tablename | distinct tostring(tags.database_name)
注意
使用大量数据时,此方法可能会影响性能。 在这种情况下,请使用更新策略方法。
使用更新策略:使用更新策略转换动态数据类型列。 建议使用此方法查询大量数据。
// Function to transform data .create-or-alter function Transform_TargetTableName() { SourceTableName | mv-apply fields on (extend key = tostring(bag_keys(fields)[0])) | project fieldname=key, value=todouble(fields[key]), name, tags, timestamp } // Create destination table with above query's results schema (if it doesn't exist already) .set-or-append TargetTableName <| Transform_TargetTableName() | take 0 // Apply update policy on destination table .alter table TargetTableName policy update @'[{"IsEnabled": true, "Source": "SourceTableName", "Query": "Transform_TargetTableName()", "IsTransactional": true, "PropagateIngestionProperties": false}]'
Syslog 输入插件
下表展示了 Syslog 输入插件收集的示例指标数据:
name | 标记 | timestamp | fields |
---|---|---|---|
syslog | {"appname":"azsecmond","facility":"user","host":"adx-linux-vm","hostname":"adx-linux-vm","severity":"info"} | 2021-09-20T14:36:44Z | {"facility_code":1,"message":" 2021/09/20 14:36:44.890110 Failed to connect to mdsd: dial unix /var/run/mdsd/default_djson.socket: connect: no such file or directory","procid":"2184","severity_code":6,"timestamp":"1632148604890477000","version":1} |
syslog | {"appname":"CRON","facility":"authpriv","host":"adx-linux-vm","hostname":"adx-linux-vm","severity":"info"} | 2021-09-20T14:37:01Z | {"facility_code":10,"message":" pam_unix(cron:session): session opened for user root by (uid=0)","procid":"26446","severity_code":6,"timestamp":"1632148621120781000","version":1} |
可通过多种方式使用 extend 运算符或 bag_unpack() 插件平展动态列。 在更新策略 Transform_TargetTableName() 函数中可以使用其中任一方式。
使用 extend 运算符:建议使用此方法,因为它更快且可靠。 即使架构发生更改,也不会中断查询或仪表板。
Tablename | extend facility_code=toint(fields.facility_code), message=tostring(fields.message), procid= tolong(fields.procid), severity_code=toint(fields.severity_code), SysLogTimestamp=unixtime_nanoseconds_todatetime(tolong(fields.timestamp)), version= todouble(fields.version), appname= tostring(tags.appname), facility= tostring(tags.facility),host= tostring(tags.host), hostname=tostring(tags.hostname), severity=tostring(tags.severity) | project-away fields, tags
使用 bag_unpack() 插件:此方法会将动态类型列解压缩。 更改源架构可能会在动态扩展列时导致问题。
Tablename | evaluate bag_unpack(tags, columnsConflict='replace_source') | evaluate bag_unpack(fields, columnsConflict='replace_source')