更新策略概述

适用于:✅Azure 数据资源管理器

更新策略是在将新数据写入表时触发的自动化机制。 更新策通过运行查询来转换引入的数据,无需特殊的业务流程,并将结果保存到目标表。 可以在单个表上定义多个更新策略,从而进行不同的转换并将数据同时保存到多个表。 目标表可以具有与源表不同的架构、保留策略和其他策略。

例如,高速率跟踪源表可以包含格式化为自由文本列的数据。 目标表可以包含特定类型的跟踪行,以及使用 parse 运算符转换源表的自由文本数据生成的结构良好的架构。 有关详细信息,请参阅常见方案

下图描绘了更新策略的概要视图。 它显示了在将数据添加到第二个源表时触发的两个更新策略。 触发后,转换的数据将添加到两个目标表中。

显示更新策略概述的图表。

更新策略遵循与常规引入相同的限制和最佳做法。 策略根据群集大小进行横向扩展,在处理批量引入时更加高效。

注意

  • 源表和目标表必须在同一数据库中。
  • 更新策略函数架构和目标表架构必须匹配其列类型和顺序。
  • 更新策略函数可以引用其他数据库中的表。 为此,必须使用 ManagedIdentity 属性定义更新策略,并且托管标识必须在引用的数据库中具有 viewer角色。 引入格式化的数据可以提高性能。由于 CSV 是定义明确的格式,因此首选这种格式。 但是,有时无法控制数据的格式,或者可能希望扩充引入的数据,例如,在数据库中将记录与静态维度表联接在一起。

更新策略查询

如果更新策略是在目标表上定义的,则可以对引入源表的数据运行多个查询。 如果有多个更新策略,则执行顺序不一定是已知的。

查询限制

  • 与策略相关的查询可以调用存储的函数,但:
    • 它无法执行跨群集查询。
    • 它无法访问外部数据或外部表。
    • 它无法进行标注(通过使用插件)。
  • 查询对启用了 RestrictedViewAccess 策略的表没有读取访问权限。
  • 有关流式引入中的更新策略限制,请参阅流式引入限制

警告

不正确的查询会阻止将数据引入源表。 请务必注意,这些限制以及查询结果与源表和目标表的架构之间的兼容性会导致不正确的查询,以阻止将数据引入源表。

这些限制在创建和执行策略期间会得到验证,但在更新查询可能引用的任意存储函数时不会得到验证。 因此,进行任何更改请务必谨慎,以确保更新策略保持不变。

在策略的 Source 部分中或由 Query 部分引用的函数中引用 Query 表时:

  • 不要使用表的限定名称。 请改用 TableName
  • 不要使用 database("<DatabaseName>").TableNamecluster("<ClusterName>").database("<DatabaseName>").TableName

更新策略对象

一个表可以具有零个或多个与之关联的更新策略对象。 每一个此类对象都表示为 JSON 属性包,并定义了以下属性。

properties 类型 说明
已启用 bool 说明是启用 (true) 还是禁用 (false) 了更新策略
string 触发调用更新策略的表的名称。
SourceIsWildCard bool 如果 为 true,则 Source 属性可以是通配符模式。 请参阅 源表通配符模式的更新策略
查询 string 用于生成更新数据的查询。
IsTransactional bool 说明更新策略是否为事务性的(默认为 false)。 如果策略是事务性策略且更新策略失败,则不会更新源表。
PropagateIngestionProperties bool 说明在引入源表期间指定的属性(例如分片标记和创建时间)是否适用于目标表。
托管标识 string 以其身份运行更新策略的托管标识。 托管标识可以是对象 ID,也可以是 system 保留字。 当查询引用其他数据库中的表,或启用了行级别安全策略的表时,必须使用托管标识配置更新策略。 有关详细信息,请参阅使用托管标识以运行更新策略

注意

在生产系统中,将 IsTransactional 设置为“true”可确保目标表不会在暂时性故障中丢失数据。

注意

允许级联更新,例如从表 A 到表 B 到表 C。但是,如果以循环方式定义更新策略,则在运行时会检测到此情况,并且会切断更新链。 对于链中的每个表,数据只会引入一次。

管理命令

更新策略管理命令包括:

更新策略在引入后启动

将数据引入或移动到源表,或在源表中创建扩展时,更新策略会生效。 这些操作可以通过以下任意命令完成:

警告

将更新策略作为 .set-or-replace 命令的一部分进行调用时,默认情况下,派生表中的数据也会被替换,就像在源表中一样。 如果调用 replace 命令,则具有更新策略关系的所有表中的数据可能会丢失。 请考虑改用 .set-or-append

使用源表通配符模式更新策略

更新策略支持从共享相同模式的多个源表引入,同时使用与更新策略查询相同的查询。 如果有多个源表,通常共享同一架构(或共享通用架构的列的子集),并且想要在引入到其中任一表时触发引入到单个目标表,这非常有用。 在这种情况下,可以定义具有通配符 Source的单个更新策略,而不是为单个源表定义多个更新策略。 Query 更新策略必须符合与模式匹配的所有源表。 若要在更新策略查询中引用源表,可以使用名为 <a0/> 的特殊符号。 请参阅 通配符更新策略示例中的示例。

从源表中删除数据

将数据引入目标表后,可以选择将其从源表中删除。 请在源表的0sec中将软删除时间段设置义为 00:00:00(或 ),并将更新策略设置为“事务性”。 下列条件适用:

  • 无法从源表查询源数据
  • 引入操作期间,源数据不会永久保存到持久性存储中
  • 操作性能提高。 在源表的分片上,为后台清理操作减少了引入后的资源。

注意

当源表的软删除期为 0sec(或 00:00:00)时,引用此表的任何更新策略都必须是事务性。

性能影响

更新策略可能会影响性能,并且数据盘区的引入将乘以目标表的数量。 优化与策略相关的查询非常重要。 可以在创建或更改策略之前或在查询中使用的函数上,调用现有分片上的策略来测试更新策略的性能影响。

评估资源使用情况

.show queries 与以下参数结合使用来评估资源使用情况(CPU、内存等):

  • Source 属性(源表名称)设置为 MySourceTable
  • Query 属性设置为调用名称为 MyFunction() 的函数
// '_extentId' is the ID of a recently created extent, that likely hasn't been merged yet.
let _extentId = toscalar(
    MySourceTable
    | project ExtentId = extent_id(), IngestionTime = ingestion_time()
    | where IngestionTime > ago(10m)
    | top 1 by IngestionTime desc
    | project ExtentId
);
// This scopes the source table to the single recent extent.
let MySourceTable =
    MySourceTable
    | where ingestion_time() > ago(10m) and extent_id() == _extentId;
// This invokes the function in the update policy (that internally references `MySourceTable`).
MyFunction

事务性设置

更新策略 IsTransactional 设置定义了更新策略是否是事务性策略,并且可影响策略更新的行为,如下所示:

  • IsTransactional:false:如果该值设置为默认值“false”,则更新策略不能保证源表和目标表中数据之间的一致性。 如果更新策略失败,则仅将数据引入源表,而不引入目标表。 在这种情况下,引入操作是成功的。
  • IsTransactional:true:如果该值设置为“true”,则此设置可以保证源表和目标表中的数据之间的一致性。 如果更新策略失败,则不会将数据引入源表或目标表。 在这种情况下,引入操作是不成功的。

处理故障

策略更新失败时,会根据 IsTransactional 设置是 true 还是 false 进行不同的处理。 更新策略失败的常见原因如下:

  • 查询输出架构与目标表不匹配。
  • 任何查询错误。

可以使用将 .show ingestion failures 命令与以下命令结合使用来查看策略更新失败:在任何其他情况下,都可以手动重试引入。

.show ingestion failures
| where FailedOn > ago(1hr) and OriginatesFromUpdatePolicy == true

提取、转换和加载的示例

可以使用“更新策略设置”执行提取、转换和加载 (ETL)。

在本示例中,我们将更新策略与简单函数结合使用来执行 ETL。 首先创建两个表:

  • 源表 - 包含引入数据的单个字符串类型列。
  • 目标表 - 包含所需的架构。 更新策略是在此表上定义的。
  1. 让我们创建源表:

    .create table MySourceTable (OriginalRecord:string)
    
  2. 接下来创建目标表:

    .create table MyTargetTable (Timestamp:datetime, ThreadId:int, ProcessId:int, TimeSinceStartup:timespan, Message:string)
    
  3. 然后创建一个函数来提取数据:

    .create function
     with (docstring = 'Parses raw records into strongly-typed columns', folder = 'UpdatePolicyFunctions')
         ExtractMyLogs()
        {
        MySourceTable
        | parse OriginalRecord with "[" Timestamp:datetime "] [ThreadId:" ThreadId:int "] [ProcessId:" ProcessId:int "] TimeSinceStartup: " TimeSinceStartup:timespan " Message: " Message:string
        | project-away OriginalRecord
    }
    
  4. 现在,将更新策略设置为调用创建的函数:

    .alter table MyTargetTable policy update
    @'[{ "IsEnabled": true, "Source": "MySourceTable", "Query": "ExtractMyLogs()", "IsTransactional": true, "PropagateIngestionProperties": false}]'
    
  5. 若要在将数据引入目标表后清空源表,请定义源表上的保留策略,使它将 0s 作为其 SoftDeletePeriod

     .alter-merge table MySourceTable policy retention softdelete = 0s
    

通配符更新策略示例

以下示例在表中 TargetTable创建一个包含单个条目的更新策略。 该策略引用所有匹配模式 SourceTable* 的表作为其源。 与模式(在本地数据库中)匹配的表的任何引入都将根据更新策略查询触发更新策略,并将数据引入到 TargetTable该表。

  1. 创建两个源表:

    .create table SourceTable1(Id:long, Value:string)
    
    .create table SourceTable2(Id:long, Value:string)
    
  2. 创建目标表:

    .create table TargetTable(Id:long, Value:string, Source:string)
    
  3. 创建将用作 Query 更新策略的函数。 该函数使用 $source_table 符号来引用 Source 更新策略。 用于 skipValidation=true 在创建函数期间跳过验证,因为在 $source_table 更新策略执行期间才知道。 更改更新策略时,在下一步中验证该函数。

    .create function with(skipValidation=true) IngestToTarget()
    {
        $source_table 
        | parse Value with "I'm from table " Source
        | project Id, Value, Source
    }
    
  4. 创建更新策略。TargetTable 该策略引用所有匹配模式 SourceTable* 的表作为其源。

        .alter table TargetTable policy update
        ```[{ 
                "IsEnabled": true, 
                "Source": "SourceTable*", 
                "SourceIsWildCard" : true,
                "Query": "IngestToTarget()",
                "IsTransactional": true,
                "PropagateIngestionProperties": true
        }]```
    
    
  5. 引入源表。 这两个引入都触发更新策略:

    .set-or-append SourceTable1 <| 
        datatable (Id:long, Value:string)
        [
            1, "I'm from table SourceTable1",
            2, "I'm from table SourceTable1"
        ]
    
    .set-or-append SourceTable2 <| 
        datatable (Id:long, Value:string)
        [
            3, "I'm from table SourceTable2",
            4, "I'm from table SourceTable2"
        ]
    
  6. 查询 TargetTable

     TargetTable
    
    Id 价值
    1 我来自表 SourceTable1 SourceTable1
    2 我来自表 SourceTable1 SourceTable1
    3 我来自表 SourceTable2 SourceTable2
    4 我来自表 SourceTable2 SourceTable2