Compartir a través de

Azure 流分析的输出

Azure 流分析作业由输入、查询和输出构成。 可以将转换后的数据发送到多个输出类型。 本文列出了支持的流分析输出。 设计流分析查询时,使用 INTO 子句引用输出的名称。 可针对每个作业使用单个输出,也可通过向查询添加多个 INTO 子句,针对每个流式处理作业使用多个输出(如果需要)。

要创建、编辑和测试流分析作业输出,可使用 Azure 门户Azure PowerShell.NET APIREST API

某些输出类型支持分区,如下表所示。

所有输出都支持批处理,但仅部分输出显式支持设置输出批处理大小。 有关详细信息,请参阅输出批处理大小部分。

输出类型 分区 安全性
Azure Functions 访问密钥
Blob 存储和 Azure Data Lake Gen 2 访问密钥、
托管标识
Azure Data Lake Storage Gen 2 Microsoft Entra 用户
托管标识
Azure 事件中心 是,需要在输出配置中设置分区键列。 访问密钥、
托管标识
Kafka(预览版) 是,需要在输出配置中设置分区键列。 访问密钥、
托管标识
Azure Database for PostgreSQL 用户名和密码身份验证
Azure 服务总线队列 访问密钥、
托管标识
Azure 服务总线主题 访问密钥、
托管标识
Azure SQL 数据库 是,可选。 SQL 用户身份验证、
托管标识

重要

根据设计,Azure 流分析使用插入或替换 API。 此操作替换现有的实体或插入新的实体(如果在表中不存在)。

分区

流分析支持所有输出的分区。 有关分区键和输出编写器数目的详细信息,请参阅你感兴趣的特定输出类型的文章。 在上一节中链接了所有输出类型的文章。

另外,若要对分区进行更高级的优化,可以在查询中使用 INTO <partition count>(请参阅 INTO)子句来控制输出写入器的数量,这可能有助于实现所需的作业拓扑。 如果输出适配器未分区,则一个输入分区中缺少数据将导致延迟最多可达延迟到达的时间量。 在这种情况下,输出将合并到单个写入器,这可能会导致管道中出现瓶颈。 若要了解有关延迟到达策略的详细信息,请参阅 Azure 流分析事件顺序注意事项

输出批大小

所有输出都支持批处理,但仅部分输出显式支持设置批处理大小。 Azure 流分析使用大小可变的批来处理事件和写入到输出。 通常流分析引擎不会一次写入一条消息,而是使用批来提高效率。 当传入和传出事件的速率较高时,流分析将使用更大的批。 输出速率低时,使用较小的批来保证低延迟。

Avro 和 Parquet 文件拆分行为

流分析查询可以为给定输出生成多个架构。 投影的列列表及其类型可能在不同行之间发生变化。 根据设计,Avro 和 Parquet 格式不支持单个文件中的变量架构。

当使用下列格式将具有可变架构的流定向到输出时,可能会发生以下行为:

  • 如果可以检测到架构更改,则关闭当前输出文件,并在新架构上初始化一个新文件。 当架构更改频繁发生时,拆分文件会严重降低输出速度。 这种行为会严重影响作业的整体性能
  • 如果无法检测到架构更改,则很可能会拒绝该行,并且由于无法输出该行而导致作业卡住。 嵌套列或多类型数组是不会发现和拒绝的情况。

建议将使用 Avro 或 Parquet 格式的输出视为强类型或写时架构,并以此方式来编写针对它们的查询(统一模式的显式转换和投影)。

如果需要生成多个架构,请考虑使用 WHERE 子句创建多个输出并将记录拆分到每个目标。

Parquet 输出批处理窗口属性

使用 Azure 资源管理器模板部署或 REST API 时,两个批处理窗口属性为:

  1. timeWindow

    每批的最长等待时间。 该值应为 Timespan 的字符串。 例如,00:02:00 表示两分钟。 在此时间后,即使不满足最小行数要求,也会将该批写入输出。 默认值为 1 分钟,允许的最大值为 2 小时。 如果 Blob 输出具有路径模式频率,则等待时间不能超过分区时间范围。

  2. sizeWindow

    每批的最小行数。 对于 Parquet,每个批处理都将创建一个新文件。 当前默认值为 2000 行,允许的最大值为 10000 行。

这些批处理窗口属性仅受 API 版本 2017-04-01-preview 或更高版本支持。 以下是 REST API 调用的 JSON 有效负载示例:

"type": "stream",
      "serialization": {
        "type": "Parquet",
        "properties": {}
      },
      "timeWindow": "00:02:00",
      "sizeWindow": "2000",
      "datasource": {
        "type": "Microsoft.Storage/Blob",
        "properties": {
          "storageAccounts" : [
          {
            "accountName": "{accountName}",
            "accountKey": "{accountKey}",
          }
          ],

后续步骤