将数据作为流分析的输入进行流式传输

流分析与Azure数据流集成,作为来自五种资源的输入:

这些输入资源可以存在于与流分析作业相同的Azure订阅中,也可以存在于其他订阅中。

压缩

流分析支持对所有输入源进行压缩。 支持的压缩类型为:None、Gzip 和 Deflate。 参考数据不支持压缩。 如果输入数据是经过压缩的 Avro 数据,流分析服务会自动处理。 不需要通过 Avro 序列化指定压缩类型。

创建、编辑或测试输入

可以使用 Azure 门户来添加和查看或编辑流式处理作业上的现有输入。 还可以基于 Azure 门户的示例数据测试输入连接和测试查询。 编写查询时,将在 FROM 子句中列出输入。 可以在门户的“查询”页中获取可用输入的列表。 如果要使用多个输入,可以将它们合在一起或编写多个 JOIN 查询。

从事件中心对数据进行流式传输

Azure 事件中心是具有高扩展性的发布-订阅事件引入器。 事件中心每秒可收集数百万个事件,使你能够处理和分析互连设备与应用程序生成的海量数据。 事件中心和流分析共同为实时分析提供端到端解决方案。 事件中心实时将事件馈送到Azure,流分析作业实时处理这些事件。 例如,用户可以将 Web 点击操作、传感器读数或联机日志事件发送到事件中心。 然后可以创建流分析作业,将事件中心用作输入数据,以便进行实时筛选、聚合和关联操作。

EventEnqueuedUtcTime 是事件到达事件中心的时间戳,也是事件从事件中心发送到流分析的默认时间戳。 若要在事件有效负载中使用时间戳以流方式处理数据,必须使用 TIMESTAMP BY 关键字。

事件中心使用者组

使用自己的使用者组配置每个事件中心输入。 当作业包含自联接或有多个输入时,下游多个读取器可能会读取某些输入。 这种情况会影响单个使用者组中的读取器数量。 若要避免超出每个分区每个使用者组 5 个读取器的事件中心限制,请为每个流分析作业指定一个使用者组。 此外,标准层事件中心还有一个限制,即最多支持 20 个消费者组。 有关详细信息,请参阅排查 Azure 流分析数据输入问题

从事件中心创建输入

下表介绍了 Azure 门户的“新输入”页中用于从事件中心流式传输数据输入的每个属性:

属性 说明
输入别名 在作业查询中用于引用此输入的友好名称。
订阅 选择 Azure 事件中心资源所在的订阅。
事件中心命名空间 事件中心命名空间是事件中心的容器。 创建事件中心后,另请创建命名空间。
事件中心名称 要用作输入的事件中心的名称。
事件中心使用者组(推荐) 为每个流分析作业使用不同的使用者组。 此字符串用于标识从事件中心引入数据的使用者组。 如果未指定使用者组,流分析作业将使用 $Default 使用者组。
身份验证模式 指定要用于连接到事件中心的身份验证类型。 使用连接字符串或托管标识在事件中心进行身份验证。 对于托管标识选项,可以为流分析作业创建系统分配的托管标识,也可以创建用户分配的托管标识,以向事件中心进行身份验证。 使用托管标识时,托管标识必须是 Azure 事件中心数据接收者或 Azure 事件中心数据所有者角色的成员。
事件中心策略名称 用于提供对事件中心的访问权限的共享访问策略。 每个共享访问策略具有名称、所设权限以及访问密钥。 除非选择手动提供事件中心设置的选项,否则将自动填充此选项。
分区键 仅当将作业配置为使用 兼容级别 1.2 或更高版本时,此可选字段才可用。 如果输入按属性进行分区,请在此处添加此属性的名称。 如果您的查询包含此属性上的PARTITION BYGROUP BY子句,使用它来提高查询的性能。 如果此作业使用兼容性级别 1.2 或更高版本,则此字段默认为 PartitionId.
事件序列化格式 传入数据流的序列化格式(JSON、CSV、Avro)。 确保 JSON 格式与规范保持一致,并且不包括十进制数字的前导 0。
编码 目前只支持 UTF-8 这种编码格式。
事件压缩类型 用于读取传入数据流的压缩类型,例如None(默认)、Gzip 或 Deflate。
模式注册表 选择架构注册表,其中包含从事件中心接收的事件数据的架构。

当数据来自事件中心流输入时,可以访问流分析查询中的以下元数据字段:

属性 说明
EventProcessedUtcTime 流分析处理事件的日期和时间。
EventEnqueuedUtcTime 事件中心接收事件的日期和时间。
PartitionId 输入适配器的分区 ID(从零开始)。

通过使用这些字段,可以编写如下示例所示的查询:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

注意

使用事件中心作为IoT Hub路由的终结点时,可以使用 GetMetadataPropertyValue 函数访问IoT Hub元数据

从 IoT 中心流式传输数据

Azure IoT 中心是一种高度可扩展的发布-订阅事件处理器,专为 IoT 方案优化。

在流分析中,来自 IoT 中心的事件的默认时间戳是事件到达 IoT 中心的时间戳,即 EventEnqueuedUtcTime。 若要使用事件有效负载中的时间戳以流的形式处理数据,请使用 TIMESTAMP BY 关键字。

IoT Hub使用者组

将流分析的每个 IoT Hub 输入配置为拥有自己的使用者组。 当作业包含自联接或具有多个输入时,多个读取器可能会读取某些输入。 这种情况会影响单个使用者组中的读取器数量。 为了避免超出 Azure IoT Hub 对每个使用者组每个分区的 5 个读取器限制,请为每个流分析作业指定一个使用者组。

将 IoT 中心配置为数据流输入

下表介绍了将 IoT 中心配置为流输入时,Azure 门户的“新输入”页中的每个属性。

属性 说明
输入别名 在作业查询中用于引用此输入的友好名称。
订阅 选择 IoT Hub 资源所在的订阅。
IoT 中心 可用作输入的 IoT 中心的名称。
使用者组 为每个流分析作业使用不同的使用者组。 使用者组从IoT Hub引入数据。 流分析使用 $Default 消费组,除非另有指定。
共享访问策略名称 提供对 IoT 中心的访问权限的共享访问策略。 每个共享访问策略具有名称、所设权限以及访问密钥。
共享访问策略密钥 用于授予对 IoT 中心的访问权限的共享访问密钥。 除非选择手动提供IoT Hub设置的选项,否则会自动填充此选项。
终结点 IoT 中心端点。
分区键 仅当将作业配置为使用 兼容级别 1.2 或更高版本时,该字段才可用。 如果按属性对输入进行分区,可以在此处添加此属性的名称。 如果你的查询包含此属性的 PARTITION BY 或 GROUP BY 子句,则此属性用于提高该查询性能。 如果此作业使用兼容性级别 1.2 或更高版本,则此字段默认为“PartitionId”。
事件序列化格式 传入数据流的序列化格式(JSON、CSV、Avro)。 确保 JSON 格式与规范保持一致,并且不包括十进制数字的前导 0。
编码 目前只支持 UTF-8 这种编码格式。
事件压缩类型 用于读取传入数据流的压缩类型,例如None(默认)、Gzip 或 Deflate。

如果使用的流数据来自 IoT 中心,则可以在流分析查询中访问以下元数据字段:

属性 说明
EventProcessedUtcTime 处理事件的日期和时间。
EventEnqueuedUtcTime IoT 中心接收事件的日期和时间。
PartitionId 输入适配器的以零为基础的分区 ID。
IoTHub.MessageId 用于关联 IoT 中心内的双向通信的 ID。
IoTHub.CorrelationId 用于 IoT 中心内的消息响应和反馈的 ID。
IoTHub.ConnectionDeviceId 用于发送此消息的身份验证 ID。 IoT Hub在服务绑定消息上标记此值。
IoTHub.ConnectionDeviceGenerationId 用于发送此消息的经过身份验证设备的生成 ID。 IoT Hub在面向服务的消息上标记此值。
IoTHub.EnqueuedTime IoT 中心接收消息的时间。

从 Blob 存储或 Data Lake Storage Gen2 流式传输数据

对于涉及在云中存储大量数据的方案,Azure Blob 存储或Azure Data Lake Storage Gen2提供了经济高效且可缩放的解决方案。 Blob 存储或 Azure Data Lake Storage Gen2 中的数据被视为静态数据。 但是,流分析可以将此数据作为数据流进行处理。

将此类输入用于流分析的常用方案是日志处理。 在此方案中,你将从系统捕获遥测数据文件,并需要分析和处理这些文件以提取有意义的数据。

流分析中 Blob 存储或 Azure Data Lake Storage Gen2 事件的默认时间戳是上次修改该事件时的时间戳,即 BlobLastModifiedUtcTime。 如果在13:00将blob上传到存储帐户,并在13:01使用Now选项启动 Azure Stream Analytics 作业,作业不会处理该 blob,因为该文件的修改时间超出了作业的运行时间窗口。

如果在 13:00 时将 blob 上传到存储帐户容器,并使用 Custom Time 在 13:00 或更早的时间启动 Azure Stream Analytics 作业,则作业会选取该 blob,因为该 blob 修改的时间在作业运行期间内。

如果在 13:00 使用 Now 启动 Azure Stream Analytics 作业,并在 13:01 将 blob 上传到存储帐户容器,Azure Stream Analytics 就会处理该 blob。 分配给每个 blob 的时间戳仅基于 BlobLastModifiedTime。 blob 所在的文件夹与分配的时间戳无关。 例如,如果有一个 blob 2019/10-01/00/b1.txt,并且 BlobLastModifiedTime2019-11-11,则分配给该 blob 的时间戳为 2019-11-11

若要在事件有效负载中使用时间戳以流的形式处理数据,必须使用 TIMESTAMP BY 关键字。 如果该 blob 文件可用,则流分析作业每一秒都会从 Azure Blob 存储或 Data Lake Storage Gen2 输入中拉取数据。 如果 Blob 文件不可用,作业将采用指数退避策略,最大延迟时间为 90 秒。

注意

流分析不支持将内容添加到现有 Blob 文件。 流分析仅查看每个文件一次,在作业读取数据后,它不会处理文件中发生的任何更改。 最佳做法是立即上传 blob 文件的全部数据,然后将其他较新的事件添加到其他全新的 blob 文件中。

在连续添加许多 Blob 并在添加时由流分析处理的场景中,您可能会因为 BlobLastModifiedTime 的粒度而跳过某些 Blob,不过这种情况非常罕见。 可以通过每次上传 Blob 间隔至少两秒来缓解此问题。 如果此选项不可行,则可以使用事件中心流式传输大量事件。

将 Blob 存储配置为流输入

下表介绍了将 Blob 存储配置为流输入时,Azure 门户的“新输入”页中的每个属性。

属性 说明
输入别名 在作业查询中用于引用此输入的友好名称。
订阅 选择存储资源所在的订阅。
存储帐户 存储 Blob 文件所在的存储帐户的名称。
存储帐户密钥 与存储帐户关联的密钥。 除非你选择手动提供设置的选项,否则会自动填充此选项。
容器 容器为 blob 提供逻辑分组。 可以选择“使用现有”以使用现有容器;也可以选择“新建”以创建新的容器。
身份验证模式 指定要用于连接到存储帐户的身份验证类型。 可以使用连接字符串或托管标识通过存储帐户进行身份验证。 对于托管标识选项,可以将系统分配的托管标识创建到流分析作业,也可以创建用户分配的托管标识,以便使用存储帐户进行身份验证。 使用托管标识时,托管标识必须是存储帐户上适当角色的成员。
路径模式(可选) 用于定位指定容器中的 blob 的文件路径。 如果想从容器根目录读取 blob,请不要设置路径模式。 在路径中,可以指定以下 3 个变量的一个或多个实例:{date}{time}{partition}

示例 1:cluster1/logs/{date}/{time}/{partition}

示例 2:cluster1/logs/{date}

* 字符不是路径前缀允许使用的值。 仅允许使用有效的 Azure blob 字符。 不包括容器名称或文件名。
日期格式(可选) 如果在路径中使用日期变量,则文件是按照日期格式来组织的。 示例: YYYY/MM/DD

当 Blob 输入路径中包含 {date}{time},流分析会按时间升序查看文件夹。
时间格式(可选) 如果在路径中使用时间变量,文件按时间格式组织。 目前唯一支持的值是 HH,表示小时。
分区键 仅当将作业配置为使用 兼容级别 1.2 或更高版本时,该字段才可用。 如果按属性对输入进行分区,可以在此处添加此属性的名称。 如果你的查询包含此属性的 PARTITION BY 或 GROUP BY 子句,则此属性用于提高该查询性能。 如果此作业使用兼容性级别 1.2 或更高版本,则此字段默认为“PartitionId”。
输入分区数量 此字段只有在路径模式中存在 {partition} 的情况下才会存在。 此属性的值为 >=1 的整数。 无论 {partition} 在 pathPattern 中出现在哪里,都会使用一个介于 0 和此字段值减去 1 之间的数字。
事件序列化格式 传入数据流的序列化格式(JSON、CSV、Avro)。 确保 JSON 格式与规范保持一致,并且不包括十进制数字的前导 0。
编码 对于 CSV 和 JSON,目前只支持 UTF-8 这种编码格式。
压缩 用于读取传入数据流的压缩类型,例如None(默认)、Gzip 或 Deflate。

当数据来自 Blob 存储源时,可以在流分析查询中访问以下元数据字段:

属性 说明
BlobName 事件来自的输入 blob 的名称。
EventProcessedUtcTime 流分析处理事件的日期和时间。
BlobLastModifiedUtcTime 上次修改 blob 的日期和时间。
PartitionId 输入适配器的以零为基础的分区 ID。

通过使用这些字段,可以编写如下示例所示的查询:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

后续步骤