Azure Functions 的 RedisStreamTrigger

RedisStreamTrigger 从流中读取新条目并将这些元素呈现给函数。

Basic 标准、高级 Enterprise、Enterprise Flash

重要

消耗计划中运行的函数当前不支持 Redis 触发器。

重要

Azure Cache for Redis 缓存扩展尚不支持适用于 Functions 的 Node.js v4 模型。 有关 v4 模型工作原理的更多详细信息,请参阅 Azure Functions Node.js 开发人员指南。 要详细了解 v3 和 v4 之间的差异,请参阅迁移指南

重要

Azure Cache for Redis 缓存扩展尚不支持适用于 Functions 的 Python v2 模型。 有关 v2 模型工作原理的更多详细信息,请参阅 Azure Functions Python 开发人员指南

示例

重要

对于 .NET 函数,建议在进程内模型中使用独立辅助角色模型。 有关进程内独立辅助角色模型的比较,请参阅 Azure Functions 上 .NET 的独立辅助角色模型与进程内模型之间的差异。

执行模式 说明
进程内 函数代码与 Functions 宿主进程在同一进程中运行。 仅支持 .NET 的长期支持 (LTS) 版本。 若要了解详细信息,请参阅开发 .NET 类库函数
独立工作进程 函数代码在单独的 .NET 工作进程中运行。 与受支持的 .NET 和 .NET Framework 版本结合使用。 若要了解详细信息,请参阅开发 .NET 独立工作进程函数
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
    internal class SimpleStreamTrigger
    {
        private readonly ILogger<SimpleStreamTrigger> logger;

        public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
        {
            this.logger = logger;
        }

        [Function(nameof(SimpleStreamTrigger))]
        public void Run(
            [RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
        {
            logger.LogInformation(entry);
        }
    }
}

package com.function.RedisStreamTrigger;

import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;

public class SimpleStreamTrigger {
    @FunctionName("SimpleStreamTrigger")
    public void run(
            @RedisStreamTrigger(
                name = "req",
                connection = "redisConnectionString",
                key = "streamTest",
                pollingIntervalInMs = 1000,
                maxBatchSize = 1)
                String message,
            final ExecutionContext context) {
            context.getLogger().info(message);
    }
}

此示例使用相同的 index.js 文件,function.json 文件中包含绑定数据。

index.js文件如下所示:

module.exports = async function (context, entry) {
    context.log(entry);
}

function.json中,下面是绑定数据:

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "index.js"
}

此示例使用相同的 run.ps1 文件,function.json 文件中包含绑定数据。

run.ps1文件如下所示:

param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)

function.json中,下面是绑定数据:

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "run.ps1"
}

Python v1 编程模型要求在函数文件夹中的单独 function.json 文件中定义绑定。 有关详细信息,请参阅 Python 开发人员指南

此示例使用相同的 __init__.py 文件,function.json 文件中包含绑定数据。

__init__.py文件如下所示:

import logging

def main(entry: str):
    logging.info(entry)

function.json中,下面是绑定数据:

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "__init__.py"
}

特性

参数 说明 需要 默认
Connection 包含缓存连接字符串的应用程序设置的名称,例如:<cacheName>.redis.cache.chinacloudapi.cn:6380,password...
Key 要从中读取的键。
PollingIntervalInMs 轮询 Redis 服务器的频率(以毫秒为单位)。 可选 1000
MessagesPerWorker 每个函数辅助角色应处理的消息数。 用于确定函数应缩放到的辅助角色数。 可选 100
Count 一次从 Redis 中拉取的元素数。 可选 10
DeleteAfterProcess 指示函数是否在处理后删除流条目。 可选 false

批注

参数 步骤 需要 默认
name entry
connection 包含缓存连接字符串的应用程序设置的名称,例如:<cacheName>.redis.cache.chinacloudapi.cn:6380,password...
key 要从中读取的键。
pollingIntervalInMs 轮询 Redis 的频率(以毫秒为单位)。 可选 1000
messagesPerWorker 每个函数辅助角色应处理的消息数。 用于确定应将函数范围扩展到的辅助角色数。 可选 100
count 一次从 Redis 中读取的条目数。 系统会并行处理条目。 可选 10
deleteAfterProcess 是否在函数运行后删除流条目。 可选 false

配置

下表解释了在 function.json 文件中设置的绑定配置属性。

function.json 属性 说明 需要 默认
type
deleteAfterProcess 可选 false
connection 包含缓存连接字符串的应用程序设置的名称,例如:<cacheName>.redis.cache.chinacloudapi.cn:6380,password...
key 要从中读取的键。
pollingIntervalInMs 轮询 Redis 的频率(以毫秒为单位)。 可选 1000
messagesPerWorker (可选)每个函数辅助角色应处理的消息数。 用于确定函数应缩放到的辅助角色数 可选 100
count 一次从 Redis 中读取的条目数。 这些是并行处理的。 可选 10
name
direction

有关完整示例,请参阅示例部分。

使用情况

RedisStreamTrigger Azure 函数从流中读取新条目,并将这些条目呈现到函数中。

触发器以可配置的固定间隔轮询 Redis,并使用 XREADGROUP 从流中读取元素。

函数的所有实例的使用者组是函数的名称,对于 StreamTrigger 示例,为 SimpleStreamTrigger

每个函数实例会使用 WEBSITE_INSTANCE_ID 或创建一个随机 GUID,用作其在组中的使用者名称,以确保函数的横向扩展实例不会从流中读取相同的消息。

类型 描述
byte[] 来自通道的消息。
string 来自通道的消息。
Custom 触发器使用 Json.NET 序列化将通道中的消息从 string 映射到自定义类型。