RedisStreamTrigger for Azure Functions

The RedisStreamTrigger reads new entries from a stream and surfaces those elements to the function.

Tier Basic Standard, Premium Enterprise, Enterprise Flash
Streams Yes Yes Yes

Important

Redis triggers aren't currently supported for functions running in the Consumption plan.

Important

The Node.js v4 model for Functions isn't yet supported by the Azure Cache for Redis extension. For more details about how the v4 model works, refer to the Azure Functions Node.js developer guide. To learn more about the differences between v3 and v4, refer to the migration guide.

Important

The Python v2 model for Functions isn't yet supported by the Azure Cache for Redis extension. For more details about how the v2 model works, refer to the Azure Functions Python developer guide.

Example

Important

For .NET functions, using the isolated worker model is recommended over the in-process model. For a comparison of the in-process and isolated worker models, see differences between the isolated worker model and the in-process model for .NET on Azure Functions.

Execution mode Description
In-process Your function code runs in the same process as the Functions host process. Supports only Long Term Support (LTS) versions of .NET. To learn more, see Develop .NET class library functions.
Isolated worker process Your function code runs in a separate .NET worker process. Use with supported versions of .NET and .NET Framework. To learn more, see Develop .NET isolated worker process functions.
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
    internal class SimpleStreamTrigger
    {
        private readonly ILogger<SimpleStreamTrigger> logger;

        public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
        {
            this.logger = logger;
        }

        [Function(nameof(SimpleStreamTrigger))]
        public void Run(
            [RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
        {
            logger.LogInformation(entry);
        }
    }
}

package com.function.RedisStreamTrigger;

import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;

public class SimpleStreamTrigger {
    @FunctionName("SimpleStreamTrigger")
    public void run(
            @RedisStreamTrigger(
                name = "req",
                connection = "redisConnectionString",
                key = "streamTest",
                pollingIntervalInMs = 1000,
                maxBatchSize = 1)
                String message,
            final ExecutionContext context) {
            context.getLogger().info(message);
    }
}

This sample uses the same index.js file, with binding data in the function.json file.

Here's the index.js file:

module.exports = async function (context, entry) {
    context.log(entry);
}

From function.json, here's the binding data:

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "index.js"
}

This sample uses the same run.ps1 file, with binding data in the function.json file.

Here's the run.ps1 file:

param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)

From function.json, here's the binding data:

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "run.ps1"
}

The Python v1 programming model requires you to define bindings in a separate function.json file in the function folder. For more information, see the Python developer guide.

This sample uses the same __init__.py file, with binding data in the function.json file.

Here's the __init__.py file:

import logging

def main(entry: str):
    logging.info(entry)

From function.json, here's the binding data:

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "__init__.py"
}

Attributes

Parameters Description Required Default
Connection The name of the application setting that contains the cache connection string, such as: <cacheName>.redis.cache.chinacloudapi.cn:6380,password... Yes
Key Key to read from. Yes
PollingIntervalInMs How often to poll the Redis server in milliseconds. Optional 1000
MessagesPerWorker The number of messages each functions worker should process. Used to determine how many workers the function should scale to. Optional 100
Count Number of elements to pull from Redis at one time. Optional 10
DeleteAfterProcess Indicates if the function deletes the stream entries after processing. Optional false

Annotations

Parameter Description Required Default
name entry Yes
connection The name of the application setting that contains the cache connection string, such as: <cacheName>.redis.cache.chinacloudapi.cn:6380,password... Yes
key Key to read from. Yes
pollingIntervalInMs How frequently to poll Redis, in milliseconds. Optional 1000
messagesPerWorker The number of messages each functions worker should process. It's used to determine how many workers the function should scale to. Optional 100
count Number of entries to read from Redis at one time. Entries are processed in parallel. Optional 10
deleteAfterProcess Whether to delete the stream entries after the function has run. Optional false

Configuration

The following table explains the binding configuration properties that you set in the function.json file.

function.json Properties Description Required Default
type Yes
deleteAfterProcess Optional false
connection The name of the application setting that contains the cache connection string, such as: <cacheName>.redis.cache.chinacloudapi.cn:6380,password... Yes
key The key to read from. Yes
pollingIntervalInMs How often to poll Redis in milliseconds. Optional 1000
messagesPerWorker (optional) The number of messages each functions worker should process. Used to determine how many workers the function should scale Optional 100
count Number of entries to read from Redis at one time. These are processed in parallel. Optional 10
name Yes
direction Yes

See the Example section for complete examples.

Usage

The RedisStreamTrigger Azure Function reads new entries from a stream and surfaces those entries to the function.

The trigger polls Redis at a configurable fixed interval, and uses XREADGROUP to read elements from the stream.

The consumer group for all instances of a function is the name of the function, that is, SimpleStreamTrigger for the StreamTrigger sample.

Each functions instance uses the WEBSITE_INSTANCE_ID or generates a random GUID to use as its consumer name within the group to ensure that scaled out instances of the function don't read the same messages from the stream.

Type Description
byte[] The message from the channel.
string The message from the channel.
Custom The trigger uses Json.NET serialization to map the message from the channel from a string into a custom type.