更新策略概述

使用将数据添加到源表的命令触发更新策略时,数据还会追加到目标表。 目标表可以具有与源表不同的架构、保留策略和其他策略。 例如,高速率跟踪源表可以包含格式化为自由文本列的数据。 目标表可以包含特定类型的跟踪行,以及使用 parse 运算符转换源表的自由文本数据生成的结构良好的架构。

Diagram shows an overview of the update policy.

更新策略遵循与常规引入相同的限制和最佳做法。 策略根据群集大小进行横向扩展,在处理批量引入时更加高效。

注意

源表和目标表必须在同一数据库中。 更新策略函数架构和目标表架构必须具有匹配的列名、类型和顺序。

引入格式化的数据可以提高性能。由于 CSV 是定义明确的格式,因此首选这种格式。 但是,有时无法控制数据的格式,或者可能希望扩充引入的数据,例如,在数据库中将记录与静态维度表联接在一起。

更新策略查询

如果更新策略是在目标表上定义的,则可以对引入源表的数据运行多个查询。 如果有多个更新策略,则执行顺序不一定是已知的。

查询限制

  • 与策略相关的查询可以调用存储的函数,但:
    • 它无法执行跨群集查询。
    • 它无法访问外部数据或外部表。
    • 它无法进行标注(通过使用插件)。
  • 查询对启用了 RestrictedViewAccess 策略的表没有读取访问权限。
  • 有关流式引入中的更新策略限制,请参阅流式引入限制

警告

不正确的查询可能会阻止将数据引入源表。

上述限制以及查询结果与源表和目标表的架构之间的“兼容性”可能会导致不正确的查询,以阻止将数据引入源表。

这些限制在创建和执行策略时会得到验证,但在更新查询可能引用的任意存储函数时不会得到验证。 因此,在进行此类更改时,必须注意保持更新策略的完整性。

在策略的 Query 部分中或由 Query 部分引用的函数中引用 Source 表时:

  • 不要使用表的限定名称。 请改用 TableName
  • 不要使用 database("DatabaseName").TableNamecluster("ClusterName").database("DatabaseName").TableName

更新策略对象

一个表可能具有零个或多个与之关联的更新策略对象。 每一个此类对象都表示为 JSON 属性包,并定义了以下属性。

properties 类型​​ 说明
IsEnabled bool 说明是启用 (true) 还是禁用 (false) 了更新策略
string 触发更新策略调用的表的名称
查询 string 用于生成更新数据的查询
IsTransactional bool 说明更新策略是否为事务性的(默认为 false)。 如果事务性和更新策略失败,则不会更新源表。
PropagateIngestionProperties bool 说明在引入源表期间指定的属性(例如分片标记和创建时间)是否适用于目标表。
托管标识 string 将以其身份运行更新策略的托管标识。 托管标识可以是对象 ID,也可以是 system 保留字。 当查询引用其他数据库中的表,或启用了行级别安全策略的表时,必须使用托管标识配置更新策略。 有关详细信息,请参阅使用托管标识以运行更新策略

注意

在生产系统中,将 IsTransactional 设置为“true”可确保目标表不会在暂时性故障中丢失数据。

注意

允许级联更新,例如从表 A 到表 B 到表 C。但是,如果以循环方式定义更新策略,则在运行时会检测到此情况,并且会切断更新链。 对于链中的每个表,数据只会引入一次。

管理命令

更新策略管理命令包括:

更新策略在引入后启动

使用以下任意命令将数据引入或移动到源表(或在原表中创建的分片)时,更新策略会生效:

警告

将更新策略作为 .set-or-replace 命令的一部分进行调用时,默认情况下,派生表中的数据也会被替换,就像在源表中一样。 如果调用 replace 命令,则具有更新策略关系的所有表中的数据可能会丢失。 请考虑改用 .set-or-append

从源表中删除数据

将数据引入目标表后,可能需要将其从源表中删除。 请在源表的保留策略中将软删除时间段设置义为 0sec(或 00:00:00),并将更新策略设置为“事务性”。 下列条件适用:

  • 无法从源表查询源数据
  • 引入操作期间,源数据不会永久保存到持久性存储中
  • 操作性能提高。 在源表的分片上,为后台清理操作减少了引入后的资源。

注意

当源表的软删除期为 0sec(或 00:00:00)时,引用此表的任何更新策略都必须是事务性。

性能影响

更新策略可能会影响群集性能,并且数据分片的引入将乘以目标表的数量。 优化与策略相关的查询非常重要。 可以在创建或更改策略之前或在查询中使用的函数上,调用现有分片上的策略来测试更新策略的性能影响。

评估资源使用情况

.show queries 与以下参数结合使用来评估资源使用情况(CPU、内存等):

  • Source 属性(源表名称)设置为 MySourceTable
  • Query 属性设置为调用名称为 MyFunction() 的函数
// '_extentId' is the ID of a recently created extent, that likely hasn't been merged yet.
let _extentId = toscalar(
    MySourceTable
    | project ExtentId = extent_id(), IngestionTime = ingestion_time()
    | where IngestionTime > ago(10m)
    | top 1 by IngestionTime desc
    | project ExtentId
);
// This scopes the source table to the single recent extent.
let MySourceTable = 
    MySourceTable
    | where ingestion_time() > ago(10m) and extent_id() == _extentId;
// This invokes the function in the update policy (that internally references `MySourceTable`).
MyFunction

失败数

使用默认的 IsTransactional:false 设置,即使策略未运行,仍可以将数据引入源表。

设置 IsTransactional:true 可确保源表和目标表之间的数据的一致性。 但是,如果策略条件失败,数据不会引入源表。 或者,根据条件,有时数据会引入源表,但不会引入目标表。 但是,如果策略定义不正确,或者架构不匹配,数据不会引入源表或目标表。 例如,从目标表中删除列可能会导致查询输出架构与目标表之间不匹配。

可以使用 .show ingestion failures 命令来查看故障。

.show ingestion failures 
| where FailedOn > ago(1hr) and OriginatesFromUpdatePolicy == true

故障处理

非事务性策略

设置为 IsTransactional:false 时,将忽略运行策略的任何故障。 不会自动重试引入操作。 可以手动重试引入操作。

事务性策略

设置为 IsTransactional:true 时,如果引入方法为 pull,则涉及到数据管理服务,并且会根据以下条件自动重试引入操作:

  • 将执行重试,直到满足以下可配置限制设置之一:DataImporterMaximumRetryPeriodDataImporterMaximumRetryAttempts
  • 默认情况下,DataImporterMaximumRetryPeriod 设置为 2 天,并且 DataImporterMaximumRetryAttempts 为 10
  • 回退时间段从 2 分钟开始并将翻倍。 因此,等待时间从 2 分钟开始,然后增加到 4 分钟、8 分钟、16 分钟,以此类推。

在任何其他情况下,可以手动重试引入操作。

提取、转换和加载的示例

可以使用“更新策略设置”执行提取、转换和加载 (ETL)。

在本示例中,我们将更新策略与简单函数结合使用来执行 ETL。 首先创建两个表:

  • 源表 - 包含引入数据的单个字符串类型列。
  • 目标表 - 包含所需的架构。 更新策略是在此表上定义的。
  1. 让我们创建源表:

    .create table MySourceTable (OriginalRecord:string) 
    
  2. 接下来创建目标表:

    .create table MyTargetTable (Timestamp:datetime, ThreadId:int, ProcessId:int, TimeSinceStartup:timespan, Message:string)
    
  3. 然后创建一个函数来提取数据:

    .create function
     with (docstring = 'Parses raw records into strongly-typed columns', folder = 'UpdatePolicyFunctions')
         ExtractMyLogs()  
        {
        MySourceTable
        | parse OriginalRecord with "[" Timestamp:datetime "] [ThreadId:" ThreadId:int "] [ProcessId:" ProcessId:int "] TimeSinceStartup: " TimeSinceStartup:timespan " Message: " Message:string
        | project-away OriginalRecord
    }
    
  4. 现在,将更新策略设置为调用创建的函数:

    .alter table MyTargetTable policy update 
    @'[{ "IsEnabled": true, "Source": "MySourceTable", "Query": "ExtractMyLogs()", "IsTransactional": true, "PropagateIngestionProperties": false}]'
    
  5. 若要在将数据引入目标表后清空源表,请定义源表上的保留策略,使它将 0s 作为其 SoftDeletePeriod

     .alter-merge table MySourceTable policy retention softdelete = 0s