在 Azure 流分析中分析 JSON 和 Avro 数据

Azure 流分析服务支持处理采用 CSV、JSON 和 Avro 数据格式的事件。 JSON 和 Avro 数据都可以结构化,并包含一些复杂类型,例如嵌套对象(记录)和数组。

记录数据类型

如果在输入数据流中使用相应的格式,记录数据类型将用于表示 JSON 和 Avro 数组。 这些示例演示示例传感器,该传感器读取 JSON 格式的输入事件。 下面是单一事件的示例:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

访问已知架构中的嵌套字段

使用点表示法(.)直接从查询访问嵌套字段。 例如,此查询选择上述 JSON 数据中 Location 属性下的纬度和经度坐标。 使用点表示法导航多个级别,如以下代码片段所示:

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

结果为:

|DeviceID|Lat|Long|Temperature|Version|
|-|-|-|-|-|
|12345|47|122|80|1.2.45|

选择所有属性

可以使用通配符选择嵌套记录 * 的所有属性。 请看下面的示例:

SELECT
    DeviceID,
    Location.*
FROM input

结果为:

|DeviceID|Lat|Long|
|-|-|-|
|12345|47|122|

当属性名称是变量时访问嵌套字段

如果属性名称是变量,请使用 GetRecordPropertyValue 函数。 此函数可帮助你生成动态查询,而无需硬编码属性名称。

例如,假设示例数据流需要与包含每个设备传感器阈值的参考数据相联接: 以下代码片段中显示了此类参考数据的代码片段。

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

目标是将文章顶部的示例数据集联接到该引用数据,并为每个传感器度量值输出一个高于其阈值的事件。 此联接意味着,如果多个传感器超出各自的阈值,则单个事件可以生成多个输出事件。 若要在不使用联接的情况下获得类似结果,请参阅以下示例:

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValueSensorReadings 中选择与来自引用数据的属性名称匹配的属性。 然后,它会从 SensorReadings 中提取关联的值。

结果为:

|DeviceID|SensorName|AlertMessage|
| - | - | - |
| 12345 | Humidity | Alert: Sensor above threshold |

将记录字段转换为单独的事件

若要将记录字段转换为单独事件,请结合使用 APPLY 运算符和 GetRecordProperties 函数。

通过使用原始示例数据,可以使用以下查询将属性提取到不同的事件中:

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

结果为:

|DeviceID|SensorName|AlertMessage|
|-|-|-|
|12345|Temperature|80|
|12345|Humidity|70|
|12345|CustomSensor01|5|
|12345|CustomSensor02|99|
|12345|SensorMetadata|[object Object]|

通过使用 WITH,可以将这些事件路由到不同的目标:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

分析 SQL 参考数据中的 JSON 记录

在作业中使用 Azure SQL Database 用作引用数据时,可以包含一个包含 JSON 格式数据的列。 以下示例显示了以下格式:

|DeviceID|Data|
|-|-|
|12345|{"key": "value1"}|
|54321|{"key": "value2"}|

可以通过编写简单的 JavaScript 用户定义函数来分析 Data 列中的 JSON 记录。

function parseJson(string) {
return JSON.parse(string);
}

若要访问 JSON 记录的字段,请在流分析查询中创建一个步骤,如以下示例所示。

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

数组数据类型

数组数据类型是按顺序排列的值集合。 本部分详细介绍了数组值上的一些典型操作。 这些事例使用函数 GetArrayElementGetArrayElementsGetArrayLengthAPPLY 运算符。

下面是事件的示例。 CustomSensor03SensorMetadata 都是数组类型的:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

使用特定数组元素

选择指定索引处的数组元素(选择第一个数组元素):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

结果为:

|firstElement|
|-|
|12|

选择数组长度

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

结果为:

|arrayLength|
|-|
|3|

将数组元素转换为单独的事件

选择所有数组元素作为单个事件。 结合使用 APPLY 运算符和 GetArrayElements 内置函数,提取所有数组元素作为各个事件:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

结果为:

|DeviceId|ArrayIndex|ArrayValue|
|-|-|-|
|12345|0|12|
|12345|1|-5|
|12345|2|0|
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

结果为:

|DeviceId|smKey|smValue|
|-|-|-|
|12345|Manufacturer|ABC|
|12345|Version|1.2.45|

若要在列中显示提取的字段,请使用 WITH 语法和 JOIN 作透视数据集。 此联接需要一个 时间边界 条件以防止重复:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

结果为:

|DeviceId|Lat|Long|smVersion|smManufacturer|
|-|-|-|-|-|
|12345|47|122|1.2.45|ABC|

Azure 流分析中的数据类型