在 Azure 流分析中分析 JSON 和 Avro 数据

Azure 流分析支持处理采用 CSV、JSON 和 Avro 数据格式的事件。 JSON 和 Avro 数据都可以结构化,并包含一些复杂类型,例如嵌套对象(记录)和数组。

记录数据类型

如果在输入数据流中使用相应的格式,记录数据类型将用于表示 JSON 和 Avro 数组。 这些示例演示示例传感器,该传感器读取 JSON 格式的输入事件。 下面是单一事件的示例:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

访问已知架构中的嵌套字段

使用点表示法 (.) 可以轻松地直接从查询访问嵌套字段。 例如,此查询选择上述 JSON 数据中 Location 属性下的纬度和经度坐标。 点表示法可用于浏览多个级别,如下所示。

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

结果为:

DeviceID Lat Long 温度 版本
12345 47 122 80 1.2.45

选择所有属性

可以使用“*”通配符选择嵌套记录的所有属性。 请考虑以下示例:

SELECT
    DeviceID,
    Location.*
FROM input

结果为:

DeviceID Lat Long
12345 47 122

当属性名称是变量时访问嵌套字段

如果属性名称是变量,请使用 GetRecordPropertyValue 函数。 这样可以构建动态查询,无需对属性名称进行硬编码。

例如,假设示例数据流需要与包含每个设备传感器阈值的参考数据相联接: 下面显示了此类参考数据的代码片段。

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

此处的目标是将本文顶部的示例数据集联接到该参考数据,并针对高于其阈值的每个传感器度量值输出一个事件。 这意味着,如果有多个传感器超出其各自的阈值,借助于联接,上述单个事件可以生成多个输出事件。 若要在不使用联接的情况下实现类似结果,请参阅下面的部分。

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue 选择 SensorReadings 中名称与来自参考数据的属性名称匹配的属性。 然后从 SensorReadings 中提取关联的值。

结果为:

DeviceID SensorName AlertMessage
12345 湿度 Alert :Sensor above threshold

将记录字段转换为单独的事件

若要将记录字段转换为单独事件,请结合使用 APPLY 运算符和 GetRecordProperties 函数。

使用原始示例数据时,可以使用以下查询将属性提取到不同的事件中。

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

结果为:

DeviceID SensorName AlertMessage
12345 温度 80
12345 湿度 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

然后,可以使用 WITH 将这些事件路由到不同的目标:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

分析 SQL 参考数据中的 JSON 记录

在作业中使用 Azure SQL 数据库作为参考数据时,可能会有一个列包含 JSON 格式的数据。 下面显示了一个示例。

DeviceID 数据
12345 {"key": "value1"}
54321 {"key": "value2"}

可以通过编写简单的 JavaScript 用户定义函数来分析 Data 列中的 JSON 记录。

function parseJson(string) {
return JSON.parse(string);
}

然后,可以如下所示在流分析查询中创建一个步骤,以访问 JSON 记录的字段。

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

数组数据类型

数组数据类型是按顺序排列的值集合。 下面详细介绍一些针对数组值执行的典型操作。 这些事例使用函数 GetArrayElementGetArrayElementsGetArrayLengthAPPLY 运算符。

下面是事件的示例。 CustomSensor03SensorMetadata 都是数组类型的:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

处理特定的数组元素

选择指定索引中的数组元素(选择第一个数组元素):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

结果为:

firstElement
12

选择数组长度

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

结果为:

arrayLength
3

将数组元素转换为单独的事件

选择所有数组元素作为各个事件。 结合使用 APPLY 运算符和 GetArrayElements 内置函数,提取所有数组元素作为各个事件:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

结果为:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

结果为:

DeviceId smKey smValue
12345 制造商 ABC
12345 版本 1.2.45

如果提取的字段需要显示在列中,则除了 JOIN 操作外,还可以使用 WITH 语法来透视数据集。 该联接需要一个时间边界条件来防止重复:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

结果为:

DeviceId Lat Long smVersion smManufacturer
12345 47 122 1.2.45 ABC

另请参阅

Azure 流分析中的数据类型