将数据从 Telegraf 引入到 Azure 数据资源管理器

重要

此连接器可用于 Microsoft Fabric 中的实时智能。 使用本文中的说明时,请注意以下例外情况:

Azure 数据资源管理器支持从 Telegraf 进行数据引入。 Telegraf 是一种开源、轻型、内存占用极小的代理,用于收集、处理和写入遥测数据,包括日志、指标和 IoT 数据。 Telegraf 支持数百个输入和输出插件。 它得到了开源社区的广泛使用和大力支持。 Azure 数据资源管理器输出插件充当 Telegraf 的连接器,并支持将数据从许多类型的输入插件引入 Azure 数据资源管理器。

先决条件

  • Azure 订阅。 创建 Azure 帐户
  • Azure 数据资源管理器群集和数据库。 创建群集和数据库
  • Telegraf。 在虚拟机 (VM) 或容器中托管 Telegraf。 Telegraf 可以在部署受监视应用或服务的本地托管,也可以在专用监视计算/容器上远程托管。

支持的身份验证方法

插件支持以下身份验证方法:

  • 具有应用密钥或证书的 Microsoft Entra 应用程序。

  • Microsoft Entra 用户令牌

    • 允许插件像用户一样进行身份验证。 我们仅建议使用此方法进行开发。
  • Azure 托管服务标识 (MSI) 令牌

    • 如果在支持的 Azure 环境(例如 Azure 虚拟机)中运行 Telegraf,则这是首选的身份验证方法。

无论使用哪种方法,都必须在 Azure 数据资源管理器中为指定主体分配“数据库用户”角色。 此角色允许插件创建引入数据所需的表。 如果插件已配置 create_tables=false,则指定的主体必须至少具有“数据库引入者”角色。

配置身份验证方法

插件检查环境变量的特定配置,以确定要使用的身份验证方法。 按指定顺序评估配置,并使用检测到的第一个配置。 如果未检测到有效的配置,插件将无法进行身份验证。

若要为插件配置身份验证,请为所选的身份验证方法设置适当的环境变量:

  • 客户端凭据(Microsoft Entra 应用程序令牌):Microsoft Entra 应用程序 ID 和机密。

    • AZURE_TENANT_ID:用于身份验证的 Microsoft Entra 租户 ID。
    • AZURE_CLIENT_ID:租户中应用注册的客户端 ID。
    • AZURE_CLIENT_SECRET:专为应用注册生成的客户端密码。
  • 客户端证书(Microsoft Entra 应用程序令牌):Microsoft Entra 应用程序 ID 和 X.509 证书。

    • AZURE_TENANT_ID:用于身份验证的 Microsoft Entra 租户 ID。
    • AZURE_CERTIFICATE_PATH:PEM 或 PFX 格式的证书和私钥对路径,可对应用注册进行身份验证。
    • AZURE_CERTIFICATE_PASSWORD:为证书设置的密码。
  • 资源所有者密码(Microsoft Entra 用户令牌):Microsoft Entra 用户和密码。 我们不建议使用此授权类型。 如果需要交互式登录,请使用设备登录。

    • AZURE_TENANT_ID:用于身份验证的 Microsoft Entra 租户 ID。
    • AZURE_CLIENT_ID:租户中应用注册的客户端 ID。
    • AZURE_USERNAME:Microsoft Entra 用户帐户的用户名(也称为 upn)。
    • AZURE_PASSWORD:Microsoft Entra 用户帐户的密码。 请注意,这不支持已启用 MFA 的帐户。
  • Azure 托管服务标识:将凭据管理委托给平台。 此方法要求在 Azure 中(例如 VM)运行代码。 所有配置均由 Azure 处理。 此方法仅在使用 Azure 资源管理器时才适用。

配置 Telegraf

Telergraf 是一个配置驱动的代理。 若要开始,必须安装 Telegraf 并配置所需的输入和输出插件。 配置文件的默认位置如下所示:

  • 在 Windows 中:C:\Program Files\Telegraf\telegraf.conf
  • 在 Linux 中:etc/telegraf/telegraf.conf

若要启用 Azure 数据资源管理器输出插件,必须在自动生成的配置文件中取消注释以下部分:

[[outputs.azure_data_explorer]]
  ## The URI property of the Azure Data Explorer resource on Azure
  ## ex: https://myadxresource.chinaeast2.kusto.chinacloudapi.cn
  # endpoint_url = ""

  ## The Azure Data Explorer database that the metrics will be ingested into.
  ## The plugin will NOT generate this database automatically, it's expected that this database already exists before ingestion.
  ## ex: "exampledatabase"
  # database = ""

  ## Timeout for Azure Data Explorer operations, default value is 20 seconds
  # timeout = "20s"

  ## Type of metrics grouping used when ingesting to Azure Data Explorer
  ## Default value is "TablePerMetric" which means there will be one table for each metric
  # metrics_grouping_type = "TablePerMetric"

  ## Name of the single table to store all the metrics (Only needed if metrics_grouping_type is "SingleTable").
  # table_name = ""

  ## Creates tables and relevant mapping if set to true(default).
  ## Skips table and mapping creation if set to false, this is useful for running telegraf with the least possible access permissions i.e. table ingestor role.
  # create_tables = true

支持的引入类型

该插件支持托管(流式)和排队(批量)引入。 默认引入类型为排队。

重要

要使用托管引入,必须在群集上启用流式引入

要配置插件的引入类型,请修改自动生成的配置文件,如下所示:

  ##  Ingestion method to use.
  ##  Available options are
  ##    - managed  --  streaming ingestion with fallback to batched ingestion or the "queued" method below
  ##    - queued   --  queue up metrics data and process sequentially
  # ingestion_type = "queued"

查询引入数据

下面举例说明了使用 SQL 和 syslog 输入插件以及 Azure 数据资源管理器输出插件收集的数据。 对于每个输入方法,有一个示例演示了如何在 Azure 数据资源管理器中使用数据转换和查询。

SQL 输入插件

下表展示了 SQL 输入插件收集的示例指标数据:

name 标记 timestamp fields
sqlserver_database_io {"database_name":"azure-sql-db2","file_type":"DATA","host":"adx-vm","logical_filename":"tempdev","measurement_db_type":"AzureSQLDB","physical_filename":"tempdb.mdf","replica_updateability":"READ_WRITE","sql_instance":"adx-sql-server"} 2021-09-09T13:51:20Z {"current_size_mb":16,"database_id":2,"file_id":1,"read_bytes":2965504,"read_latency_ms":68,"reads":47,"rg_read_stall_ms":42,"rg_write_stall_ms":0,"space_used_mb":0,"write_bytes":1220608,"write_latency_ms":103,"writes":149}
sqlserver_waitstats {"database_name":"azure-sql-db2","host":"adx-vm","measurement_db_type":"AzureSQLDB","replica_updateability":"READ_WRITE","sql_instance":"adx-sql-server","wait_category":"Worker Thread","wait_type":"THREADPOOL"} 2021-09-09T13:51:20Z {"max_wait_time_ms":15,"resource_wait_ms":4469,"signal_wait_time_ms":0,"wait_time_ms":4469,"waiting_tasks_count":1464}

由于收集的指标对象是一种复杂类型,因此“字段”和“标记”列存储为动态数据类型。 可通过多种方式查询此数据,例如:

  • 直接查询 JSON 属性:可以采用原始格式查询 JSON 数据,而无需对其进行分析。

    示例 1

    Tablename
    | where name == "sqlserver_azure_db_resource_stats" and todouble(fields.avg_cpu_percent) > 7
    

    示例 2

    Tablename
    | distinct tostring(tags.database_name)
    

    注意

    使用大量数据时,此方法可能会影响性能。 在这种情况下,请使用更新策略方法。

  • 使用更新策略:使用更新策略转换动态数据类型列。 建议使用此方法查询大量数据。

    // Function to transform data
    .create-or-alter function Transform_TargetTableName() {
      SourceTableName
      | mv-apply fields on (extend key = tostring(bag_keys(fields)[0]))
      | project fieldname=key, value=todouble(fields[key]), name, tags, timestamp
    }
    
    // Create destination table with above query's results schema (if it doesn't exist already)
    .set-or-append TargetTableName <| Transform_TargetTableName() | take 0
    
    // Apply update policy on destination table
    .alter table TargetTableName policy update
    @'[{"IsEnabled": true, "Source": "SourceTableName", "Query": "Transform_TargetTableName()", "IsTransactional": true, "PropagateIngestionProperties": false}]'
    

Syslog 输入插件

下表展示了 Syslog 输入插件收集的示例指标数据:

name 标记 timestamp fields
syslog {"appname":"azsecmond","facility":"user","host":"adx-linux-vm","hostname":"adx-linux-vm","severity":"info"} 2021-09-20T14:36:44Z {"facility_code":1,"message":" 2021/09/20 14:36:44.890110 Failed to connect to mdsd: dial unix /var/run/mdsd/default_djson.socket: connect: no such file or directory","procid":"2184","severity_code":6,"timestamp":"1632148604890477000","version":1}
syslog {"appname":"CRON","facility":"authpriv","host":"adx-linux-vm","hostname":"adx-linux-vm","severity":"info"} 2021-09-20T14:37:01Z {"facility_code":10,"message":" pam_unix(cron:session): session opened for user root by (uid=0)","procid":"26446","severity_code":6,"timestamp":"1632148621120781000","version":1}

可通过多种方式使用 extend 运算符或 bag_unpack() 插件平展动态列。 在更新策略 Transform_TargetTableName() 函数中可以使用其中任一方式。

  • 使用 extend 运算符:建议使用此方法,因为它更快且可靠。 即使架构发生更改,也不会中断查询或仪表板。

    Tablename
    | extend facility_code=toint(fields.facility_code), message=tostring(fields.message), procid= tolong(fields.procid), severity_code=toint(fields.severity_code),
    SysLogTimestamp=unixtime_nanoseconds_todatetime(tolong(fields.timestamp)), version= todouble(fields.version),
    appname= tostring(tags.appname), facility= tostring(tags.facility),host= tostring(tags.host), hostname=tostring(tags.hostname), severity=tostring(tags.severity)
    | project-away fields, tags
    
  • 使用 bag_unpack() 插件:此方法会将动态类型列解压缩。 更改源架构可能会在动态扩展列时导致问题。

    Tablename
    | evaluate bag_unpack(tags, columnsConflict='replace_source')
    | evaluate bag_unpack(fields, columnsConflict='replace_source')