Compartilhar via

Azure 流分析的输出

Azure 流分析作业由输入、查询和输出构成。 可以将转换后的数据发送到多个输出类型。 本文列出了支持的流分析输出。 设计流分析查询时,请使用 INTO 子句引用输出的名称。 可以为每个作业使用单个输出,或者通过向查询添加多个 INTO 子句,为每个流式作业使用多个输出。

若要创建、编辑和测试流分析作业输出,可以使用 Azure 门户Azure PowerShell.NET APIREST API

某些输出类型支持 分区 ,如下表所示。

所有输出都支持批处理,但只有一些支持显式设置输出批大小。 有关详细信息,请参阅 输出批大小 部分。

输出类型 Partitioning 安全性
Azure Functions 是的 访问密钥
Blob 存储和 Azure Data Lake Gen 2 是的 访问密钥,
托管标识
Azure Data Lake Storage Gen 2 是的 Microsoft Entra 用户
托管标识
Azure 事件中心 是的,需要在输出配置中设置分区键列。 访问密钥,
托管标识
Kafka (预览版) 是的,需要在输出配置中设置分区键列。 访问密钥,
托管标识
Azure Database for PostgreSQL 是的 用户名和密码身份验证
Azure 服务总线队列 是的 访问密钥,
托管标识
Azure 服务总线主题 是的 访问密钥,
托管标识
Azure SQL 数据库 是,可选。 SQL 用户身份验证、
托管标识

重要

Azure 流分析按设计使用“插入”或“替换 API”。 此作将替换现有实体,或者插入新实体(如果表中不存在)。

Partitioning

流分析支持除 Power BI 之外的所有输出的分区。 有关分区键和输出编写器数的详细信息,请参阅文章以了解感兴趣的特定输出类型。 上一部分中链接了与输出类型相关的文章。

若要对分区进行更高级的优化,可以在查询中使用 INTO <partition count> (see INTO) 子句来控制输出编写器的数量。 此控件可帮助你实现所需的作业拓扑。 如果未对输出适配器进行分区,则一个输入分区中缺少数据会导致延迟,最长可达延迟到达时间。 在这种情况下,输出将合并到单个写入器,这可能会导致管道中出现瓶颈。 若要详细了解延迟到达策略,请参阅 Azure 流分析事件顺序注意事项

输出批大小

所有输出都支持批处理,但仅某些支持显式设置批大小。 Azure 流分析使用可变大小的批处理来处理事件并写入输出。 通常,流分析引擎不会一次写入一条消息,并使用批处理提高效率。 当传入和传出事件的速率较高时,流分析使用较大的批处理。 当出口速率较低时,它会使用更小的批次以保持较低的延迟。

Avro 和 Parquet 文件拆分行为

流分析查询可以为给定输出生成多个架构。 投影列及其类型的列表可以逐行更改。 根据设计,Avro 和 Parquet 格式不支持单个文件中的变量架构。

当使用以下格式将具有变量架构的流定向到输出时,可能会出现以下行为:

  • 如果可以检测到架构更改,将关闭当前输出文件,并在新架构上初始化一个新文件。 当架构更改频繁发生时,拆分文件会严重降低输出速度。 此行为可能会严重影响作业的整体性能。
  • 如果无法检测到架构更改,则行很可能被拒绝,并且作业停滞不前,因为无法输出该行。 嵌套列或多类型数组是未被发现和拒绝的情况。

使用 Avro 或 Parquet 格式时,考虑它们为强类型或写时架构的输出,并通过显式转换和投影编写针对它们的查询,以实现统一的架构。

如果需要生成多个架构,请考虑使用 WHERE 子句创建多个输出并将记录拆分到每个目标。

Parquet 输出批处理窗口属性

使用 Azure 资源管理器模板部署或 REST API 时,设置两个批处理窗口属性:

  1. timeWindow

    每个批处理的最大等待时间。 将值设置为字符串Timespan。 例如,使用 00:02:00 两分钟。 在此之后,即使不满足最小行要求,批处理也会写入输出。 默认值为 1 分钟,允许的最大值为 2 小时。 如果 Blob 输出具有路径模式频率,则等待时间不能高于分区时间范围。

  2. sizeWindow

    每批次的最小行数。 对于 Parquet,每一批都会创建一个新文件。 当前默认值为 2,000 行,允许的最大值为 10,000 行。

API 版本 2017-04-01-preview 或更高版本支持这些批处理窗口属性。 下面是 REST API 调用的 JSON 有效负载示例:

"type": "stream",
      "serialization": {
        "type": "Parquet",
        "properties": {}
      },
      "timeWindow": "00:02:00",
      "sizeWindow": "2000",
      "datasource": {
        "type": "Microsoft.Storage/Blob",
        "properties": {
          "storageAccounts" : [
          {
            "accountName": "{accountName}",
            "accountKey": "{accountKey}",
          }
          ],

后续步骤