Using reference data for lookups in Stream Analytics

Reference data (also known as a lookup table) is a finite data set that is static or slowly changing in nature, used to perform a lookup or to augment your data streams. For example, in an IoT scenario, you could store metadata about sensors (which doesn't change often) in reference data and join it with real-time IoT data streams. Azure Stream Analytics loads reference data in memory to achieve low-latency stream processing. To make use of reference data in your Azure Stream Analytics job, you will generally use a Reference Data Join in your query.

Example

You can have a real-time stream of events generated as cars pass a toll booth. The toll booth can capture the license plate in real time and join it with a static dataset that has registration details to identify license plates that have expired.

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

Stream Analytics supports Azure Blob storage and Azure SQL Database as the storage layer for reference data. You can also transform and/or copy reference data to Blob storage from Azure Data Factory to use any number of cloud-based and on-premises data stores.

Azure Blob storage

Reference data is modeled as a sequence of blobs (defined in the input configuration) in ascending order of the date/time specified in the blob name. Blobs can only be added to the end of the sequence, by using a date/time greater than the one specified by the last blob in the sequence.

Configure blob reference data

To configure your reference data, you first need to create an input that is of type Reference Data. The table below describes each property that you need to provide when creating the reference data input:

Input Alias: A friendly name used in the job query to reference this input.

Storage Account: The name of the storage account where your blobs are located. If it's in the same subscription as your Stream Analytics job, you can select it from the drop-down.

Storage Account Key: The secret key associated with the storage account. This is populated automatically if the storage account is in the same subscription as your Stream Analytics job.

Storage Container: Containers provide a logical grouping for blobs stored in the Microsoft Azure Blob service. When you upload a blob to the Blob service, you must specify a container for that blob.

Path Pattern: A required property used to locate your blobs within the specified container. Within the path, you may specify one or more instances of the two variables {date} and {time}.
Example 1: products/{date}/{time}/product-list.csv
Example 2: products/{date}/product-list.csv
Example 3: product-list.csv
If the blob doesn't exist in the specified path, the Stream Analytics job waits indefinitely for the blob to become available.

Date Format [optional]: If you use {date} within the path pattern, you can select the date format in which your blobs are organized from the drop-down of supported formats. Example: YYYY/MM/DD, MM/DD/YYYY, etc.

Time Format [optional]: If you use {time} within the path pattern, you can select the time format in which your blobs are organized from the drop-down of supported formats. Example: HH, HH/mm, or HH-mm.

Event Serialization Format: To make sure your queries work the way you expect, Stream Analytics needs to know which serialization format you're using for incoming data streams. For reference data, the supported formats are CSV and JSON.

Encoding: UTF-8 is the only supported encoding format at this time.

Static reference data

If your reference data is not expected to change, enable support for static reference data by specifying a static path in the input configuration. Azure Stream Analytics picks up the blob from the specified path. The {date} and {time} substitution tokens aren't required. Because reference data is immutable in Stream Analytics, overwriting a static reference data blob is not recommended.

Generate reference data on a schedule

If your reference data is a slowly changing data set, enable support for refreshing reference data by specifying a path pattern in the input configuration using the {date} and {time} substitution tokens. Stream Analytics picks up the updated reference data definitions based on this path pattern. For example, a pattern of sample/{date}/{time}/products.csv with a date format of "YYYY-MM-DD" and a time format of "HH-mm" instructs Stream Analytics to pick up the updated blob sample/2015-04-16/17-30/products.csv at 5:30 PM on April 16, 2015 (UTC).
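The token substitution described above can be sketched in a few lines. This is a minimal illustration, not the actual Stream Analytics implementation; the function name and the mapping of the portal's "YYYY-MM-DD"/"HH-mm" formats onto strftime codes are assumptions.

```python
from datetime import datetime, timezone

def reference_blob_path(pattern: str, effective: datetime,
                        date_format: str = "%Y-%m-%d",
                        time_format: str = "%H-%M") -> str:
    """Substitute {date} and {time} in a reference data path pattern
    for a given effective time (illustrative sketch)."""
    return (pattern
            .replace("{date}", effective.strftime(date_format))
            .replace("{time}", effective.strftime(time_format)))

path = reference_blob_path("sample/{date}/{time}/products.csv",
                           datetime(2015, 4, 16, 17, 30, tzinfo=timezone.utc))
print(path)  # sample/2015-04-16/17-30/products.csv
```

Whatever tool produces your blobs (for example, a Data Factory pipeline) needs to generate names following exactly this scheme so that Stream Analytics can discover them.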

Azure Stream Analytics automatically scans for refreshed reference data blobs at a one-minute interval. If a blob with timestamp 10:30:00 is uploaded with a small delay (for example, at 10:30:30), you will notice a small delay in the Stream Analytics job referencing this blob. To avoid such scenarios, it is recommended to upload the blob earlier than the target effective time (10:30:00 in this example) to give the Stream Analytics job enough time to discover it, load it into memory, and perform operations.

Note

Currently, Stream Analytics jobs look for the blob refresh only when the machine time advances to the time encoded in the blob name. For example, the job will look for sample/2015-04-16/17-30/products.csv as soon as possible, but no earlier than 5:30 PM on April 16, 2015 (UTC). It will never look for a blob with an encoded time earlier than the last one that was discovered.

For example, once the job finds the blob sample/2015-04-16/17-30/products.csv, it ignores any files with an encoded date earlier than 5:30 PM on April 16, 2015. So if a late-arriving sample/2015-04-16/17-25/products.csv blob gets created in the same container, the job will not use it.

Likewise, if sample/2015-04-16/17-30/products.csv is only produced at 10:03 PM on April 16, 2015, but no blob with an earlier date is present in the container, the job uses this file starting at 10:03 PM and uses the previous reference data until then.

An exception to this is when the job needs to reprocess data back in time or when the job is first started. At start time, the job looks for the most recent blob produced before the specified job start time. This ensures that there is a non-empty reference data set when the job starts. If one cannot be found, the job displays the following diagnostic: Initializing input without a valid reference data blob for UTC time <start time>.
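The start-time selection rule can be sketched as follows. This is an illustrative model of the behavior described above, assuming the sample/{date}/{time}/products.csv layout from the earlier example; the helper names are hypothetical.

```python
from datetime import datetime

def parse_encoded_time(name: str) -> datetime:
    """Parse the date/time encoded in a blob name laid out as
    sample/{date}/{time}/products.csv (layout assumed from the example)."""
    _, date_part, time_part, _ = name.split("/")
    return datetime.strptime(f"{date_part} {time_part}", "%Y-%m-%d %H-%M")

def blob_at_start(blob_names, job_start: datetime):
    """Most recent blob whose encoded time is at or before the job start,
    or None if no such blob exists (the job would then report the
    'Initializing input without a valid reference data blob' diagnostic)."""
    eligible = [b for b in blob_names if parse_encoded_time(b) <= job_start]
    return max(eligible, key=parse_encoded_time, default=None)

blobs = ["sample/2015-04-16/17-25/products.csv",
         "sample/2015-04-16/17-30/products.csv",
         "sample/2015-04-16/22-03/products.csv"]
print(blob_at_start(blobs, datetime(2015, 4, 16, 18, 0)))
# sample/2015-04-16/17-30/products.csv
```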

Azure Data Factory can be used to orchestrate the task of creating the updated blobs required by Stream Analytics to update reference data definitions. Data Factory is a cloud-based data integration service that orchestrates and automates the movement and transformation of data. Data Factory supports connecting to a large number of cloud-based and on-premises data stores and moving data easily on a regular schedule that you specify. For more information and step-by-step guidance on how to set up a Data Factory pipeline that generates reference data for Stream Analytics and refreshes it on a predefined schedule, check out this GitHub sample.

Tips on refreshing blob reference data

  1. Do not overwrite reference data blobs, as they are immutable.
  2. The recommended way to refresh reference data is to:
    • Use {date}/{time} in the path pattern.
    • Add a new blob using the same container and path pattern defined in the job input.
    • Use a date/time greater than the one specified by the last blob in the sequence.
  3. Reference data blobs are not ordered by the blob's "Last Modified" time, but only by the date and time specified in the blob name using the {date} and {time} substitutions.
  4. To avoid having to list a large number of blobs, consider deleting very old blobs that will no longer be processed. Note that ASA might have to reprocess a small amount in some scenarios, such as a restart.

Azure SQL Database

Azure SQL Database reference data is retrieved by your Stream Analytics job and stored as a snapshot in memory for processing. The snapshot of your reference data is also stored in a container in a storage account that you specify in the configuration settings. The container is created automatically when the job starts. If the job is stopped or enters a failed state, the automatically created containers are deleted when the job is restarted.

If your reference data is a slowly changing data set, you need to refresh the snapshot that is used in your job periodically. Stream Analytics allows you to set a refresh rate when you configure your Azure SQL Database input connection. The Stream Analytics runtime queries your Azure SQL Database at the interval specified by the refresh rate. The fastest refresh rate supported is once per minute. For each refresh, Stream Analytics stores a new snapshot in the storage account provided.

Stream Analytics provides two options for querying your Azure SQL Database. A snapshot query is mandatory and must be included in each job. Stream Analytics runs the snapshot query periodically based on your refresh interval and uses the result of the query (the snapshot) as the reference data set. The snapshot query should fit most scenarios, but if you run into performance issues with large data sets and fast refresh rates, use the delta query option. Queries that take more than 60 seconds to return the reference data set result in a timeout.

With the delta query option, Stream Analytics initially runs the snapshot query to get a baseline reference data set. Afterwards, Stream Analytics runs the delta query periodically based on your refresh interval to retrieve incremental changes. These incremental changes are continually applied to the reference data set to keep it up to date. Using the delta query may help reduce storage cost and network I/O operations.
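Conceptually, applying a delta to an in-memory snapshot looks like the sketch below. The key column (LicensePlate, borrowed from the toll-booth example) and the _deleted flag marking removed rows are illustrative assumptions, not the actual Stream Analytics delta contract.

```python
def apply_delta(snapshot: dict, delta_rows: list) -> dict:
    """Merge incremental changes into an in-memory reference data set.
    Rows flagged '_deleted' are removed; all others are inserted or
    replace the existing row with the same key (illustrative only)."""
    updated = dict(snapshot)
    for row in delta_rows:
        key = row["LicensePlate"]
        if row.get("_deleted"):
            updated.pop(key, None)
        else:
            updated[key] = {k: v for k, v in row.items() if k != "_deleted"}
    return updated

snapshot = {"ABC123": {"LicensePlate": "ABC123", "Expired": "0"}}
delta = [{"LicensePlate": "ABC123", "Expired": "1"},   # updated row
         {"LicensePlate": "XYZ789", "Expired": "0"}]   # new row
refreshed = apply_delta(snapshot, delta)
print(refreshed["ABC123"]["Expired"])  # 1
```

Because only the changed rows travel over the network on each refresh, this pattern is what makes the delta option cheaper than re-fetching the full snapshot.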

Configure SQL Database reference data

To configure your SQL Database reference data, you first need to create an input of type Reference Data. The table below describes each property that you need to provide when creating the reference data input. For more information, see Use reference data from a SQL Database for an Azure Stream Analytics job.

You can use Azure SQL Database Managed Instance as a reference data input. You have to configure a public endpoint in Azure SQL Database Managed Instance and then manually configure the following settings in Azure Stream Analytics. An Azure virtual machine running SQL Server with an attached database is also supported by manually configuring these settings.

Input alias: A friendly name used in the job query to reference this input.

Subscription: Choose your subscription.

Database: The Azure SQL Database that contains your reference data. For Azure SQL Database Managed Instance, you must specify port 3342, for example sampleserver.public.database.chinacloudapi.cn,3342.

Username: The username associated with your Azure SQL Database.

Password: The password associated with your Azure SQL Database.

Refresh periodically: This option lets you choose a refresh rate. Choosing "On" lets you specify the refresh rate in DD:HH:MM format.

Snapshot query: The default query option that retrieves the reference data from your SQL Database.

Delta query: For advanced scenarios with large data sets and a short refresh rate, choose to add a delta query.

Size limitation

It is recommended to use reference data sets smaller than 300 MB for best performance. Reference data larger than 300 MB is supported in jobs with 6 or more streaming units (SUs). This functionality is in preview and must not be used in production. Using very large reference data may impact the performance of your job. As the complexity of a query increases to include stateful processing, such as windowed aggregates, temporal joins, and temporal analytic functions, the maximum supported size of reference data is expected to decrease. If Azure Stream Analytics cannot load the reference data and perform complex operations, the job runs out of memory and fails. In such cases, the SU % Utilization metric reaches 100%.

Number of Streaming Units: Recommended Size
1: 50 MB or lower
3: 150 MB or lower
6 and beyond: 300 MB or lower. Using reference data greater than 300 MB is supported in preview and could impact the performance of your job.

Compression is not supported for reference data.

Joining multiple reference datasets in a job

You can join only one stream input with one reference data input in a single step of your query. However, you can join multiple reference datasets by breaking your query down into multiple steps. An example is shown below.

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

Next steps