Azure 流分析服务支持处理采用 CSV、JSON 和 Avro 数据格式的事件。 JSON 和 Avro 数据都可以结构化,并包含一些复杂类型,例如嵌套对象(记录)和数组。
记录数据类型
如果在输入数据流中使用相应的格式,记录数据类型将用于表示 JSON 和 Avro 数组。 这些示例演示示例传感器,该传感器读取 JSON 格式的输入事件。 下面是单一事件的示例:
{
"DeviceId" : "12345",
"Location" :
{
"Lat": 47,
"Long": 122
},
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"SensorMetadata" :
{
"Manufacturer":"ABC",
"Version":"1.2.45"
}
}
}
访问已知架构中的嵌套字段
使用点表示法(.)直接从查询访问嵌套字段。 例如,此查询选择上述 JSON 数据中 Location 属性下的纬度和经度坐标。 使用点表示法导航多个级别,如以下代码片段所示:
SELECT
DeviceID,
Location.Lat,
Location.Long,
SensorReadings.Temperature,
SensorReadings.SensorMetadata.Version
FROM input
结果为:
|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|
选择所有属性
可以使用通配符选择嵌套记录 * 的所有属性。 请看下面的示例:
SELECT
DeviceID,
Location.*
FROM input
结果为:
|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|
当属性名称是变量时访问嵌套字段
如果属性名称是变量,请使用 GetRecordPropertyValue 函数。 此函数可帮助你生成动态查询,而无需硬编码属性名称。
例如,假设示例数据流需要与包含每个设备传感器阈值的参考数据相联接: 以下代码片段中显示了此类参考数据的代码片段。
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 85
},
{
"DeviceId" : "12345",
"SensorName" : "Humidity",
"Value" : 65
}
目标是将文章顶部的示例数据集联接到该引用数据,并为每个传感器度量值输出一个高于其阈值的事件。 此联接意味着,如果多个传感器超出各自的阈值,则单个事件可以生成多个输出事件。 若要在不使用联接的情况下获得类似结果,请参阅以下示例:
SELECT
input.DeviceID,
thresholds.SensorName,
"Alert: Sensor above threshold" AS AlertMessage
FROM input -- stream input
JOIN thresholds -- reference data input
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
GetRecordPropertyValue 在 SensorReadings 中选择与来自引用数据的属性名称匹配的属性。 然后,它会从 SensorReadings 中提取关联的值。
结果为:
|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |
将记录字段转换为单独的事件
若要将记录字段转换为单独事件,请结合使用 APPLY 运算符和 GetRecordProperties 函数。
通过使用原始示例数据,可以使用以下查询将属性提取到不同的事件中:
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
结果为:
|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|
通过使用 WITH,可以将这些事件路由到不同的目标:
WITH Stage0 AS
(
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)
SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
分析 SQL 参考数据中的 JSON 记录
在作业中使用 Azure SQL Database 用作引用数据时,可以包含一个包含 JSON 格式数据的列。 以下示例显示了以下格式:
|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|
可以通过编写简单的 JavaScript 用户定义函数来分析 Data 列中的 JSON 记录。
function parseJson(string) {
return JSON.parse(string);
}
若要访问 JSON 记录的字段,请在流分析查询中创建一个步骤,如以下示例所示。
WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)
SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson
ON streamInput.DeviceID = parseJson.DeviceID
数组数据类型
数组数据类型是按顺序排列的值集合。 本部分详细介绍了数组值上的一些典型操作。 这些事例使用函数 GetArrayElement、GetArrayElements、GetArrayLength 和 APPLY 运算符。
下面是事件的示例。
CustomSensor03 和 SensorMetadata 都是数组类型的:
{
"DeviceId" : "12345",
"SensorReadings" :
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor02" : 99,
"CustomSensor03": [12,-5,0]
},
"SensorMetadata":[
{
"smKey":"Manufacturer",
"smValue":"ABC"
},
{
"smKey":"Version",
"smValue":"1.2.45"
}
]
}
使用特定数组元素
选择指定索引处的数组元素(选择第一个数组元素):
SELECT
GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input
结果为:
|firstElement|
|-|
|12|
选择数组长度
SELECT
GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input
结果为:
|arrayLength|
|-|
|3|
将数组元素转换为单独的事件
选择所有数组元素作为单个事件。 结合使用 APPLY 运算符和 GetArrayElements 内置函数,提取所有数组元素作为各个事件:
SELECT
DeviceId,
CustomSensor03Record.ArrayIndex,
CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record
结果为:
|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
结果为:
|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|
若要在列中显示提取的字段,请使用 WITH 语法和 JOIN 作透视数据集。 此联接需要一个 时间边界 条件以防止重复:
WITH DynamicCTE AS (
SELECT
i.DeviceId,
SensorMetadataRecords.ArrayValue.smKey as smKey,
SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords
)
SELECT
i.DeviceId,
i.Location.*,
V.smValue AS 'smVersion',
M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0
结果为:
|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|