Stream data as input into Stream Analytics

Stream Analytics has first-class integration with Azure data streams as inputs from three kinds of resources: Azure Event Hubs, Azure IoT Hub, and Azure Blob storage.

These input resources can live in the same Azure subscription as your Stream Analytics job or a different subscription.

Compression

Stream Analytics supports compression across all data stream input sources. The currently supported compression types are None, GZip, and Deflate. Support for compression is not available for reference data. If the input format is Avro data that is compressed, it's handled transparently; you don't need to specify the compression type with Avro serialization.

Create, edit, or test inputs

You can use the Azure portal to create new inputs and view or edit existing inputs on your streaming job. You can also test input connections and test queries from sample data. When you write a query, you list the input in the FROM clause. You can get the list of available inputs from the Query page in the portal. If you wish to use multiple inputs, you can JOIN them or write multiple SELECT queries.
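
For example, a job with two inputs can correlate them with a JOIN. The following is a minimal sketch that assumes two hypothetical input aliases, Orders and Payments, joined on an assumed OrderId field; a stream-to-stream JOIN must bound the time window with DATEDIFF:

-- Orders and Payments are hypothetical input aliases; field names are assumptions
SELECT
    O.OrderId,
    O.Amount,
    P.PaymentStatus
FROM Orders O
JOIN Payments P
    ON O.OrderId = P.OrderId
    AND DATEDIFF(minute, O, P) BETWEEN 0 AND 10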

Stream data from Event Hubs

Azure Event Hubs provides highly scalable publish-subscribe event ingestors. An event hub can collect millions of events per second, so that you can process and analyze the massive amounts of data produced by your connected devices and applications. Together, Event Hubs and Stream Analytics provide an end-to-end solution for real-time analytics. Event Hubs lets you feed events into Azure in real time, and Stream Analytics jobs can process those events in real time. For example, you can send web clicks, sensor readings, or online log events to Event Hubs. You can then create Stream Analytics jobs to use Event Hubs as the input data streams for real-time filtering, aggregating, and correlation.
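
As a sketch of that pattern, the following query aggregates events from a hypothetical event hub input alias SensorInput over a 30-second tumbling window; the payload fields deviceId and temperature are assumptions for illustration:

-- SensorInput is a hypothetical input alias; deviceId and temperature are assumed payload fields
SELECT
    deviceId,
    AVG(temperature) AS avgTemperature,
    COUNT(*) AS readingCount
FROM SensorInput
GROUP BY deviceId, TumblingWindow(second, 30)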

EventEnqueuedUtcTime is the timestamp of an event's arrival in an event hub and is the default timestamp of events coming from Event Hubs to Stream Analytics. To process the data as a stream using a timestamp in the event payload, you must use the TIMESTAMP BY keyword.
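
For example, if the event payload carries its own timestamp (a field assumed here to be named sensorReadingTime), a sketch like the following uses that field instead of EventEnqueuedUtcTime:

-- sensorReadingTime is an assumed payload field; SensorInput is a hypothetical input alias
SELECT
    deviceId,
    temperature
FROM SensorInput TIMESTAMP BY sensorReadingTime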

Consumer groups

You should configure each Stream Analytics event hub input to have its own consumer group. When a job contains a self-join or has multiple inputs, some inputs might be read by more than one reader downstream. This situation impacts the number of readers in a single consumer group. To avoid exceeding the Event Hubs limit of five readers per consumer group per partition, it's a best practice to designate a consumer group for each Stream Analytics job. There is also a limit of 20 consumer groups per event hub.

Configure an event hub as a data stream input

The following table explains each property on the New input page in the Azure portal for streaming data from an event hub:

Input alias: A friendly name that you use in the job's query to reference this input.
Subscription: Choose the subscription in which the event hub resource exists.
Event Hub namespace: The Event Hub namespace is a container for a set of messaging entities. When you create a new event hub, you also create the namespace.
Event Hub name: The name of the event hub to use as input.
Event Hub policy name: The shared access policy that provides access to the Event Hub. Each shared access policy has a name, permissions that you set, and access keys. This option is automatically populated, unless you select the option to provide the Event Hub settings manually.
Event Hub consumer group (recommended): It is highly recommended to use a distinct consumer group for each Stream Analytics job. This string identifies the consumer group to use to ingest data from the event hub. If no consumer group is specified, the Stream Analytics job uses the $Default consumer group.
Event serialization format: The serialization format (JSON, CSV, or Avro) of the incoming data stream. Ensure the JSON format aligns with the specification and doesn't include a leading 0 for decimal numbers.
Encoding: UTF-8 is currently the only supported encoding format.
Event compression type: The compression type used to read the incoming data stream, such as None (default), GZip, or Deflate.

When your data comes from an event hub stream input, you have access to the following metadata fields in your Stream Analytics query:

EventProcessedUtcTime: The date and time that the event was processed by Stream Analytics.
EventEnqueuedUtcTime: The date and time that the event was received by Event Hubs.
PartitionId: The zero-based partition ID for the input adapter.

For example, using these fields, you can write a query like the following example:

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    PartitionId
FROM Input

Note

When you use an event hub as an endpoint for IoT Hub routes, you can access the IoT Hub metadata by using the GetMetadataPropertyValue function.
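
For example, a sketch like the following reads IoT Hub metadata from a hypothetical event hub input alias ehInput; the bracketed property paths follow the record-path style of GetMetadataPropertyValue and should be adjusted to the metadata your devices actually send:

-- ehInput is a hypothetical input alias; property paths are assumptions for illustration
SELECT
    GetMetadataPropertyValue(ehInput, '[IoTHub].[MessageId]') AS messageId,
    GetMetadataPropertyValue(ehInput, '[IoTHub].[ConnectionDeviceId]') AS deviceId
FROM ehInput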

Stream data from IoT Hub

Azure IoT Hub is a highly scalable publish-subscribe event ingestor optimized for IoT scenarios.

The default timestamp of events coming from an IoT Hub in Stream Analytics is the timestamp that the event arrived in the IoT Hub, which is EventEnqueuedUtcTime. To process the data as a stream using a timestamp in the event payload, you must use the TIMESTAMP BY keyword.

Consumer groups

You should configure each Stream Analytics IoT Hub input to have its own consumer group. When a job contains a self-join or has multiple inputs, some inputs might be read by more than one reader downstream. This situation impacts the number of readers in a single consumer group. To avoid exceeding the Azure IoT Hub limit of five readers per consumer group per partition, it's a best practice to designate a consumer group for each Stream Analytics job.

Configure an IoT Hub as a data stream input

The following table explains each property on the New input page in the Azure portal when you configure an IoT Hub as a stream input.

Input alias: A friendly name that you use in the job's query to reference this input.
Subscription: Choose the subscription in which the IoT Hub resource exists.
IoT Hub: The name of the IoT Hub to use as input.
Endpoint: The endpoint for the IoT Hub.
Shared access policy name: The shared access policy that provides access to the IoT Hub. Each shared access policy has a name, permissions that you set, and access keys.
Shared access policy key: The shared access key used to authorize access to the IoT Hub. This option is automatically populated unless you select the option to provide the IoT Hub settings manually.
Consumer group: It is highly recommended that you use a different consumer group for each Stream Analytics job. The consumer group is used to ingest data from the IoT Hub. Stream Analytics uses the $Default consumer group unless you specify otherwise.
Event serialization format: The serialization format (JSON, CSV, or Avro) of the incoming data stream. Ensure the JSON format aligns with the specification and doesn't include a leading 0 for decimal numbers.
Encoding: UTF-8 is currently the only supported encoding format.
Event compression type: The compression type used to read the incoming data stream, such as None (default), GZip, or Deflate.

When you use stream data from an IoT Hub, you have access to the following metadata fields in your Stream Analytics query:

EventProcessedUtcTime: The date and time that the event was processed.
EventEnqueuedUtcTime: The date and time that the event was received by the IoT Hub.
PartitionId: The zero-based partition ID for the input adapter.
IoTHub.MessageId: An ID that's used to correlate two-way communication in IoT Hub.
IoTHub.CorrelationId: An ID that's used in message responses and feedback in IoT Hub.
IoTHub.ConnectionDeviceId: The authentication ID used to send this message. This value is stamped on service-bound messages by the IoT Hub.
IoTHub.ConnectionDeviceGenerationId: The generation ID of the authenticated device that was used to send this message. This value is stamped on service-bound messages by the IoT Hub.
IoTHub.EnqueuedTime: The time when the message was received by the IoT Hub.
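
For example, a sketch along these lines selects some of those fields, much like the Event Hubs example above; the input alias Input is a placeholder:

-- Input is a placeholder alias; the selected fields are the metadata fields listed above
SELECT
    EventProcessedUtcTime,
    IoTHub.ConnectionDeviceId,
    IoTHub.EnqueuedTime
FROM Input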

Stream data from Blob storage

For scenarios with large quantities of unstructured data to store in the cloud, Azure Blob storage offers a cost-effective and scalable solution. Data in Blob storage is usually considered data at rest; however, blob data can be processed as a data stream by Stream Analytics.

Log processing is a commonly used scenario for Blob storage inputs with Stream Analytics. In this scenario, telemetry data files have been captured from a system and need to be parsed and processed to extract meaningful data.

The default timestamp of Blob storage events in Stream Analytics is the timestamp that the blob was last modified, which is BlobLastModifiedUtcTime. To process the data as a stream using a timestamp in the event payload, you must use the TIMESTAMP BY keyword. A Stream Analytics job pulls data from the Azure Blob storage input every second if the blob file is available. If the blob file is unavailable, there is an exponential backoff with a maximum time delay of 90 seconds.
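
For example, if each log record in the blob carries its own timestamp (a field assumed here to be named logTime), a sketch like the following processes the records on that timestamp instead of BlobLastModifiedUtcTime and filters them; LogInput, level, and message are placeholder names:

-- LogInput is a hypothetical input alias; logTime, level, and message are assumed payload fields
SELECT
    BlobName,
    level,
    message
FROM LogInput TIMESTAMP BY logTime
WHERE level = 'Error'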

CSV-formatted inputs require a header row to define fields for the data set, and all header row fields must be unique.

Stream Analytics currently does not support deserializing Avro messages generated by Event Hubs Capture or by the IoT Hub custom endpoint for Azure Storage containers.

Note

Stream Analytics does not support adding content to an existing blob file. Stream Analytics will view each file only once, and any changes that occur in the file after the job has read the data are not processed. Best practice is to upload all the data for a blob file at once and then add additional newer events to a different, new blob file.

Configure Blob storage as a stream input

The following table explains each property on the New input page in the Azure portal when you configure Blob storage as a stream input.

Input alias: A friendly name that you use in the job's query to reference this input.
Subscription: Choose the subscription in which the storage account exists.
Storage account: The name of the storage account where the blob files are located.
Storage account key: The secret key associated with the storage account. This option is automatically populated unless you select the option to provide the Blob storage settings manually.
Container: The container for the blob input. Containers provide a logical grouping for blobs stored in the Azure Blob service. When you upload a blob to the Azure Blob storage service, you must specify a container for that blob. You can choose Use existing to use an existing container, or Create new to have a new container created.
Path pattern (optional): The file path used to locate the blobs within the specified container. Within the path, you can specify one or more instances of the following three variables: {date}, {time}, or {partition}. Example 1: cluster1/logs/{date}/{time}/{partition}. Example 2: cluster1/logs/{date}. The * character is not an allowed value for the path prefix. Only valid Azure blob characters are allowed. Do not include container names or file names.
Date format (optional): If you use the date variable in the path, the date format in which the files are organized. Example: YYYY/MM/DD
Time format (optional): If you use the time variable in the path, the time format in which the files are organized. Currently the only supported value is HH for hours.
Event serialization format: The serialization format (JSON, CSV, or Avro) of the incoming data stream. Ensure the JSON format aligns with the specification and doesn't include a leading 0 for decimal numbers.
Encoding: For CSV and JSON, UTF-8 is currently the only supported encoding format.
Compression: The compression type used to read the incoming data stream, such as None (default), GZip, or Deflate.

When your data comes from a Blob storage source, you have access to the following metadata fields in your Stream Analytics query:

BlobName: The name of the input blob that the event came from.
EventProcessedUtcTime: The date and time that the event was processed by Stream Analytics.
BlobLastModifiedUtcTime: The date and time that the blob was last modified.
PartitionId: The zero-based partition ID for the input adapter.

For example, using these fields, you can write a query like the following example:

SELECT
    BlobName,
    EventProcessedUtcTime,
    BlobLastModifiedUtcTime
FROM Input

Next steps