Parse JSON and Avro data in Azure Stream Analytics

Azure Stream Analytics support processing events in CSV, JSON, and Avro data formats. Both JSON and Avro data can be structured and contain some complex types such as nested objects (records) and arrays.

Record data types

Record data types are used to represent JSON and Avro arrays when corresponding formats are used in the input data streams. These examples demonstrate a sample sensor, which is reading input events in JSON format. Here's example of a single event:

{
    "DeviceId" : "12345",
    "Location" :
    {
        "Lat": 47,
        "Long": 122
    },
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "SensorMetadata" : 
        {
        "Manufacturer":"ABC",
        "Version":"1.2.45"
        }
    }
}

Access nested fields in known schema

Use dot notation (.) to easily access nested fields directly from your query. For example, this query selects the Latitude and Longitude coordinates under the Location property in the preceding JSON data. The dot notation can be used to navigate multiple levels as shown below.

SELECT
    DeviceID,
    Location.Lat,
    Location.Long,
    SensorReadings.Temperature,
    SensorReadings.SensorMetadata.Version
FROM input

The result is:

DeviceID Lat Long Temperature Version
12345 47 122 80 1.2.45

Select all properties

You can select all the properties of a nested record using '*' wildcard. Consider the following example:

SELECT
    DeviceID,
    Location.*
FROM input

The result is:

DeviceID Lat Long
12345 47 122

Access nested fields when property name is a variable

Use the GetRecordPropertyValue function if the property name is a variable. This allows for building dynamic queries without hardcoding property names.

For example, imagine the sample data stream needs to be joined with reference data containing thresholds for each device sensor. A snippet of such reference data is shown below.

{
    "DeviceId" : "12345",
    "SensorName" : "Temperature",
    "Value" : 85
},
{
    "DeviceId" : "12345",
    "SensorName" : "Humidity",
    "Value" : 65
}

The goal here is to join our sample dataset from the top of the article to that reference data, and output one event for each sensor measure above its threshold. That means our single event above can generate multiple output events if multiple sensors are above their respective thresholds, thanks to the join. To achieve similar results without a join, see the section below.

SELECT
    input.DeviceID,
    thresholds.SensorName,
    "Alert: Sensor above threshold" AS AlertMessage
FROM input      -- stream input
JOIN thresholds -- reference data input
ON
    input.DeviceId = thresholds.DeviceId
WHERE
    GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value

GetRecordPropertyValue selects the property in SensorReadings, which name matches the property name coming from the reference data. Then the associated value from SensorReadings is extracted.

The result is:

DeviceID SensorName AlertMessage
12345 Humidity Alert : Sensor above threshold

Convert record fields into separate events

To convert record fields into separate events, use the APPLY operator together with the GetRecordProperties function.

With the original sample data, the following query could be used to extract properties into different events.

SELECT
    event.DeviceID,
    sensorReading.PropertyName,
    sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading

The result is:

DeviceID SensorName AlertMessage
12345 Temperature 80
12345 Humidity 70
12345 CustomSensor01 5
12345 CustomSensor02 99
12345 SensorMetadata [object Object]

Using WITH, it's then possible to route those events to different destinations:

WITH Stage0 AS
(
    SELECT
        event.DeviceID,
        sensorReading.PropertyName,
        sensorReading.PropertyValue
    FROM input as event
    CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
)

SELECT DeviceID, PropertyValue AS Temperature INTO TemperatureOutput FROM Stage0 WHERE PropertyName = 'Temperature'
SELECT DeviceID, PropertyValue AS Humidity INTO HumidityOutput FROM Stage0 WHERE PropertyName = 'Humidity'

Parse JSON record in SQL reference data

When using Azure SQL Database as reference data in your job, it's possible to have a column that has data in JSON format. An example is shown below.

DeviceID Data
12345 {"key": "value1"}
54321 {"key": "value2"}

You can parse the JSON record in the Data column by writing a simple JavaScript user-defined function.

function parseJson(string) {
return JSON.parse(string);
}

You can then create a step in your Stream Analytics query as shown below to access the fields of your JSON records.

WITH parseJson as
(
SELECT DeviceID, udf.parseJson(sqlRefInput.Data) as metadata,
FROM sqlRefInput
)

SELECT metadata.key
INTO output
FROM streamInput
JOIN parseJson 
ON streamInput.DeviceID = parseJson.DeviceID

Array data types

Array data types are an ordered collection of values. Some typical operations on array values are detailed below. These examples use the functions GetArrayElement, GetArrayElements, GetArrayLength, and the APPLY operator.

Here's an example of a event. Both CustomSensor03 and SensorMetadata are of type array:

{
    "DeviceId" : "12345",
    "SensorReadings" :
    {
        "Temperature" : 80,
        "Humidity" : 70,
        "CustomSensor01" : 5,
        "CustomSensor02" : 99,
        "CustomSensor03": [12,-5,0]
     },
    "SensorMetadata":[
        {          
            "smKey":"Manufacturer",
            "smValue":"ABC"                
        },
        {
            "smKey":"Version",
            "smValue":"1.2.45"
        }
    ]
}

Working with a specific array element

Select array element at a specified index (selecting the first array element):

SELECT
    GetArrayElement(SensorReadings.CustomSensor03, 0) AS firstElement
FROM input

The result is:

firstElement
12

Select array length

SELECT
    GetArrayLength(SensorReadings.CustomSensor03) AS arrayLength
FROM input

The result is:

arrayLength
3

Convert array elements into separate events

Select all array element as individual events. The APPLY operator together with the GetArrayElements built-in function extracts all array elements as individual events:

SELECT
    DeviceId,
	CustomSensor03Record.ArrayIndex,
	CustomSensor03Record.ArrayValue
FROM input
CROSS APPLY GetArrayElements(SensorReadings.CustomSensor03) AS CustomSensor03Record

The result is:

DeviceId ArrayIndex ArrayValue
12345 0 12
12345 1 -5
12345 2 0
SELECT   
    i.DeviceId,	
    SensorMetadataRecords.ArrayValue.smKey as smKey,
    SensorMetadataRecords.ArrayValue.smValue as smValue
FROM input i
CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords

The result is:

DeviceId smKey smValue
12345 Manufacturer ABC
12345 Version 1.2.45

If the extracted fields need to appear in columns, it is possible to pivot the dataset using the WITH syntax in addition to the JOIN operation. That join will require a time boundary condition that prevents duplication:

WITH DynamicCTE AS (
	SELECT   
		i.DeviceId,
		SensorMetadataRecords.ArrayValue.smKey as smKey,
		SensorMetadataRecords.ArrayValue.smValue as smValue
	FROM input i
	CROSS APPLY GetArrayElements(SensorMetadata) AS SensorMetadataRecords 
)

SELECT
	i.DeviceId,
	i.Location.*,
	V.smValue AS 'smVersion',
	M.smValue AS 'smManufacturer'
FROM input i
LEFT JOIN DynamicCTE V ON V.smKey = 'Version' and V.DeviceId = i.DeviceId AND DATEDIFF(minute,i,V) BETWEEN 0 AND 0 
LEFT JOIN DynamicCTE M ON M.smKey = 'Manufacturer' and M.DeviceId = i.DeviceId AND DATEDIFF(minute,i,M) BETWEEN 0 AND 0

The result is:

DeviceId Lat Long smVersion smManufacturer
12345 47 122 1.2.45 ABC

See Also

Data Types in Azure Stream Analytics