使用 “版本 ”下拉列表切换服务。 了解有关导航的详细信息。
适用于:✅ Azure Data Explorer
如果源数据涉及简单且快速的转换,请使用事件流在管道中上游执行这些转换。 但是,此方法可能不适用于复杂或需要专用功能才能运行的其他转换。
本教程介绍如何执行下列操作:
本教程中的示例演示了如何使用针对数据路由的更新策略来执行复杂的转换,以在引入时扩充、清理和转换数据。 有关其他常见用例的列表,请参阅表更新策略的常见用例。
先决条件
- Microsoft 帐户或 Microsoft Entra 用户标识。 不需要Azure订阅。
- Azure Data Explorer群集和数据库。 创建群集和数据库
步骤 1 - 创建表和更新策略
以下步骤将指导你创建源表、转换函数、目标表和更新策略。 本教程演示如何使用表更新策略执行复杂转换并将结果保存到一个或多个目标表。 该示例使用了一个名为“Raw_Table”的源表和三个分别名为“Device_Telemetry”、“Device_Alarms”和“Error_Log”的目标表。
运行以下命令以创建名为“Raw_Table”的表。
.create table Raw_Table (RawData: dynamic)源表是保存导入数据的位置。 该表有一个名为RawData的单列,其数据类型为动态。 动态类型按原样存储原始数据,没有任何架构。 有关详细信息,请参阅 .create table 命令。
运行以下命令,创建名为 Get_Telemetry、 Get_Alarms和 Log_Error的函数。
.execute database script <| .create-or-alter function Get_Telemetry() { Raw_Table | where todynamic(RawData).MessageType == 'Telemetry' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceType), SensorName = tostring(RawData.SensorName), SensorValue = toreal(RawData.SensorValue), SensorUnit = tostring(RawData.SensorUnit) | project-away RawData } .create-or-alter function Get_Alarms() { Raw_Table | where RawData.MessageType == 'Alarms' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceTpe) , AlarmType = tostring(RawData.AlarmType) | project-away RawData } .create-or-alter function Log_Error() { Raw_Table | where RawData.MessageType !in ('Telemetry', 'Alarms') | extend TimeStamp = datetime(now), ErrorType = 'Unknown MessageType' | project TimeStamp, RawData, ErrorType }创建更新策略时,可以指定用于执行的内联脚本。 但是,将转换逻辑封装到函数中。 使用函数可改进代码维护。 新数据到达时,函数将执行该函数来转换数据。 该函数可以跨多个更新策略重复使用。 有关详细信息,请参阅 .create function 命令。
运行以下命令以创建目标表。
.execute database script <| .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string) .set-or-append Device_Alarms <| Get_Alarms | take 0 .set-or-append Error_Log <| Log_Error | take 0目标表的架构必须与转换函数的输出具有相同的架构。 可以通过以下方式创建目标表:
- 请使用
.create table命令并手动指定架构,如创建Device_Telemetry表所示。 但是,此方法可能容易出错且耗时。 -
.set-or-append如果已创建函数来转换数据,请使用该命令。 此方法使用take 0与函数输出相同的架构创建一个新表,以确保该函数仅返回架构。 有关详细信息,请参阅 .set-or-append 命令。
- 请使用
运行以下命令,为目标表创建更新策略。
.execute database script <| .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]".alter table policy update使用命令链接源表、转换函数和目标表。 在目标表上创建更新策略,并指定源表和转换函数。 有关详细信息,请参阅 .alter table policy update 命令。
步骤 2 - 引入示例数据
若要测试更新策略,请使用 .set-or-append 命令将示例数据引入源表。 有关详细信息,请参阅从查询引入数据。
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
步骤 3 - 验证结果
若要验证结果,请运行查询以验证数据是否已转换并路由到目标表。 在以下示例中, union 运算符将目标表中的源和结果合并到单个结果集中。
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
输出
你应该会看到以下输出,其中 Raw_Table 有三行,每个目标表各有一行。
| TableName (表名称) | 行 |
|---|---|
| 原始表格 | 3 |
| 错误日志 | 1 |
| 设备报警 | 1 |
| 设备遥测 | 1 |
步骤 4 - 清理资源
在数据库中运行以下命令,清理在本教程中创建的表和函数。
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error