教程:使用表更新策略路由数据

当源数据涉及简单且快速的转换时,最好使用事件流在管道的上游执行这些转换。 但是,此方法可能不适用于复杂或需要专用功能才能运行的其他转换。

本教程介绍如何执行下列操作:

本教程中的示例演示了如何使用针对数据路由的更新策略来执行复杂的转换,以在引入时扩充、清理和转换数据。 有关其他常见用例的列表,请参阅表更新策略的常见用例

先决条件

  • Microsoft 帐户或 Microsoft Entra 用户标识。 无需 Azure 订阅。
  • Azure 数据资源管理器群集和数据库。 创建群集和数据库

1 - 创建表和更新策略

以下步骤将指导你创建源表、转换函数、目标表和更新策略。 本教程演示如何使用表更新策略执行复杂转换并将结果保存到一个或多个目标表。 该示例使用了一个名为“Raw_Table”的源表和三个分别名为“Device_Telemetry”、“Device_Alarms”和“Error_Log”的目标表

  1. 运行以下命令以创建名为“Raw_Table”的表。

    .create table Raw_Table (RawData: dynamic)
    

    该源表是保存引入的数据的位置。 该表具有一个名为“RawData”的动态类型的列。 动态类型用于按原样存储原始数据,而无需任何架构。 有关详细信息,请参阅 .create table 命令

  2. 运行以下命令,创建名为“Get_Telemetry”、“Get_Alarms”“Log_Error”的函数。

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    创建更新策略时,可以指定用于执行的内联脚本。 但是,我们建议将转换逻辑封装到函数中。 使用函数可改进代码维护。 新数据到达时,将执行函数来转换数据。 该函数可以跨多个更新策略重复使用。 有关详细信息,请参阅 .create function 命令

  3. 运行以下命令以创建目标表。

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    目标表的架构必须与转换函数的输出具有相同的架构。 可以通过以下方式创建目标表:

    • 使用 .create table 命令并手动指定架构,如创建“Device_Telemetry”表时所示。 但是,此方法可能容易出错且耗时。
    • 如果已创建用于转换数据的函数,请使用 .set-or-append 命令。 此方法将创建架构与函数输出相同的新表,使用 take 0 可确保该函数仅返回架构。 有关详细信息,请参阅 .set-or-append 命令
  4. 运行以下命令,为目标表创建更新策略

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    .alter table policy update 命令用于链接源表、转换函数和目标表。 更新策略将在目标表上创建,并指向源表和转换函数。 有关详细信息,请参阅 .alter table policy update 命令

2 - 引入示例数据

若要测试更新策略,可以使用 .set-or-append 命令将示例数据引入源表中。 有关详细信息,请参阅从查询引入数据

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 - 验证结果

若要验证结果,可以运行查询来验证数据是否已转换并路由到目标表。 在以下示例中,union 运算符用于将源表和目标表的结果合并到单个结果集中。

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

输出

你应该会看到以下输出,其中 Raw_Table 有三行,每个目标表各有一行。

TableName “行”
Raw_Table 3
Error_Log 1
Device_Alarms 1
Device_Telemetry 1

清理资源

在数据库中运行以下命令,清理在本教程中创建的表和函数。

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error