Azure 流分析中的查询以类似于 SQL 的查询语言表示。 语言构造记录在 流分析查询语言参考 指南中。
查询设计可以表达简单的直通逻辑,以将事件数据从一个输入流移动到输出数据存储,也可以执行丰富的模式匹配和时态分析,以使用流分析指南在各种时间窗口(如 生成 IoT 解决方案 中所示)计算聚合。 可以联接来自多个输入的数据来合并流事件,并且可以针对静态引用数据执行查找以扩充事件值。 还可以将数据写入多个输出。
本文概述了基于真实场景的多个常见查询模式的解决方案。
支持的数据格式
Azure 流分析支持以 CSV、JSON 和 Avro 数据格式处理事件。 JSON 和 Avro 格式可以包含复杂类型,例如嵌套对象(记录)或数组。 有关使用这些复杂数据类型的详细信息,请参阅 分析 JSON 和 AVRO 数据。
将数据发送到多个输出
多个 SELECT 语句可用于将数据输出到不同的输出接收器。 例如,一个 SELECT 语句可以输出基于阈值的警报,而另一个语句可将事件输出到 Blob 存储。
请考虑以下 输入:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
你希望从查询中获得以下两个输出:
ArchiveOutput:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
AlertOutput:
| Make | Time | Count |
| --- | --- | --- |
| Make2 |2023-01-01T00:00:10.0000000Z |3 |
使用两个 SELECT 语句进行查询,其中包含存档输出和警报输出作为输出:
SELECT
*
INTO
ArchiveOutput
FROM
Input TIMESTAMP BY Time
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO
AlertOutput
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING
[Count] >= 3
INTO 子句告知流分析服务,要将数据写入到哪个输出。 第一个 SELECT 定义一个传递查询,该查询从输入接收数据,并将其发送到名为 ArchiveOutput 的输出。 第二个查询在将结果发送到名为 AlertOutput 的下游警报系统输出之前聚合和筛选数据。
WITH 子句可用于定义多个子查询块。 此选项的优点是开放输入源的读者较少。
查询:
WITH ReaderQuery AS (
SELECT
*
FROM
Input TIMESTAMP BY Time
)
SELECT * INTO ArchiveOutput FROM ReaderQuery
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO AlertOutput
FROM ReaderQuery
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING [Count] >= 3
有关详细信息,请参阅 WITH 子句。
简单的直通查询
简单的直通查询可用于将输入流数据复制到输出中。 例如,如果需要将包含实时车辆信息的数据流保存在 SQL 数据库中供以后分析,则简单的直通查询会执行该作业。
请考虑以下 输入:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
希望 输出 与输入相同:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
下面是 查询:
SELECT
*
INTO Output
FROM Input
此 SELECT * 查询将传入事件的所有字段映射到输出。 相反,只能投影 SELECT 语句中的必填字段。 在下面的示例中, SELECT 语句仅投影输入数据的 Make 和 Time 字段。
请考虑以下 输入:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |1000 |
| Make1 |2023-01-01T00:00:02.0000000Z |2000 |
| Make2 |2023-01-01T00:00:04.0000000Z |1500 |
希望 输出 只有“生成”和“时间”字段:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:04.0000000Z |
下面是仅投影所需字段的 查询:
SELECT
Make, Time
INTO Output
FROM Input
与 LIKE 和 NOT LIKE 匹配的字符串
LIKE 和 NOT LIKE 可用于验证字段是否与特定模式匹配。 例如,可以使用筛选器仅返回以字母 A 开头且以数字 9结尾的车牌。
请考虑以下 输入:
| Make | License_plate | Time |
| --- | --- | --- |
| Make1 |ABC-123 |2023-01-01T00:00:01.0000000Z |
| Make2 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make3 |ABC-369 |2023-01-01T00:00:03.0000000Z |
希望 输出 具有以字母 A 开头且以数字 9结尾的车牌:
| Make | License_plate | Time |
| --- | --- | --- |
| Make2 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make3 |ABC-369 |2023-01-01T00:00:03.0000000Z |
下面是使用 LIKE 运算符的 查询 :
SELECT
*
FROM
Input TIMESTAMP BY Time
WHERE
License_plate LIKE 'A%9'
使用 LIKE 语句检查 License_plate 字段值。 它应以字母 A开头,然后具有零个或多个字符的任何字符串,以数字 9 结尾。
过去事件的计算
LAG 函数可用于查看时间范围内过去的事件,并将其与当前事件进行比较。 例如,如果当前汽车的品牌与通过收费亭的最后一辆车的品牌不同,则可以输出当前汽车的品牌。
示例 输入:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
示例 输出:
| Make | Time |
| --- | --- |
| Make2 |2023-01-01T00:00:02.0000000Z |
示例 查询:
SELECT
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(minute, 1)) <> Make
使用 LAG 查看输入流,返回一个事件,检索 Make 值并将其与当前事件的 Make 值进行比较,并输出事件。
有关详细信息,请参阅 LAG。
返回窗口中的最后一个事件
由于事件被系统实时处理,因此没有函数可以确定事件是否是该时间窗口内到达的最后一个事件。 为此,需要将输入数据流与另一个数据流联接,其中事件时间代表该时间窗口中所有事件的最大时间。
示例 输入:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| RMV 8282 |Make1 |2023-07-27T00:05:01.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
示例 输出 ,其中包含两个十分钟时间窗口中最后一辆车的信息:
| License_plate | Make | Time |
| --- | --- | --- |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
示例 查询:
WITH LastInWindow AS
(
SELECT
MAX(Time) AS LastEventTime
FROM
Input TIMESTAMP BY Time
GROUP BY
TumblingWindow(minute, 10)
)
SELECT
Input.License_plate,
Input.Make,
Input.Time
FROM
Input TIMESTAMP BY Time
INNER JOIN LastInWindow
ON DATEDIFF(minute, Input, LastInWindow) BETWEEN 0 AND 10
AND Input.Time = LastInWindow.LastEventTime
查询的第一步在 10 分钟窗口中查找最大时间戳,即该窗口最后一个事件的时间戳。 第二步将第一个查询的结果与原始流联接,以查找与每个窗口中最后一个时间戳匹配的事件。
DATEDIFF 是一个特定于日期的函数,用于比较并返回两个 DateTime 字段之间的时间差,有关详细信息,请参阅 日期函数。
有关合并流的详细信息,请参阅 JOIN。
随时间推移的数据聚合
若要在时间范围内计算信息,可以聚合数据。 在此示例中,该语句计算每种特定汽车品牌在过去 10 秒内的计数。
示例 输入:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |1000 |
| Make1 |2023-01-01T00:00:02.0000000Z |2000 |
| Make2 |2023-01-01T00:00:04.0000000Z |1500 |
示例 输出:
| Make | Count |
| --- | --- |
| Make1 | 2 |
| Make2 | 1 |
查询:
SELECT
Make,
COUNT(*) AS Count
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
此聚合按 Make 对汽车进行分组,每 10 秒计算一次。 输出包含通过收费站的汽车的品牌
TumblingWindow 是一个窗口函数,用于将事件组合在一起。 可对所有分组事件应用聚合。 有关详细信息,请参阅 开窗函数。
有关聚合的详细信息,请参阅 聚合函数。
定期输出值
当事件缺失或不规则时,可以从更稀疏的数据输入生成常规间隔输出。 例如,每隔 5 秒生成一个报告最近看到的数据点的事件。
示例 输入:
| Time | Value |
| --- | --- |
| "2014-01-01T06:01:00" |1 |
| "2014-01-01T06:01:05" |2 |
| "2014-01-01T06:01:10" |3 |
| "2014-01-01T06:01:15" |4 |
| "2014-01-01T06:01:30" |5 |
| "2014-01-01T06:01:35" |6 |
示例 输出(前 10 行):
| Window_end | Last_event.Time | Last_event.Value |
| --- | --- | --- |
| 2014-01-01T14:01:00.000Z |2014-01-01T14:01:00.000Z |1 |
| 2014-01-01T14:01:05.000Z |2014-01-01T14:01:05.000Z |2 |
| 2014-01-01T14:01:10.000Z |2014-01-01T14:01:10.000Z |3 |
| 2014-01-01T14:01:15.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:20.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:25.000Z |2014-01-01T14:01:15.000Z |4 |
| 2014-01-01T14:01:30.000Z |2014-01-01T14:01:30.000Z |5 |
| 2014-01-01T14:01:35.000Z |2014-01-01T14:01:35.000Z |6 |
| 2014-01-01T14:01:40.000Z |2014-01-01T14:01:35.000Z |6 |
| 2014-01-01T14:01:45.000Z |2014-01-01T14:01:35.000Z |6 |
示例 查询:
SELECT
System.Timestamp() AS Window_end,
TopOne() OVER (ORDER BY Time DESC) AS Last_event
FROM
Input TIMESTAMP BY Time
GROUP BY
HOPPINGWINDOW(second, 300, 5)
此查询每隔 5 秒生成一次事件,并输出之前收到的最后一个事件。 HOPPINGWINDOW 持续时间决定查询回溯以查找最新事件的时间范围。
有关详细信息,请参阅 跃进窗口。
关联流中的事件
通过使用 LAG 函数查看过去的事件,可以在同一流中关联事件。 例如,在过去 90 秒内,每当同一 Make 的两辆车连续通过收费站时,都可以生成一个输出。
示例 输入:
| Make | License_plate | Time |
| --- | --- | --- |
| Make1 |ABC-123 |2023-01-01T00:00:01.0000000Z |
| Make1 |AAA-999 |2023-01-01T00:00:02.0000000Z |
| Make2 |DEF-987 |2023-01-01T00:00:03.0000000Z |
| Make1 |GHI-345 |2023-01-01T00:00:04.0000000Z |
示例 输出:
| Make | Time | Current_car_license_plate | First_car_license_plate | First_car_time |
| --- | --- | --- | --- | --- |
| Make1 |2023-01-01T00:00:02.0000000Z |AAA-999 |ABC-123 |2023-01-01T00:00:01.0000000Z |
示例 查询:
SELECT
Make,
Time,
License_plate AS Current_car_license_plate,
LAG(License_plate, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_license_plate,
LAG(Time, 1) OVER (LIMIT DURATION(second, 90)) AS First_car_time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(second, 90)) = Make
LAG 函数可以查看输入流,返回一个事件并检索 Make 值,并将其与当前事件的 Make 值进行比较。 满足条件后,可以使用 SELECT 语句中的 LAG 投影上一个事件中的数据。
有关详细信息,请参阅 LAG。
检测事件之间的持续时间
一旦收到“结束”事件,可以通过查看最后一个“开始”事件来计算事件的持续时间。 此查询可用于确定用户在页面或功能上花费的时间。
示例 输入:
| User | Feature | Event | Time |
| --- | --- | --- | --- |
| user@location.com |RightMenu |Start |2023-01-01T00:00:01.0000000Z |
| user@location.com |RightMenu |End |2023-01-01T00:00:08.0000000Z |
示例 输出:
| User | Feature | Duration |
| --- | --- | --- |
| user@location.com |RightMenu |7 |
示例 查询:
SELECT
[user],
feature,
DATEDIFF(
second,
LAST(Time) OVER (PARTITION BY [user], feature LIMIT DURATION(hour, 1) WHEN Event = 'start'),
Time) as duration
FROM input TIMESTAMP BY Time
WHERE
Event = 'end'
LAST 函数可用于检索特定条件中的最后一个事件。 在此示例中,条件是 Start 类型的事件,按 PARTITION BY 用户和功能对搜索进行分区。 这样,在搜索“开始”事件时,每个用户和特性都会被独立对待。 LIMIT DURATION 将回溯搜索时间限制为在结束事件和开始事件之间的 1 小时内。
计算唯一值
COUNT 和 DISTINCT 可用于计算在时间范围内在流中显示的唯一字段值的数量。 可以创建一个查询,以计算在 2 秒时段内通过收费站的唯一 汽车数量 。
示例 输入:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make1 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
示例 输出:
| Count_make | Time |
| --- | --- |
| 2 |2023-01-01T00:00:02.000Z |
| 1 |2023-01-01T00:00:04.000Z |
示例 查询:
SELECT
COUNT(DISTINCT Make) AS Count_make,
System.TIMESTAMP() AS Time
FROM Input TIMESTAMP BY TIME
GROUP BY
TumblingWindow(second, 2)
COUNT(DISTINCT Make) 返回指定时间范围内 Make 列中不同值的计数。 有关详细信息,请参阅 COUNT 聚合函数。
检索窗口中的第一个事件
IsFirst 可用于检索时间窗口中的第一个事件。 例如,每隔 10 分钟输出一次首辆汽车的信息。
示例 输入:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| RMV 8282 |Make1 |2023-07-27T00:05:01.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| VFE 1616 |Make2 |2023-07-27T00:09:31.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
示例 输出:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
示例 查询:
SELECT
License_plate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) = 1
IsFirst 还可以对数据进行分区,并确定在每 10 分钟间隔内发现的每个特定汽车 Make 的第一个事件。
示例 输出:
| License_plate | Make | Time |
| --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:05.0000000Z |
| YZK 5704 |Make3 |2023-07-27T00:02:17.0000000Z |
| YHN 6970 |Make2 |2023-07-27T00:06:00.0000000Z |
| QYF 9358 |Make1 |2023-07-27T00:12:02.0000000Z |
| MDR 6128 |Make4 |2023-07-27T00:13:45.0000000Z |
示例 查询:
SELECT
License_plate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) OVER (PARTITION BY Make) = 1
有关详细信息,请参阅 IsFirst。
删除窗口中的重复事件
执行诸如计算给定时间范围内事件的平均值等作时,应筛选重复事件。 在以下示例中,第二个事件是第一个事件的副本。
示例 输入:
| DeviceId | Time | Attribute | Value |
| --- | --- | --- | --- |
| 1 |2018-07-27T00:00:01.0000000Z |Temperature |50 |
| 1 |2018-07-27T00:00:01.0000000Z |Temperature |50 |
| 2 |2018-07-27T00:00:01.0000000Z |Temperature |40 |
| 1 |2018-07-27T00:00:05.0000000Z |Temperature |60 |
| 2 |2018-07-27T00:00:05.0000000Z |Temperature |50 |
| 1 |2018-07-27T00:00:10.0000000Z |Temperature |100 |
示例 输出:
| AverageValue | DeviceId |
| --- | --- |
| 70 | 1 |
|45 | 2 |
示例 查询:
WITH Temp AS (
SELECT Value, DeviceId
FROM Input TIMESTAMP BY Time
GROUP BY Value, DeviceId, System.Timestamp()
)
SELECT
AVG(Value) AS AverageValue, DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)
执行第一个语句时,重复记录合并为一个,因为 GROUP BY 子句中的字段完全相同。 因此,它会删除重复项。
为不同的事例/值指定逻辑(CASE 语句)
CASE 语句可以根据特定条件为不同的字段提供不同的计算。 例如,将车道 A 分配给 Make1 的汽车,将车道 B 分配给其他任何制造商的汽车。
示例 输入:
| Make | Time |
| --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |
| Make2 |2023-01-01T00:00:02.0000000Z |
| Make2 |2023-01-01T00:00:03.0000000Z |
示例 输出:
| Make |Dispatch_to_lane | Time |
| --- | --- | --- |
| Make1 |"A" |2023-01-01T00:00:01.0000000Z |
| Make2 |"B" |2023-01-01T00:00:02.0000000Z |
示例 查询:
SELECT
Make
CASE
WHEN Make = "Make1" THEN "A"
ELSE "B"
END AS Dispatch_to_lane,
System.TimeStamp() AS Time
FROM
Input TIMESTAMP BY Time
CASE 表达式将表达式与一组简单表达式进行比较,以确定其结果。 在此示例中,Make1的车辆被派往车道A,而其他制造商的车辆将被分配到车道B。
有关详细信息,请参阅 事例表达式。
数据转换
可以使用 CAST 方法实时转换数据。 例如,可以将汽车重量从 nvarchar(max) 类型转换为 bigint 类型,并在数值计算中使用。
示例 输入:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |"1000" |
| Make1 |2023-01-01T00:00:02.0000000Z |"2000" |
示例 输出:
| Make | Weight |
| --- | --- |
| Make1 |3000 |
示例 查询:
SELECT
Make,
SUM(CAST(Weight AS BIGINT)) AS Weight
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
使用 CAST 语句指定其数据类型。 请参阅数据类型 (Azure 流分析)上支持的数据类型列表。
有关 数据转换函数的详细信息。
检测条件的持续时间
对于跨越多个事件的条件, LAG 函数可用于标识该条件的持续时间。 例如,假设一个 Bug 导致所有汽车的重量不正确(超过 20,000 磅),并且必须计算该 Bug 的持续时间。
示例 输入:
| Make | Time | Weight |
| --- | --- | --- |
| Make1 |2023-01-01T00:00:01.0000000Z |2000 |
| Make2 |2023-01-01T00:00:02.0000000Z |25000 |
| Make1 |2023-01-01T00:00:03.0000000Z |26000 |
| Make2 |2023-01-01T00:00:04.0000000Z |25000 |
| Make1 |2023-01-01T00:00:05.0000000Z |26000 |
| Make2 |2023-01-01T00:00:06.0000000Z |25000 |
| Make1 |2023-01-01T00:00:07.0000000Z |26000 |
| Make2 |2023-01-01T00:00:08.0000000Z |2000 |
示例 输出:
| Start_fault | End_fault |
| --- | --- |
| 2023-01-01T00:00:02.000Z |2023-01-01T00:00:07.000Z |
示例 查询:
WITH SelectPreviousEvent AS
(
SELECT
*,
LAG([time]) OVER (LIMIT DURATION(hour, 24)) as previous_time,
LAG([weight]) OVER (LIMIT DURATION(hour, 24)) as previous_weight
FROM input TIMESTAMP BY [time]
)
SELECT
LAG(time) OVER (LIMIT DURATION(hour, 24) WHEN previous_weight < 20000 ) [Start_fault],
previous_time [End_fault]
FROM SelectPreviousEvent
WHERE
[weight] < 20000
AND previous_weight > 20000
第一个 SELECT 语句将当前权重度量与上一个度量相关联,并将其投影到当前度量中。 第二个 SELECT 将回顾previous_weight小于 20000 的最后一个事件,其中当前权重小于 20000,当前事件的previous_weight大于 20000。
End_fault是当前非故障事件,而上一个事件是故障事件,Start_fault是之前的最后一个非故障事件。
使用独立时间处理事件(子流)
由于事件生成者之间的时钟偏差、分区之间的时钟偏差或网络延迟,事件可能会延迟或无序到达。 例如,TollID 2 的设备时钟比 TollID 1 落后 5 秒,TollID 3 的设备时钟比 TollID 1 落后 10 秒。 对于每个收费站,计算可以独立进行,只考虑其自己的时钟数据作为时间戳。
示例 输入:
| LicensePlate | Make | Time | TollID |
| --- | --- | --- | --- |
| DXE 5291 |Make1 |2023-07-27T00:00:01.0000000Z | 1 |
| YHN 6970 |Make2 |2023-07-27T00:00:05.0000000Z | 1 |
| QYF 9358 |Make1 |2023-07-27T00:00:01.0000000Z | 2 |
| GXF 9462 |Make3 |2023-07-27T00:00:04.0000000Z | 2 |
| VFE 1616 |Make2 |2023-07-27T00:00:10.0000000Z | 1 |
| RMV 8282 |Make1 |2023-07-27T00:00:03.0000000Z | 3 |
| MDR 6128 |Make3 |2023-07-27T00:00:11.0000000Z | 2 |
| YZK 5704 |Make4 |2023-07-27T00:00:07.0000000Z | 3 |
示例 输出:
| TollID | Count |
| --- | --- |
| 1 | 2 |
| 2 | 2 |
| 1 | 1 |
| 3 | 1 |
| 2 | 1 |
| 3 | 1 |
示例 查询:
SELECT
TollId,
COUNT(*) AS Count
FROM input
TIMESTAMP BY Time OVER TollId
GROUP BY TUMBLINGWINDOW(second, 5), TollId
TIMESTAMP OVER BY 子句通过子流独立查看每个设备的时间线。 每个 TollID 的输出事件在计算时生成,这意味着事件按每个 TollID 的顺序排列,而不是重新排序,就像所有设备都在同一时钟上一样。
有关详细信息,请参阅 TIMESTAMP BY OVER。
会话窗口
会话窗口是一个会随着事件发生而不断扩展的窗口。如果在特定时间内没有收到任何事件或者窗口达到了其最大持续时间,它将关闭以进行计算。 计算用户交互数据时,此窗口特别有用。 当用户开始与系统交互时,窗口将启动,当没有观察到更多事件时关闭,这意味着用户已停止交互。 例如,用户正在与记录点击次数的网页交互,会话窗口可用于了解用户与网站交互的时间。
示例 输入:
| User_id | Time | URL |
| --- | --- | --- |
| 0 | 2017-01-26T00:00:00.0000000Z | "www.example.com/a.html" |
| 0 | 2017-01-26T00:00:20.0000000Z | "www.example.com/b.html" |
| 1 | 2017-01-26T00:00:55.0000000Z | "www.example.com/c.html" |
| 0 | 2017-01-26T00:01:10.0000000Z | "www.example.com/d.html" |
| 1 | 2017-01-26T00:01:15.0000000Z | "www.example.com/e.html" |
示例 输出:
| User_id | StartTime | EndTime | Duration_in_seconds |
| --- | --- | --- | --- |
| 0 | 2017-01-26T00:00:00.0000000Z | 2017-01-26T00:01:10.0000000Z | 70 |
| 1 | 2017-01-26T00:00:55.0000000Z | 2017-01-26T00:01:15.0000000Z | 20 |
示例 查询:
SELECT
user_id,
MIN(time) as StartTime,
MAX(time) as EndTime,
DATEDIFF(second, MIN(time), MAX(time)) AS duration_in_seconds
FROM input TIMESTAMP BY time
GROUP BY
user_id,
SessionWindow(minute, 1, 60) OVER (PARTITION BY user_id)
SELECT 将投影与用户交互相关的数据,以及交互的持续时间。 按用户和 SessionWindow 对数据进行分组,该窗口将在 1 分钟内没有交互时关闭,最大窗口大小为 60 分钟。
有关 SessionWindow 的详细信息,请参阅 会话窗口 。
JavaScript 中的用户定义的函数
可以使用用 JavaScript 语言编写的自定义函数扩展 Azure 流分析查询语言。 用户定义的函数(UDF)是自定义/复杂的计算,无法使用 SQL 语言轻松表示。 这些 UDF 可以定义一次,并在查询中多次使用。 例如,UDF 可用于将十六进制 nvarchar(max) 值转换为 bigint 值。
示例 输入:
| Device_id | HexValue |
| --- | --- |
| 1 | "B4" |
| 2 | "11B" |
| 3 | "121" |
示例 输出:
| Device_id | Decimal |
| --- | --- |
| 1 | 180 |
| 2 | 283 |
| 3 | 289 |
function hex2Int(hexValue){
return parseInt(hexValue, 16);
}
public static class MyUdfClass {
public static long Hex2Int(string hexValue){
return int.Parse(hexValue, System.Globalization.NumberStyles.HexNumber);
}
}
SELECT
Device_id,
udf.Hex2Int(HexValue) AS Decimal
From
Input
User-Defined 函数根据使用的每个事件计算 HexValue 中的 bigint 值。
有关详细信息,请参阅 JavaScript。
使用 MATCH_RECOGNIZE 进行高级模式匹配
MATCH_RECOGNIZE 是一种高级模式匹配机制,可用于将事件序列与定义完善的正则表达式模式匹配。 例如,自动取款机在运行期间被实时监控故障,如果接连出现两条警告信息,则需要通知管理员。
输入:
| ATM_id | Operation_id | Return_Code | Time |
| --- | --- | --- | --- |
| 1 | "Entering Pin" | "Success" | 2017-01-26T00:10:00.0000000Z |
| 2 | "Opening Money Slot" | "Success" | 2017-01-26T00:10:07.0000000Z |
| 2 | "Closing Money Slot" | "Success" | 2017-01-26T00:10:11.0000000Z |
| 1 | "Entering Withdraw Quantity" | "Success" | 2017-01-26T00:10:08.0000000Z |
| 1 | "Opening Money Slot" | "Warning" | 2017-01-26T00:10:14.0000000Z |
| 1 | "Printing Bank Balance" | "Warning" | 2017-01-26T00:10:19.0000000Z |
输出:
| ATM_id | First_Warning_Operation_id | Warning_Time |
| --- | --- | --- |
| 1 | "Opening Money Slot" | 2017-01-26T00:10:14.0000000Z |
SELECT *
FROM input TIMESTAMP BY time OVER ATM_id
MATCH_RECOGNIZE (
LIMIT DURATION(minute, 1)
PARTITION BY ATM_id
MEASURES
First(Warning.ATM_id) AS ATM_id,
First(Warning.Operation_Id) AS First_Warning_Operation_id,
First(Warning.Time) AS Warning_Time
AFTER MATCH SKIP TO NEXT ROW
PATTERN (Success+ Warning{2,})
DEFINE
Success AS Success.Return_Code = 'Success',
Warning AS Warning.Return_Code <> 'Success'
) AS patternMatch
此查询至少匹配两个连续失败事件,并在满足条件时生成警报。 PATTERN 定义了用于匹配的正则表达式,在本例中,该表达式用于在至少一次成功操作后至少出现两个连续警告的情况。 成功和警告是通过Return_Code值来定义的,一旦条件满足,MEASURES将与ATM_id一同进行投影,还有第一个警告操作和第一个警告时间。
有关详细信息,请参阅 MATCH_RECOGNIZE。
地理围栏和地理空间查询
Azure 流分析提供内置的地理空间功能,可用于实现车队管理、骑行共享、联网汽车和资产跟踪等方案。 地理空间数据可以以 GeoJSON 或 WKT 格式引入,作为事件流或参考数据的一部分。 例如,一家专门从事打印护照的制造机器的公司,将其机器租赁给政府和领事馆。 这些机器的位置受到严格控制,以避免错位和可能用于伪造护照。 每台计算机都装有 GPS 跟踪器,该信息将中继回 Azure 流分析作业。 制造者希望跟踪这些机器的位置,并在其中一台机器离开授权区域时发出警报,这样他们就可以远程禁用、提醒当局并检索设备。
输入:
| Equipment_id | Equipment_current_location | Time |
| --- | --- | --- |
| 1 | "POINT(-122.13288797982818 47.64082002051315)" | 2017-01-26T00:10:00.0000000Z |
| 1 | "POINT(-122.13307252987875 47.64081350934929)" | 2017-01-26T00:11:00.0000000Z |
| 1 | "POINT(-122.13308862313283 47.6406508603241)" | 2017-01-26T00:12:00.0000000Z |
| 1 | "POINT(-122.13341048821462 47.64043760861279)" | 2017-01-26T00:13:00.0000000Z |
引用数据输入:
| Equipment_id | Equipment_lease_location |
| --- | --- |
| 1 | "POLYGON((-122.13326028450979 47.6409833866794,-122.13261655434621 47.6409833866794,-122.13261655434621 47.64061471602751,-122.13326028450979 47.64061471602751,-122.13326028450979 47.6409833866794))" |
输出:
| Equipment_id | Equipment_alert_location | Time |
| --- | --- | --- |
| 1 | "POINT(-122.13341048821462 47.64043760861279)" | 2017-01-26T00:13:00.0000000Z |
SELECT
input.Equipment_id AS Equipment_id,
input.Equipment_current_location AS Equipment_current_location,
input.Time AS Time
FROM input TIMESTAMP BY time
JOIN
referenceInput
ON input.Equipment_id = referenceInput.Equipment_id
WHERE
ST_WITHIN(input.Equipment_current_location, referenceInput.Equipment_lease_location) = 1
该查询使制造商能够自动监视计算机位置,在计算机离开允许的地理围栏时收到警报。 内置的地理空间函数允许用户在没有第三方库的情况下在查询中使用 GPS 数据。
有关详细信息,请参阅Azure 流分析中的地理围栏和地理空间聚合方案文章。
获取帮助
如需获取进一步的帮助,可前往 Azure 流分析的 Microsoft 问答页面。