使用参考数据在流分析中查找

参考数据是一个有限的数据集,它的性质是静态的或缓慢变化的。 它用于执行查找或增加数据流。 参考数据也被称为查找表。

以 IoT 方案为例。 你可以在参考数据中存储有关传感器的元数据,这些数据不经常变化。 然后,可以将它与实时 IoT 数据流联接起来。

Azure 流分析在内存中加载参考数据以实现低延迟流处理。 为了在流分析作业中利用参考数据,通常会在查询中使用参考数据联接

示例

当汽车经过收费站时,你可以有一个实时生成的事件流。 收费站可以实时捕获车牌。 该数据可以与包含注册详细信息的静态数据集相联接,用来识别已过期的车牌。

SELECT I1.EntryTime, I1.LicensePlate, I1.TollId, R.RegistrationId  
FROM Input1 I1 TIMESTAMP BY EntryTime  
JOIN Registration R  
ON I1.LicensePlate = R.LicensePlate  
WHERE R.Expired = '1'

流分析支持将 Azure Blob 存储和 Azure SQL 数据库用作参考数据的存储层。 如果其他数据存储中有参考数据,请尝试使用 Azure 数据工厂将数据提取、转换和加载到受支持的数据存储之一。 有关更多信息,请参阅 Azure 数据工厂中复制活动的概述

Azure Blob 存储

参考数据建模为 blob 序列,这些 blob 按 blob 名称中指定的日期/时间顺序升序排列。 只有通过使用大于序列中最后一个 blob 指定的日期/时间的日期/时间,才能将 blob 添加到序列的末尾。 blob 是在输入配置中定义的。

有关详细信息,请参阅将 Blob 存储中的参考数据用于流分析作业

配置 blob 参考数据

为了配置参考数据,首先需要创建一个属于“参考数据”类型的输入。 下表介绍了在创建参考数据输入时需要提供的每个属性及其说明。

属性名称 说明
输入别名 作业查询中使用的一个友好名称,用于引用此输入。
存储帐户 存储 blob 的存储帐户的名称。 如果它与你的流分析作业在同一个订阅中,可以从下拉菜单中进行选择。
存储帐户密钥 与存储帐户关联的密钥。 如果存储帐户和你的流分析作业在同一个订阅中,此密钥会自动填充。
存储容器 容器对存储在 Blob 存储中的 blob 进行逻辑分组。 将 blob 上传到 Blob 存储时,必须为该 blob 指定一个容器。
路径模式 这个必需的属性用于在指定的容器中定位 blob。 在路径中,可以选择指定一个或多个使用变量 {date} 和 {time} 的实例。
示例 1:products/{date}/{time}/product-list.csv
示例 2:products/{date}/product-list.csv
示例 3:product-list.csv

如果 blob 不存在于指定的路径中,流分析作业将无限期地等待 blob 变为可用状态。
日期格式 [可选] 如果你在指定的路径模式中使用了 {date},请从支持的格式的下拉列表中选择用于组织 blob 的日期格式。
示例:YYYY/MM/DD 或 MM/DD/YYYY
时间格式 [可选] 如果你在指定的路径模式中使用了 {time},请从支持的格式的下拉列表中选择用于组织 blob 的时间格式。
示例:HH、HH/mm、或 HH-mm
事件序列化格式 为确保查询按预计的方式运行,流分析需要了解你对传入数据流使用哪种序列化格式。 对于参考数据,支持的格式为 CSV 和 JSON。
编码 目前只支持 UTF-8 这种编码格式。

静态参考数据

你的参考数据应该不会更改。 要启用对静态参考数据的支持,请在输入配置中指定一个静态路径。

流分析从指定的路径选取 blob。 {date} 和 {time} 替换标记不是必需的。 由于参考数据在流分析中是不可变的,因此不建议覆盖静态参考数据 blob。

按计划生成参考数据

参考数据可能是一个缓慢变化的数据集。 为了刷新参考数据,可使用 {date} 和 {time} 替换标记在输入配置中指定路径模式。 流分析根据此路径模式选取更新的引用数据定义。

例如,日期格式为“YYYY-MM-DD”、时间格式为“HH-mm”的 sample/{date}/{time}/products.csv 模式可指示流分析在 2015 年 4 月 16 日下午 5:30 (UTC) 选取更新后的 blob sample/2015-04-16/17-30/products.csv

流分析以一分钟的时间间隔自动扫描刷新的参考数据 blob。 时间戳为 10:30:00 的 blob 在上传时可能会有短暂的延迟,例如 10:30:30。 你将注意到引用此 blob 的流分析作业会有短暂延迟。

为了避免这种情况,请上传早于目标生效时间的 blob,在本例中为 10:30:00。 流分析作业现在有足够的时间来发现 blob、将其加载到内存中并执行操作。

注意

目前,流分析作业只有在计算机时间提前到 blob 名称中的编码时间时,才会查找 blob 刷新。 例如,作业会尽快查找 sample/2015-04-16/17-30/products.csv,但不会早于 2015 年 4 月 16 日下午 5:30 (UTC)。 它绝不会查找编码时间早于发现的上一个 blob 的 blob。

例如,作业在找到 blob sample/2015-04-16/17-30/products.csv 后,将忽略编码日期早于 2015 年 4 月 16 日下午 5:30 的任何文件。 如果在同一个容器中创建了晚到的 sample/2015-04-16/17-25/products.csv blob,作业就不会使用它。

在另一个示例中,仅在 2015 年 4 月 16 日晚上 10:03 生成了 sample/2015-04-16/17-30/products.csv,但容器中没有更早日期的 blob。 因此,作业从 2015 年 4 月 16 日晚上 10:03 开始使用此文件,并在此之前使用以前的参考数据。

这种行为的一个例外是,当作业需要重新处理过去的数据时,或者当作业第一次启动时。

在开始时,作业会查找在指定的作业开始时间之前生成的最新 blob。 此行为可确保在作业开始时有一个非空的参考数据集。 如果找不到参考数据集,作业会显示以下诊断:Initializing input without a valid reference data blob for UTC time <start time>

刷新参考数据集时,将生成诊断日志:Loaded new reference data from <blob path>。 由于多种原因,作业可能需要重新加载以前的参考数据集。 最常见的原因是要重新处理过去的数据。 这时会生成相同的诊断日志。 此操作并不意味着当前流数据将使用过去的参考数据。

Azure 数据工厂可用来安排以下任务:创建流分析更新引用数据定义所需的已更新 blob。

数据工厂是一项基于云的数据集成服务,可对数据移动和转换进行安排并使其实现自动化。 数据工厂支持连接到大量基于云的数据存储和本地数据存储。 它可以按照你指定的定期日程安排轻松地移动数据。

有关如何设置数据工厂管道,为流分析生成参考数据,并按预定的日程安排刷新的详细信息,请查看此 GitHub 示例

有关刷新 Blob 参考数据的提示

  • 不要覆盖参考数据 blob,因为它们是不可变的。
  • 刷新参考数据的推荐方法是:
    • 在路径模式中使用 {date}/{time}。
    • 使用作业输入中定义的相同容器和路径模式来添加新 blob。
    • 使用大于序列中最后一个 Blob 指定的日期/时间。
  • 参考数据 blob 不是按 blob 的“上次修改”时间排序的。 它们只按 blob 名称中使用 {date} 和 {time} 替换标记指定的日期和时间排序。
  • 为了避免列出大量 blob,请删除不再对其进行处理的旧 blob。 在某些情况下(如重启),流分析可能需要重新处理少量数据。

Azure SQL Database

流分析作业会检索 SQL 数据库参考数据,并将其作为快照存储在内存中进行处理。 参考数据的快照还存储在存储帐户中的一个容器中。 你可在配置设置中指定该存储帐户。

作业启动时自动创建容器。 如果作业已停止或进入失败状态,则在重启作业时会删除自动创建的容器。

如果参考数据是一个缓慢变化的数据集,你需要定期刷新作业中使用的快照。

使用流分析,可以在配置 SQL 数据库输入连接时设置刷新率。 流分析运行时将按刷新率指定的间隔查询 SQL 数据库实例。 支持的最快刷新率是每分钟一次。 对于每次刷新,流分析都会在所提供的存储帐户中存储一个新快照。

流分析提供了两个用于查询 SQL 数据库实例的选项。 快照查询是必需的,必须包括在每个作业中。 流分析按照你的刷新间隔定期运行快照查询。 它使用查询的结果(快照)作为参考数据集。

快照查询应该适用于大多数场景。 如果你遇到大数据集和快速刷新率的性能问题,请使用增量查询选项。 返回参考数据集所需时间超过 60 秒的查询会导致超时。

使用增量查询选项时,流分析最初会运行快照查询来获取基线参考数据集。 之后,流分析会根据刷新间隔定期运行增量查询来检索增量更改。 这些增量更改不断应用到参考数据集,使其保持更新。 使用增量查询选项有助于减少存储成本和网络 I/O 操作。

配置 SQL 数据库参考数据

为了配置 SQL 数据库参考数据,首先需要创建参考数据输入。 下表介绍了在创建参考数据输入时需要提供的每个属性及其说明。

可以使用 Azure SQL 托管实例作为参考数据输入。 必须在 SQL 托管实例中配置公共终结点。 然后,在流分析中手动配置以下设置。 通过手动配置以下设置,还支持运行附加了数据库的 SQL Server 的 Azure 虚拟机。

属性名称 说明
输入别名 作业查询中使用的一个友好名称,用于引用此输入。
订阅 你的订阅。
数据库 包含你的参考数据的 SQL 数据库实例。 对于 SQL 托管实例,必须指定端口 3342。 示例为 sampleserver.public.database.chinacloudapi.cn,3342
用户名 与你的 SQL 数据库实例关联的用户名。
Password 与你的 SQL 数据库实例关联的密码。
定期刷新 此选项用来选择刷新率。 选择“开”将可以按 DD:HH:MM 格式指定刷新率。
快照查询 这是从你的 SQL 数据库实例中检索参考数据的默认查询选项。
增量查询 对于具有大型数据集和较快刷新率的高级方案,请添加增量查询。

大小限制

请使用小于 300 MB 的参考数据集,以获得最佳性能。 具有 6 个或更多流单元的作业支持 5 GB 或更小的参考数据集。 使用较大的参考数据集可能会影响作业的端到端延迟。

查询的复杂性可以增加到包括有状态处理,如窗口化聚合、临时联接和临时分析函数。 当复杂性增加时,支持的参考数据的最大大小将减小。

如果流分析无法加载参考数据并执行复杂的操作,作业将耗尽内存并失败。 在这种情况下,流单元利用率百分比指标将达到 100%。

流单元数 建议大小
1 50 MB 或更小
3 150 MB 或更小
至少 6 5 GB 或更小

参考数据不支持压缩。

在作业中联接多个参考数据集

只能将参考数据输入联接到流式处理输入。 要联接多个参考数据集,请将查询分解为多个步骤。 下面是一个示例:

With Step1 as (
    --JOIN input stream with reference data to get 'Desc'
    SELECT streamInput.*, refData1.Desc as Desc
    FROM    streamInput
    JOIN    refData1 ON refData1.key = streamInput.key 
)
--Now Join Step1 with second reference data
SELECT *
INTO    output 
FROM    Step1
JOIN    refData2 ON refData2.Desc = Step1.Desc 

IoT Edge 作业

流分析边缘作业仅支持本地参考数据。 将作业部署到 IoT Edge 设备时,它将从用户定义的文件路径加载参考数据。 在设备上将参考数据文件准备就绪。

对于 Windows 容器,请将参考数据文件放置在本地驱动器上并通过 Docker 容器共享本地驱动器。 对于 Linux 容器,请创建一个 Docker 卷并将该数据文件填充到该卷。

IoT Edge 上的参考数据更新是由部署触发的。 触发后,流分析模块在不停止正在运行的作业的情况下选取更新后的数据。

可以通过两种方式更新参考数据:

  • 从 Azure 门户中更新流分析作业中的参考数据路径。
  • 更新 IoT Edge 部署。

后续步骤