Parse JSON and Avro data in Azure Stream Analytics

Azure Stream Analytics supports processing events in CSV, JSON, and Avro data formats. Both JSON and Avro data can be structured and contain complex types such as nested objects (records) and arrays.

Record data types

Record data types are used to represent JSON objects and Avro records when those formats are used in the input data streams. The following examples use a sample sensor that reads input events in JSON format. Here is an example of a single event:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
            "Manufacturer":"ABC",
            "Version":"1.2.45"
        }
    }
}

Access nested fields in known schema

Use dot notation (.) to access nested fields directly from your query. For example, the following query selects the latitude and longitude coordinates under the Location property in the preceding JSON data. Dot notation can also navigate multiple levels, as SensorReadings.SensorMetadata.Version does below.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

The result is:

DeviceID  Lat  Long  Temperature  Version
12345     47   122   80           1.2.45
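To make the dot-notation semantics concrete outside Stream Analytics, here is a minimal JavaScript sketch that walks a dotted path such as SensorReadings.SensorMetadata.Version over the sample event. The helper name getPath is hypothetical and only illustrates the idea. It is not part of the Stream Analytics query language.

```javascript
// Sample event from the top of this article.
const event = {
  DeviceId: "12345",
  Location: { Lat: 47, Long: 122 },
  SensorReadings: {
    Temperature: 80,
    Humidity: 70,
    CustomSensor01: 5,
    CustomSensor02: 99,
    SensorMetadata: { Manufacturer: "ABC", Version: "1.2.45" },
  },
};

// Hypothetical helper: follow a dotted path one level at a time,
// returning undefined as soon as a level is missing.
function getPath(record, path) {
  return path
    .split(".")
    .reduce((value, key) => (value == null ? undefined : value[key]), record);
}

// Project the same columns as the query above.
const row = {
  DeviceID: getPath(event, "DeviceId"),
  Lat: getPath(event, "Location.Lat"),
  Long: getPath(event, "Location.Long"),
  Temperature: getPath(event, "SensorReadings.Temperature"),
  Version: getPath(event, "SensorReadings.SensorMetadata.Version"),
};

console.log(row);
```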

Select all properties

You can select all the properties of a nested record by using the '*' wildcard. Consider the following example:

SELECT
    DeviceID,
    Location.*
FROM input

The result is:

DeviceID  Lat  Long
12345     47   122
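A rough JavaScript analogue of Location.* is object spread. Every property of the nested record becomes a top-level column next to DeviceID. This is an illustration only, using a trimmed copy of the sample event and assuming the nested value is a plain object. The function name selectWithWildcard is hypothetical.

```javascript
// Trimmed sample event: only the fields this query touches.
const event = {
  DeviceId: "12345",
  Location: { Lat: 47, Long: 122 },
};

// Copy DeviceId, then spread every property of the nested Location record
// into the same flat row, mirroring SELECT DeviceID, Location.*
function selectWithWildcard(e) {
  return { DeviceID: e.DeviceId, ...e.Location };
}

const row = selectWithWildcard(event);
console.log(row); // { DeviceID: '12345', Lat: 47, Long: 122 }
```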

Access nested fields when the property name is a variable

Use the GetRecordPropertyValue function if the property name is a variable. This lets you build dynamic queries without hardcoding property names.

For example, imagine the sample data stream needs to be joined with reference data that contains thresholds for each device sensor. A snippet of such reference data is shown below.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

The goal is to join the sample dataset from the top of this article to that reference data, and to output one event for each sensor measurement that exceeds its threshold. Because of the join, the single event above can produce multiple output events if several sensors exceed their respective thresholds. To achieve similar results without a join, see the next section.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    'Alert : Sensor above threshold' AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue selects the property in SensorReadings whose name matches the property name coming from the reference data, and then extracts the associated value from SensorReadings.

The result is:

DeviceID  SensorName  AlertMessage
12345     Humidity    Alert : Sensor above threshold
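The join above can be approximated in plain JavaScript to show what GetRecordPropertyValue contributes. The property name comes from data (thresholds.SensorName) rather than from the query text, so the lookup uses bracket indexing. This is a sketch over in-memory arrays that ignores streaming semantics. The names events, thresholds, getRecordPropertyValue, and alerts are illustrative.

```javascript
const events = [
  {
    DeviceId: "12345",
    SensorReadings: { Temperature: 80, Humidity: 70, CustomSensor01: 5, CustomSensor02: 99 },
  },
];

// Reference data: one threshold per device sensor.
const thresholds = [
  { DeviceId: "12345", SensorName: "Temperature", Value: 85 },
  { DeviceId: "12345", SensorName: "Humidity", Value: 65 },
];

// Analogue of GetRecordPropertyValue: look up an own property whose name
// is only known at run time; unknown names yield undefined.
function getRecordPropertyValue(record, propertyName) {
  return Object.prototype.hasOwnProperty.call(record, propertyName)
    ? record[propertyName]
    : undefined;
}

// Inner join on DeviceId, keeping only sensors whose reading exceeds the threshold.
const alerts = events.flatMap((input) =>
  thresholds
    .filter((t) => t.DeviceId === input.DeviceId)
    .filter((t) => getRecordPropertyValue(input.SensorReadings, t.SensorName) > t.Value)
    .map((t) => ({
      DeviceID: input.DeviceId,
      SensorName: t.SensorName,
      AlertMessage: "Alert : Sensor above threshold",
    }))
);

console.log(alerts);
```

Temperature (80) stays below its threshold of 85, so only the Humidity row survives, matching the result table.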

Convert record fields into separate events

To convert record fields into separate events, use the APPLY operator together with the GetRecordProperties function.

With the original sample data, the following query extracts the properties into separate events:

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

The result is:

DeviceID  PropertyName    PropertyValue
12345     Temperature     80
12345     Humidity        70
12345     CustomSensor01  5
12345     CustomSensor02  99
12345     SensorMetadata  [object Object]
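CROSS APPLY GetRecordProperties(...) turns one record into one row per property. In JavaScript the closest analogue is Object.entries combined with flatMap. The sketch below is illustrative only. Nested records such as SensorMetadata stay objects here, whereas the table above shows how the sample renders them.

```javascript
const events = [
  {
    DeviceId: "12345",
    SensorReadings: {
      Temperature: 80,
      Humidity: 70,
      CustomSensor01: 5,
      CustomSensor02: 99,
      SensorMetadata: { Manufacturer: "ABC", Version: "1.2.45" },
    },
  },
];

// One output row per (event, property) pair, like CROSS APPLY: an event whose
// SensorReadings record is empty produces no rows at all.
const rows = events.flatMap((event) =>
  Object.entries(event.SensorReadings).map(([PropertyName, PropertyValue]) => ({
    DeviceID: event.DeviceId,
    PropertyName,
    PropertyValue,
  }))
);

for (const r of rows) console.log(r.DeviceID, r.PropertyName, r.PropertyValue);
```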

You can then use WITH to route those events to different destinations:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'
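Conceptually, each SELECT ... INTO ... WHERE PropertyName = ... statement filters the shared Stage0 rows into its own output. The JavaScript sketch below mimics that fan-out with two independent filters. The output names come from the query, and the arrays simply stand in for real output sinks. The nested SensorMetadata row is left out for brevity.

```javascript
// Rows as produced by Stage0 for the sample event (nested metadata omitted).
const stage0 = [
  { DeviceID: "12345", PropertyName: "Temperature", PropertyValue: 80 },
  { DeviceID: "12345", PropertyName: "Humidity", PropertyValue: 70 },
  { DeviceID: "12345", PropertyName: "CustomSensor01", PropertyValue: 5 },
  { DeviceID: "12345", PropertyName: "CustomSensor02", PropertyValue: 99 },
];

// Each statement reads the same Stage0 rows independently, filters them,
// and renames PropertyValue for its own destination.
const TemperatureOutput = stage0
  .filter((r) => r.PropertyName === "Temperature")
  .map((r) => ({ DeviceID: r.DeviceID, Temperature: r.PropertyValue }));

const HumidityOutput = stage0
  .filter((r) => r.PropertyName === "Humidity")
  .map((r) => ({ DeviceID: r.DeviceID, Humidity: r.PropertyValue }));

console.log(TemperatureOutput, HumidityOutput);
```

Rows whose PropertyName matches neither filter, such as CustomSensor01, are written to neither output.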

Parse JSON record in SQL reference data

When you use Azure SQL Database as reference data in your job, a column can contain data in JSON format. An example is shown below.

DeviceID  Data
12345     {"key" : "value1"}
54321     {"key" : "value2"}

You can parse the JSON record in the Data column by writing a simple JavaScript user-defined function (UDF):

function parseJson(string) {
    return JSON.parse(string);
}

You can then create a step in your Stream Analytics query, as shown below, to access the fields of your JSON records. The UDF is called with the udf. prefix.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID
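The UDF is ordinary JavaScript, so you can check it locally before deploying it. The sketch below reuses parseJson as written above and simulates the two query steps against the sample SQL rows. The streamEvents array and the in-memory join are illustrative stand-ins, not Stream Analytics APIs.

```javascript
// The UDF from above, unchanged.
function parseJson(string) {
  return JSON.parse(string);
}

// Rows as they might arrive from the SQL reference input (sample table above).
const sqlRefInput = [
  { DeviceID: "12345", Data: '{"key" : "value1"}' },
  { DeviceID: "54321", Data: '{"key" : "value2"}' },
];

// Hypothetical stream events; only DeviceID matters for the join.
const streamEvents = [{ DeviceID: "54321" }, { DeviceID: "99999" }];

// Step 1 (the WITH step): parse the Data column once per reference row.
const parsed = sqlRefInput.map((r) => ({ DeviceID: r.DeviceID, metadata: parseJson(r.Data) }));

// Step 2: inner join on DeviceID and project metadata.key.
const output = streamEvents.flatMap((e) =>
  parsed.filter((p) => p.DeviceID === e.DeviceID).map((p) => ({ key: p.metadata.key }))
);

console.log(output); // [ { key: 'value2' } ]
```

JSON.parse throws a SyntaxError on malformed text, so it is worth deciding how malformed values in the Data column should be handled before relying on this UDF.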

Array data types

An array data type is an ordered collection of values. Some typical operations on array values are detailed below. These examples use the functions GetArrayElement, GetArrayElements, and GetArrayLength, and the APPLY operator.

Here is an example of a single event. Both CustomSensor03 and SensorMetadata are of type array:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
    },
    "SensorMetadata":[
        {
            "smKey":"Manufacturer",
            "smValue":"ABC"
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Working with a specific array element

Select the array element at a specified index. Indexes are zero-based, so index 0 selects the first element:

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

The result is:

firstElement
12

Select array length

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

The result is:

arrayLength
3
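For readers coming from JavaScript, the two functions above behave like zero-based indexing and length on the sample CustomSensor03 array. The helper names getArrayElement and getArrayLength are hypothetical. The sketch makes no claim about how Stream Analytics handles an out-of-range index.

```javascript
const sensorReadings = { CustomSensor03: [12, -5, 0] };

// Analogue of GetArrayElement: indexes are zero-based, so 0 selects the first element.
function getArrayElement(array, index) {
  return array[index];
}

// Analogue of GetArrayLength.
function getArrayLength(array) {
  return array.length;
}

const firstElement = getArrayElement(sensorReadings.CustomSensor03, 0);
const arrayLength = getArrayLength(sensorReadings.CustomSensor03);
console.log(firstElement, arrayLength); // 12 3
```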

Convert array elements into separate events

Select all array elements as individual events. The APPLY operator, together with the built-in GetArrayElements function, extracts every array element as a separate event:

SELECT
    DeviceId,
    CustomSensor03Record.ArrayIndex,
    CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

The result is:

DeviceId  ArrayIndex  ArrayValue
12345     0           12
12345     1           -5
12345     2           0

The same approach works for arrays of records. The following query reads the smKey and smValue fields of each element in the SensorMetadata array:

SELECT
    i.DeviceId, 
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

The result is:

DeviceId  smKey         smValue
12345     Manufacturer  ABC
12345     Version       1.2.45
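Both queries above depend on GetArrayElements producing one row per element, each carrying an ArrayIndex and an ArrayValue. The following JavaScript sketch reproduces that shape, including access to fields of record elements through ArrayValue. The helper name getArrayElements is hypothetical and only mirrors the output columns shown in the tables.

```javascript
const event = {
  DeviceId: "12345",
  SensorReadings: { CustomSensor03: [12, -5, 0] },
  SensorMetadata: [
    { smKey: "Manufacturer", smValue: "ABC" },
    { smKey: "Version", smValue: "1.2.45" },
  ],
};

// Analogue of GetArrayElements: one { ArrayIndex, ArrayValue } row per element.
function getArrayElements(array) {
  return array.map((value, index) => ({ ArrayIndex: index, ArrayValue: value }));
}

// First query: scalar elements.
const scalarRows = getArrayElements(event.SensorReadings.CustomSensor03).map((r) => ({
  DeviceId: event.DeviceId,
  ArrayIndex: r.ArrayIndex,
  ArrayValue: r.ArrayValue,
}));

// Second query: record elements, reading fields through ArrayValue.
const metadataRows = getArrayElements(event.SensorMetadata).map((r) => ({
  DeviceId: event.DeviceId,
  smKey: r.ArrayValue.smKey,
  smValue: r.ArrayValue.smValue,
}));

console.log(scalarRows, metadataRows);
```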

If the extracted fields need to appear in columns, you can pivot the dataset by using the WITH syntax together with JOIN operations. Because both sides of each join are streams, the join requires a time boundary condition (the DATEDIFF clauses) that prevents duplicate matches:

WITH DynamicCTE AS (
    SELECT   
        i.DeviceId,
        SensorMetadataRecords.ArrayValue.smKey as smKey,
        SensorMetadataRecords.ArrayValue.smValue as smValue
    FROM input i
    CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
    i.DeviceId,
    i.Location.*,
    V.smValue AS 'smVersion',
    M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

The result is:

DeviceId  Lat  Long  smVersion  smManufacturer
12345     47   122   1.2.45     ABC
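Because Stream Analytics pivots the key/value rows with self-joins, it can help to see the target shape computed directly. This JavaScript sketch groups the extracted rows by DeviceId and turns selected smKey values into columns. The pivot helper and the columnForKey mapping are illustrative assumptions. The sketch omits the Location columns and does not model the time-bounded join.

```javascript
// Key/value rows as produced by DynamicCTE for the sample event.
const dynamicRows = [
  { DeviceId: "12345", smKey: "Manufacturer", smValue: "ABC" },
  { DeviceId: "12345", smKey: "Version", smValue: "1.2.45" },
];

// Hypothetical mapping from smKey to output column name.
const columnForKey = new Map([
  ["Version", "smVersion"],
  ["Manufacturer", "smManufacturer"],
]);

// Group rows per device and copy each recognized key into its column.
// Unknown keys are ignored; a key that never appears leaves its column
// absent, loosely like the NULL a LEFT JOIN would produce.
function pivot(rows) {
  const byDevice = new Map();
  for (const { DeviceId, smKey, smValue } of rows) {
    if (!byDevice.has(DeviceId)) byDevice.set(DeviceId, { DeviceId });
    const column = columnForKey.get(smKey);
    if (column !== undefined) byDevice.get(DeviceId)[column] = smValue;
  }
  return [...byDevice.values()];
}

console.log(pivot(dynamicRows));
```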

See also

Data Types in Azure Stream Analytics