Understand outputs from Azure Stream Analytics

This article describes the types of outputs available for an Azure Stream Analytics job. Outputs let you store and save the results of the Stream Analytics job. By using the output data, you can do further business analytics and data warehousing of your data.

When you design your Stream Analytics query, refer to the name of the output by using the INTO clause. You can use a single output per job, or multiple outputs per streaming job (if you need them) by providing multiple INTO clauses in the query.
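
For example, one job can send the full stream to an archival output and a filtered subset to an alerting output by using two INTO clauses. The input and output names below (iotHubInput, blobArchiveOutput, alertOutput) and the deviceId and temperature fields are hypothetical placeholders; a minimal sketch:

    SELECT *
    INTO blobArchiveOutput
    FROM iotHubInput

    SELECT deviceId, temperature
    INTO alertOutput
    FROM iotHubInput
    WHERE temperature > 100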

To create, edit, and test Stream Analytics job outputs, you can use the Azure portal, Azure PowerShell, the .NET API, and the REST API.

Some output types support partitioning. Output batch sizes vary to optimize throughput.

SQL Database

You can use Azure SQL Database as an output for data that's relational in nature or for applications that depend on content being hosted in a relational database. Stream Analytics jobs write to an existing table in SQL Database. The table schema must exactly match the fields and their types in your job's output. You can also specify Azure SQL Data Warehouse as an output via the SQL Database output option. To learn about ways to improve write throughput, see the Stream Analytics with Azure SQL Database as output article.

You can also use Azure SQL Managed Instance as an output. You have to configure a public endpoint in SQL Managed Instance and then manually configure the following settings in Azure Stream Analytics. An Azure virtual machine running SQL Server with a database attached is also supported by manually configuring the settings below.

The following list describes the properties for creating a SQL Database output.

  • Output alias: A friendly name used in queries to direct the query output to this database.
  • Database: The name of the database where you're sending your output.
  • Server name: The logical SQL server name or managed instance name. For SQL Managed Instance, you must specify port 3342, for example sampleserver.public.database.chinacloudapi.cn,3342.
  • Username: The username that has write access to the database. Stream Analytics supports only SQL authentication.
  • Password: The password to connect to the database.
  • Table: The table name where the output is written. The table name is case-sensitive. The schema of this table should exactly match the number of fields and their types that your job output generates.
  • Inherit partition scheme: An option to inherit the partitioning scheme of your previous query step, to enable a fully parallel topology with multiple writers to the table (see the sketch after this list). For more information, see Azure Stream Analytics output to Azure SQL Database.
  • Max batch count: The recommended upper limit on the number of records sent with every bulk insert transaction.
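
As a sketch of how Inherit partition scheme pairs with a partitioned query step, the following query keeps the input partitioning all the way to the SQL output. The names eventHubInput and sqlDbOutput are hypothetical placeholders:

    SELECT *
    INTO sqlDbOutput
    FROM eventHubInput PARTITION BY PartitionId

With the Inherit partition scheme option enabled on the output, each input partition gets its own writer to the table.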

Blob storage and Azure Data Lake Gen2

Data Lake Storage Gen2 makes Azure Storage the foundation for building enterprise data lakes on Azure. Designed from the start to service multiple petabytes of information while sustaining hundreds of gigabits of throughput, Data Lake Storage Gen2 allows you to easily manage massive amounts of data. A fundamental part of Data Lake Storage Gen2 is the addition of a hierarchical namespace to Blob storage.

Azure Blob storage offers a cost-effective and scalable solution for storing large amounts of unstructured data in the cloud. For an introduction to Blob storage and its usage, see Upload, download, and list blobs with the Azure portal.

The following list describes the properties for creating a blob or ADLS Gen2 output.

  • Output alias: A friendly name used in queries to direct the query output to this blob storage.
  • Storage account: The name of the storage account where you're sending your output.
  • Storage account key: The secret key associated with the storage account.
  • Storage container: A logical grouping for blobs stored in the Azure Blob service. When you upload a blob to the Blob service, you must specify a container for that blob.
  • Path pattern: Optional. The file path pattern that's used to write your blobs within the specified container.

In the path pattern, you can choose to use one or more instances of the date and time variables to specify how often blobs are written:
{date}, {time}

You can use custom blob partitioning to specify one custom {field} name from your event data to partition blobs. The field name is alphanumeric and can include spaces, hyphens, and underscores. Restrictions on custom fields include the following:
  • Field names aren't case-sensitive. For example, the service can't differentiate between column "ID" and column "id".
  • Nested fields are not permitted. Instead, use an alias in the job query to "flatten" the field.
  • Expressions can't be used as a field name.

This feature enables the use of custom date/time format specifier configurations in the path. Custom date and time formats must be specified one at a time, enclosed by the {datetime:<specifier>} keyword. Allowable inputs for <specifier> are yyyy, MM, M, dd, d, HH, H, mm, m, ss, or s. The {datetime:<specifier>} keyword can be used multiple times in the path to form custom date/time configurations.

Examples:
  • Example 1: cluster1/logs/{date}/{time}
  • Example 2: cluster1/logs/{date}
  • Example 3: cluster1/{client_id}/{date}/{time}
  • Example 4: cluster1/{datetime:ss}/{myField}, where the query is: SELECT data.myField AS myField FROM Input;
  • Example 5: cluster1/year={datetime:yyyy}/month={datetime:MM}/day={datetime:dd}

The time stamp of the created folder structure follows UTC and not local time.

File naming uses the following convention:

{Path Prefix Pattern}/schemaHashcode_Guid_Number.extension

Example output files:
  • Myoutput/20170901/00/45434_gguid_1.csv
  • Myoutput/20170901/01/45434_gguid_1.csv

For more information about this feature, see Azure Stream Analytics custom blob output partitioning.
  • Date format: Optional. If the date token is used in the prefix path, you can select the date format in which your files are organized. Example: YYYY/MM/DD.
  • Time format: Optional. If the time token is used in the prefix path, specify the time format in which your files are organized. Currently the only supported value is HH.
  • Event serialization format: The serialization format for output data. JSON, CSV, Avro, and Parquet are supported.
  • Minimum rows: The minimum number of rows per batch. For Parquet, every batch creates a new file. The current default value is 2,000 rows, and the allowed maximum is 10,000 rows.
  • Maximum time: The maximum wait time per batch. After this time, the batch is written to the output even if the minimum rows requirement is not met. The current default value is 1 minute, and the allowed maximum is 2 hours. If your blob output has path pattern frequency, the wait time can't be higher than the partition time range.
  • Encoding: If you're using CSV or JSON format, an encoding must be specified. UTF-8 is the only supported encoding format at this time.
  • Delimiter: Applicable only for CSV serialization. Stream Analytics supports a number of common delimiters for serializing CSV data. Supported values are comma, semicolon, space, tab, and vertical bar.
  • Format: Applicable only for JSON serialization. Line separated specifies that the output is formatted by having each JSON object separated by a new line; if you select Line separated, the JSON is read one object at a time, and the whole content by itself is not valid JSON. Array specifies that the output is formatted as an array of JSON objects. This array is closed only when the job stops or Stream Analytics moves on to the next time window. In general, it's preferable to use line-separated JSON, because it doesn't require any special handling while the output file is still being written to (see the example after this list).
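
To illustrate the two JSON format options, three hypothetical output events would be serialized as follows. Line separated:

    {"deviceId":"d1","temperature":70.5}
    {"deviceId":"d2","temperature":68.2}
    {"deviceId":"d3","temperature":71.0}

Array:

    [{"deviceId":"d1","temperature":70.5},{"deviceId":"d2","temperature":68.2},{"deviceId":"d3","temperature":71.0}]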

When you're using Blob storage as output, a new file is created in the blob in the following cases:

  • The file exceeds the maximum number of allowed blocks (currently 50,000). You might reach the maximum allowed number of blocks without reaching the maximum allowed blob size. For example, if the output rate is high, each block holds more bytes and the file size is larger; if the output rate is low, each block has less data and the file size is smaller.
  • There's a schema change in the output, and the output format requires a fixed schema (CSV and Avro).
  • A job is restarted, either externally by a user stopping and starting it, or internally for system maintenance or error recovery.
  • The query is fully partitioned, and a new file is created for each output partition.
  • The user deletes a file or a container of the storage account.
  • The output is time-partitioned by using the path prefix pattern, and a new blob is used when the query moves to the next hour.
  • The output is partitioned by a custom field, and a new blob is created per partition key if it doesn't exist.
  • The output is partitioned by a custom field where the partition key cardinality exceeds 8,000, and a new blob is created per partition key.

Event Hubs

The Azure Event Hubs service is a highly scalable publish-subscribe event ingestor. It can collect millions of events per second. One use of an event hub as output is when the output of a Stream Analytics job becomes the input of another streaming job. For information about the maximum message size and batch size optimization, see the output batch size section.

You need a few parameters to configure data streams from event hubs as an output.

  • Output alias: A friendly name used in queries to direct the query output to this event hub.
  • Event hub namespace: A container for a set of messaging entities. When you create a new event hub, you also create an event hub namespace.
  • Event hub name: The name of your event hub output.
  • Event hub policy name: The shared access policy, which you can create on the event hub's Configure tab. Each shared access policy has a name, permissions that you set, and access keys.
  • Event hub policy key: The shared access key that's used to authenticate access to the event hub namespace.
  • Partition key column: Optional. A column that contains the partition key for event hub output (see the sketch after this list).
  • Event serialization format: The serialization format for output data. JSON, CSV, and Avro are supported.
  • Encoding: For CSV and JSON, UTF-8 is the only supported encoding format at this time.
  • Delimiter: Applicable only for CSV serialization. Stream Analytics supports a number of common delimiters for serializing data in CSV format. Supported values are comma, semicolon, space, tab, and vertical bar.
  • Format: Applicable only for JSON serialization. Line separated specifies that the output is formatted by having each JSON object separated by a new line; if you select Line separated, the JSON is read one object at a time, and the whole content by itself is not valid JSON. Array specifies that the output is formatted as an array of JSON objects.
  • Property columns: Optional. Comma-separated columns that need to be attached as user properties of the outgoing message instead of the payload. For more information, see the section Custom metadata properties for output.
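
For example, if the query emits a device identifier, that column can be set as the Partition key column so that all events from the same device land in the same event hub partition. The names iotHubInput, eventHubOutput, deviceId, temperature, and eventTime are hypothetical; a minimal sketch:

    SELECT deviceId, temperature, eventTime
    INTO eventHubOutput
    FROM iotHubInput

In the output configuration, Partition key column would then be set to deviceId.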

Table storage

Azure Table storage offers highly available, massively scalable storage, so that an application can automatically scale to meet user demand. Table storage is Microsoft's NoSQL key/attribute store, which you can use for structured data with fewer constraints on the schema. Azure Table storage can be used to store data for persistence and efficient retrieval.

The following list describes the properties for creating a table output.

  • Output alias: A friendly name used in queries to direct the query output to this table storage.
  • Storage account: The name of the storage account where you're sending your output.
  • Storage account key: The access key associated with the storage account.
  • Table name: The name of the table. The table gets created if it doesn't exist.
  • Partition key: The name of the output column that contains the partition key. The partition key is a unique identifier for the partition within a table that forms the first part of an entity's primary key. It's a string value that can be up to 1 KB in size (see the sketch after this list).
  • Row key: The name of the output column that contains the row key. The row key is a unique identifier for an entity within a partition. It forms the second part of an entity's primary key. The row key is a string value that can be up to 1 KB in size.
  • Batch size: The number of records for a batch operation. The default (100) is sufficient for most jobs. See the Table Batch Operation spec for more details on modifying this setting.
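
For example, a query can surface the columns that will serve as the partition key and row key. The names iotHubInput, tableOutput, deviceId, eventId, and temperature are hypothetical placeholders:

    SELECT deviceId, eventId, temperature
    INTO tableOutput
    FROM iotHubInput

In the output configuration, Partition key would be set to deviceId and Row key to eventId, so each device's events share a partition and each event gets a unique row.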

Service Bus queues

Service Bus queues offer FIFO message delivery to one or more competing consumers. Typically, messages are received and processed by the receivers in the temporal order in which they were added to the queue. Each message is received and processed by only one message consumer.

In compatibility level 1.2, Azure Stream Analytics uses the Advanced Message Queuing Protocol (AMQP) to write to Service Bus queues and topics. AMQP enables you to build cross-platform, hybrid applications by using an open standard protocol.

The following list describes the properties for creating a queue output.

  • Output alias: A friendly name used in queries to direct the query output to this Service Bus queue.
  • Service Bus namespace: A container for a set of messaging entities.
  • Queue name: The name of the Service Bus queue.
  • Queue policy name: When you create a queue, you can also create shared access policies on the queue's Configure tab. Each shared access policy has a name, permissions that you set, and access keys.
  • Queue policy key: The shared access key that's used to authenticate access to the Service Bus namespace.
  • Event serialization format: The serialization format for output data. JSON, CSV, and Avro are supported.
  • Encoding: For CSV and JSON, UTF-8 is the only supported encoding format at this time.
  • Delimiter: Applicable only for CSV serialization. Stream Analytics supports a number of common delimiters for serializing data in CSV format. Supported values are comma, semicolon, space, tab, and vertical bar.
  • Format: Applicable only for JSON serialization. Line separated specifies that the output is formatted by having each JSON object separated by a new line; if you select Line separated, the JSON is read one object at a time, and the whole content by itself is not valid JSON. Array specifies that the output is formatted as an array of JSON objects.
  • Property columns: Optional. Comma-separated columns that need to be attached as user properties of the outgoing message instead of the payload. For more information, see the section Custom metadata properties for output.
  • System Property columns: Optional. Key/value pairs of system properties and corresponding column names that need to be attached to the outgoing message instead of the payload. For more information, see the section System properties for Service Bus Queue and Topic outputs.

The number of partitions is based on the Service Bus SKU and size. The partition key is a unique integer value for each partition.

Service Bus Topics

Service Bus queues provide a one-to-one communication method from sender to receiver. Service Bus topics provide a one-to-many form of communication.

The following list describes the properties for creating a Service Bus topic output.

  • Output alias: A friendly name used in queries to direct the query output to this Service Bus topic.
  • Service Bus namespace: A container for a set of messaging entities. When you create a new event hub, you also create a Service Bus namespace.
  • Topic name: Topics are messaging entities, similar to event hubs and queues. They're designed to collect event streams from devices and services. When a topic is created, it's also given a specific name. The messages sent to a topic aren't available unless a subscription is created, so make sure there is at least one subscription under the topic.
  • Topic policy name: When you create a Service Bus topic, you can also create shared access policies on the topic's Configure tab. Each shared access policy has a name, permissions that you set, and access keys.
  • Topic policy key: The shared access key that's used to authenticate access to the Service Bus namespace.
  • Event serialization format: The serialization format for output data. JSON, CSV, and Avro are supported.
  • Encoding: If you're using CSV or JSON format, an encoding must be specified. UTF-8 is the only supported encoding format at this time.
  • Delimiter: Applicable only for CSV serialization. Stream Analytics supports a number of common delimiters for serializing data in CSV format. Supported values are comma, semicolon, space, tab, and vertical bar.
  • Property columns: Optional. Comma-separated columns that need to be attached as user properties of the outgoing message instead of the payload. For more information, see the section Custom metadata properties for output.
  • System Property columns: Optional. Key/value pairs of system properties and corresponding column names that need to be attached to the outgoing message instead of the payload. For more information, see the section System properties for Service Bus Queue and Topic outputs.

The number of partitions is based on the Service Bus SKU and size. The partition key is a unique integer value for each partition.

Azure Cosmos DB

Azure Cosmos DB is a globally distributed database service that offers limitless elastic scale around the globe, rich query, and automatic indexing over schema-agnostic data models. To learn about Azure Cosmos DB container options for Stream Analytics, see the Stream Analytics with Azure Cosmos DB as output article.

Azure Cosmos DB output from Stream Analytics is currently not available in the Azure China 21Vianet and Azure Germany (T-Systems International) regions.

Note

At this time, Azure Stream Analytics supports connection to Azure Cosmos DB only by using the SQL API. Other Azure Cosmos DB APIs are not yet supported. If you point Azure Stream Analytics to Azure Cosmos DB accounts created with other APIs, the data might not be properly stored.

The following list describes the properties for creating an Azure Cosmos DB output.

  • Output alias: An alias to refer to this output in your Stream Analytics query.
  • Sink: Azure Cosmos DB.
  • Import option: Choose either Select Cosmos DB from your subscription or Provide Cosmos DB settings manually.
  • Account ID: The name or endpoint URI of the Azure Cosmos DB account.
  • Account key: The shared access key for the Azure Cosmos DB account.
  • Database: The Azure Cosmos DB database name.
  • Container name: The container name to be used, which must exist in Cosmos DB. Example: MyContainer (a container named "MyContainer" must exist).
  • Document ID: Optional. The name of the field in output events that's used to specify the primary key on which insert or update operations are based (see the sketch after this list).
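
For example, to update one document per device for each window rather than insert a new document every time, expose the key column in the query and reference it as the Document ID. The names iotHubInput, cosmosOutput, deviceId, and temperature are hypothetical placeholders:

    SELECT deviceId, AVG(temperature) AS avgTemperature, System.Timestamp() AS windowEnd
    INTO cosmosOutput
    FROM iotHubInput
    GROUP BY deviceId, TumblingWindow(minute, 5)

Setting Document ID to deviceId makes each new window's result update the existing document for that device.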

Azure Functions

Azure Functions is a serverless compute service that you can use to run code on demand without having to explicitly provision or manage infrastructure. It lets you implement code that's triggered by events occurring in Azure or partner services. This ability of Azure Functions to respond to triggers makes it a natural output for Azure Stream Analytics. This output adapter enables users to connect Stream Analytics to Azure Functions, and run a script or piece of code in response to a variety of events.

Azure Functions output from Stream Analytics is currently not available in the Azure China 21Vianet and Azure Germany (T-Systems International) regions.

Azure Stream Analytics invokes Azure Functions via HTTP triggers. The Azure Functions output adapter is available with the following configurable properties:

  • Function app: The name of your Azure Functions app.
  • Function: The name of the function in your Azure Functions app.
  • Key: If you want to use an Azure function from another subscription, you can do so by providing the key to access your function.
  • Max batch size: A property that lets you set the maximum size for each output batch that's sent to your Azure function. The input unit is in bytes. By default, this value is 262,144 bytes (256 KB).
  • Max batch count: A property that lets you specify the maximum number of events in each batch that's sent to Azure Functions. The default value is 100.

Azure Stream Analytics expects HTTP status 200 from the Functions app for batches that were processed successfully.

When Azure Stream Analytics receives a 413 ("HTTP Request Entity Too Large") exception from an Azure function, it reduces the size of the batches that it sends to Azure Functions. In your Azure function code, use this exception to make sure that Azure Stream Analytics doesn't send oversized batches. Also, make sure that the maximum batch count and size values used in the function are consistent with the values entered in the Stream Analytics portal.

Note

During test connection, Stream Analytics sends an empty batch to Azure Functions to test whether the connection between the two works. Make sure that your Functions app handles empty batch requests so that the test connection passes.

Also, if no event lands in a time window, no output is generated. As a result, the computeResult function isn't called. This behavior is consistent with the built-in windowed aggregate functions.

Custom metadata properties for output

You can attach query columns as user properties to your outgoing messages. These columns don't go into the payload. The properties are present in the form of a dictionary on the output message; the key is the column name, and the value is the column value in the properties dictionary. All Stream Analytics data types are supported except Record and Array.

Supported outputs:

  • Service Bus queue
  • Service Bus topic
  • Event hub

In the following example, we add the two fields DeviceId and DeviceStatus to the metadata.

  • Query: select *, DeviceId, DeviceStatus from iotHubInput
  • Output configuration: DeviceId,DeviceStatus

(Screenshot: the Property columns field in the output configuration)

The following screenshot shows output message properties inspected in the event hub through Service Bus Explorer.

(Screenshot: custom event properties on the output message)

System properties for Service Bus Queue and Topic outputs

You can attach query columns as system properties to your outgoing Service Bus queue or topic messages. These columns don't go into the payload; instead, the corresponding BrokeredMessage system property is populated with the query column values. These system properties are supported: MessageId, ContentType, Label, PartitionKey, ReplyTo, SessionId, CorrelationId, To, ForcePersistence, TimeToLive, ScheduledEnqueueTimeUtc. String values of these columns are parsed as the corresponding system property value type, and any parsing failures are treated as data errors. This field is provided as a JSON object. Details about the format are as follows:

  • Surrounded by curly braces {}.
  • Written in key/value pairs.
  • Keys and values must be strings.
  • The key is the system property name, and the value is the query column name.
  • Keys and values are separated by a colon.
  • Each key/value pair is separated by a comma.

The following example shows how to use this property:

  • Query: select *, column1, column2 INTO queueOutput FROM iotHubInput
  • System Property Columns: { "MessageId": "column1", "PartitionKey": "column2"}

This sets MessageId on Service Bus queue messages to the values of column1, and PartitionKey to the values of column2.

Partitioning

The following list summarizes partitioning support and the number of output writers for each output type:

  • Azure Data Lake Store: Partitioning is supported. Use {date} and {time} tokens in the path prefix pattern, and choose a date format such as YYYY/MM/DD, DD/MM/YYYY, or MM-DD-YYYY; HH is used for the time format. The number of output writers follows the input partitioning for fully parallelizable queries.
  • Azure SQL Database: Partitioning is supported but needs to be enabled, based on the PARTITION BY clause in the query. When the Inherit partitioning option is enabled, the output follows the input partitioning for fully parallelizable queries. To learn more about achieving better write throughput when you're loading data into Azure SQL Database, see Azure Stream Analytics output to Azure SQL Database.
  • Azure Blob storage: Partitioning is supported. Use {date} and {time} tokens from your event fields in the path pattern, and choose a date format such as YYYY/MM/DD, DD/MM/YYYY, or MM-DD-YYYY; HH is used for the time format. Blob output can also be partitioned by a single custom event attribute {fieldname} or {datetime:<specifier>}. The number of output writers follows the input partitioning for fully parallelizable queries.
  • Azure Event Hubs: Partitioning is supported; the number of output writers varies depending on partition alignment. When the partition key for the event hub output is aligned with the upstream (previous) query step, the number of writers is the same as the number of partitions in the event hub output, and each writer uses the EventHubSender class to send events to its specific partition. When the partition key for the event hub output is not aligned with the upstream (previous) query step, the number of writers is the same as the number of partitions in that prior step, and each writer uses the SendBatchAsync class in EventHubClient to send events to all the output partitions.
  • Power BI: Partitioning is not supported; there is no partition key, and the number of output writers is not applicable.
  • Azure Table storage: Partitioning is supported, with any output column as the partition key. The number of output writers follows the input partitioning for fully parallelized queries.
  • Azure Service Bus topic: Partitioning is supported, and the partition key is automatically chosen. The number of partitions is based on the Service Bus SKU and size, and the partition key is a unique integer value for each partition. The number of output writers is the same as the number of partitions in the output topic.
  • Azure Service Bus queue: Partitioning is supported, and the partition key is automatically chosen. The number of partitions is based on the Service Bus SKU and size, and the partition key is a unique integer value for each partition. The number of output writers is the same as the number of partitions in the output queue.
  • Azure Cosmos DB: Partitioning is supported, based on the PARTITION BY clause in the query. The number of output writers follows the input partitioning for fully parallelized queries.
  • Azure Functions: Partitioning is supported, based on the PARTITION BY clause in the query. The number of output writers follows the input partitioning for fully parallelized queries.

The number of output writers can also be controlled by using an INTO <partition count> clause (see INTO) in your query, which can be helpful in achieving a desired job topology. If your output adapter isn't partitioned, lack of data in one input partition causes a delay of up to the late arrival amount of time. In such cases, the output is merged into a single writer, which might cause bottlenecks in your pipeline. To learn more about late arrival policy, see Azure Stream Analytics event order considerations.

Output batch size

Azure Stream Analytics uses variable-size batches to process events and write to outputs. Typically the Stream Analytics engine doesn't write one message at a time, and it uses batches for efficiency. When the rate of both the incoming and outgoing events is high, Stream Analytics uses larger batches. When the egress rate is low, it uses smaller batches to keep latency low.

The following list explains some of the considerations for output batching:

  • Azure Data Lake Store: See Data Lake Storage limits. Up to 4 MB is used per write operation.
  • Azure SQL Database: Configurable by using Max batch count; the default is a maximum of 10,000 and a minimum of 100 rows per single bulk insert. See Azure SQL limits. Every batch is initially bulk inserted with the maximum batch count, and the batch is split in half (down to the minimum batch count) based on retryable errors from SQL.
  • Azure Blob storage: See Azure Storage limits. The maximum blob block size is 4 MB, and the maximum blob block count is 50,000.
  • Azure Event Hubs: 256 KB or 1 MB per message; see Event Hubs limits. When input/output partitioning isn't aligned, each event is packed individually into EventData and sent in a batch of up to the maximum message size (this also happens if custom metadata properties are used). When input/output partitioning is aligned, multiple events are packed into a single EventData instance, up to the maximum message size, and sent.
  • Power BI: See Power BI REST API limits.
  • Azure Table storage: See Azure Storage limits. The default is 100 entities per single transaction; you can configure a smaller value as needed.
  • Azure Service Bus queue: 256 KB per message for the Standard tier, 1 MB for the Premium tier; see Service Bus limits. A single event is used per message.
  • Azure Service Bus topic: 256 KB per message for the Standard tier, 1 MB for the Premium tier; see Service Bus limits. A single event is used per message.
  • Azure Cosmos DB: See Azure Cosmos DB limits. Batch size and write frequency are adjusted dynamically based on Azure Cosmos DB responses; Stream Analytics doesn't impose predetermined limits.
  • Azure Functions: The default batch size is 262,144 bytes (256 KB), and the default event count per batch is 100. The batch size is configurable and can be increased or decreased in the Stream Analytics output options.

Next steps