适用于 Azure Functions 的 Azure 队列存储触发器

当消息添加到 Azure 队列存储时,队列存储触发器将运行函数。

面向消耗计划和高级计划的 Azure 队列存储缩放决策是通过基于目标的缩放完成的。 有关详细信息,请参阅基于目标的缩放

重要

本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型目前处于预览状态,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 在升级指南中详细了解 v3 和 v4 之间的差异。

Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。

使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南

本文同时支持两个编程模型。

示例

在队列中收到新项时,可以使用队列触发器启动一个函数。 队列消息作为输入提供给该函数。

可以使用以下 C# 模式之一创建 C# 函数:

  • 进程内类库:编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。
  • 独立工作进程类库:编译的 C# 函数,该函数在独立于运行时的工作进程中运行。 需要独立的工作进程才能支持在非 LTS 版 .NET 和 .NET Framework 上运行的 C# 函数。
  • C# 脚本:主要在 Azure 门户中创建 C# 函数时使用。

以下示例显示了一个 C# 函数,该函数在每次处理队列项目时轮询 input-queue 队列并将多条消息写入输出队列。

// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp
{
    public class QueueFunction
    {
        private readonly ILogger<QueueFunction> _logger;

        public QueueFunction(ILogger<QueueFunction> logger)
        {
            _logger = logger;
        }

        //<docsnippet_queue_output_binding>
        //<docsnippet_queue_trigger>
        [Function(nameof(QueueFunction))]
        [QueueOutput("output-queue")]
        public string[] Run([QueueTrigger("input-queue")] Album myQueueItem, FunctionContext context)
        //</docsnippet_queue_trigger>
        {
            // Use a string array to return more than one message.
            string[] messages = {
                $"Album name = {myQueueItem.Name}",
                $"Album songs = {myQueueItem.Songs.ToString()}"};

            _logger.LogInformation("{msg1},{msg2}", messages[0], messages[1]);

            // Queue Output messages
            return messages;
        }
        //</docsnippet_queue_output_binding>

        /// <summary>
        /// This function demonstrates binding to a single <see cref="QueueMessage"/>.
        /// </summary>
        [Function(nameof(QueueMessageFunction))]
        public void QueueMessageFunction([QueueTrigger("input-queue")] QueueMessage message)
        {
            _logger.LogInformation(message.MessageText);
        }

        /// <summary>
        /// This function demonstrates binding to a single <see cref="BinaryData"/>.
        /// </summary>
        [Function(nameof(QueueBinaryDataFunction))]
        public void QueueBinaryDataFunction([QueueTrigger("input-queue")] BinaryData message)
        {
            _logger.LogInformation(message.ToString());
        }
    }

    public class Album
    {
        public string Id { get; set; }

        public string Name { get; set; }

        public List<string> Songs { get; set; }
    }
}

以下 Java 示例显示了一个存储队列触发器函数,该函数用于记录放入队列 myqueuename 的触发消息。

@FunctionName("queueprocessor")
public void run(
    @QueueTrigger(name = "msg",
                queueName = "myqueuename",
                connection = "myconnvarname") String message,
    final ExecutionContext context
) {
    context.getLogger().info(message);
}

以下示例显示队列触发器 TypeScript 函数。 每次处理某个队列项之后,该函数会轮询 myqueue-items 队列并写入日志。

import { app, InvocationContext } from '@azure/functions';

export async function storageQueueTrigger1(queueItem: unknown, context: InvocationContext): Promise<void> {
    context.log('Storage queue function processed work item:', queueItem);
    context.log('expirationTime =', context.triggerMetadata.expirationTime);
    context.log('insertionTime =', context.triggerMetadata.insertionTime);
    context.log('nextVisibleTime =', context.triggerMetadata.nextVisibleTime);
    context.log('id =', context.triggerMetadata.id);
    context.log('popReceipt =', context.triggerMetadata.popReceipt);
    context.log('dequeueCount =', context.triggerMetadata.dequeueCount);
}

app.storageQueue('storageQueueTrigger1', {
    queueName: 'myqueue-items',
    connection: 'MyStorageConnectionAppSetting',
    handler: storageQueueTrigger1,
});

使用情况 部分介绍了 queueItem消息元数据部分解释了所有其他所示变量。

以下示例显示队列触发器 JavaScript 函数。 每次处理某个队列项之后,该函数会轮询 myqueue-items 队列并写入日志。

const { app } = require('@azure/functions');

app.storageQueue('storageQueueTrigger1', {
    queueName: 'myqueue-items',
    connection: 'MyStorageConnectionAppSetting',
    handler: (queueItem, context) => {
        context.log('Storage queue function processed work item:', queueItem);
        context.log('expirationTime =', context.triggerMetadata.expirationTime);
        context.log('insertionTime =', context.triggerMetadata.insertionTime);
        context.log('nextVisibleTime =', context.triggerMetadata.nextVisibleTime);
        context.log('id =', context.triggerMetadata.id);
        context.log('popReceipt =', context.triggerMetadata.popReceipt);
        context.log('dequeueCount =', context.triggerMetadata.dequeueCount);
    },
});

使用情况 部分介绍了 queueItem消息元数据部分解释了所有其他所示变量。

下面的示例演示如何通过触发器读取传递给函数的队列消息。

存储队列触发器在 function.json 文件中定义,其中的 type 设置为 queueTrigger

{
  "bindings": [
    {
      "name": "QueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "MyStorageConnectionAppSetting"
    }
  ]
}

Run.ps1 文件中的代码将参数声明为 $QueueItem,以允许你在函数中读取队列消息。

# Input bindings are passed in via param block.
param([string] $QueueItem, $TriggerMetadata)

# Write out the queue message and metadata to the information log.
Write-Host "PowerShell queue trigger function processed work item: $QueueItem"
Write-Host "Queue item expiration time: $($TriggerMetadata.ExpirationTime)"
Write-Host "Queue item insertion time: $($TriggerMetadata.InsertionTime)"
Write-Host "Queue item next visible time: $($TriggerMetadata.NextVisibleTime)"
Write-Host "ID: $($TriggerMetadata.Id)"
Write-Host "Pop receipt: $($TriggerMetadata.PopReceipt)"
Write-Host "Dequeue count: $($TriggerMetadata.DequeueCount)"

下面的示例演示如何通过触发器读取传递给函数的队列消息。 该示例取决于是使用 v1 还是 v2 Python 编程模型。

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="QueueFunc")
@app.queue_trigger(arg_name="msg", queue_name="inputqueue",
                   connection="storageAccountConnectionString")  # Queue trigger
@app.queue_output(arg_name="outputQueueItem", queue_name="outqueue",
                 connection="storageAccountConnectionString")  # Queue output binding
def test_function(msg: func.QueueMessage,
                  outputQueueItem: func.Out[str]) -> None:
    logging.info('Python queue trigger function processed a queue item: %s',
                 msg.get_body().decode('utf-8'))
    outputQueueItem.set('hello')

特性

进程内独立工作进程 C# 库使用 QueueTriggerAttribute 来定义函数。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。

C# 类库中,特性的构造函数采用要监视的队列的名称,如以下示例中所示:

// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp
{
    public class QueueFunction
    {
        private readonly ILogger<QueueFunction> _logger;

        public QueueFunction(ILogger<QueueFunction> logger)
        {
            _logger = logger;
        }

        //<docsnippet_queue_output_binding>
        //<docsnippet_queue_trigger>
        [Function(nameof(QueueFunction))]
        [QueueOutput("output-queue")]
        public string[] Run([QueueTrigger("input-queue")] Album myQueueItem, FunctionContext context)
        //</docsnippet_queue_trigger>
        {
            // Use a string array to return more than one message.
            string[] messages = {
                $"Album name = {myQueueItem.Name}",
                $"Album songs = {myQueueItem.Songs.ToString()}"};

            _logger.LogInformation("{msg1},{msg2}", messages[0], messages[1]);

            // Queue Output messages
            return messages;
        }
        //</docsnippet_queue_output_binding>

        /// <summary>
        /// This function demonstrates binding to a single <see cref="QueueMessage"/>.
        /// </summary>
        [Function(nameof(QueueMessageFunction))]
        public void QueueMessageFunction([QueueTrigger("input-queue")] QueueMessage message)
        {
            _logger.LogInformation(message.MessageText);
        }

        /// <summary>
        /// This function demonstrates binding to a single <see cref="BinaryData"/>.
        /// </summary>
        [Function(nameof(QueueBinaryDataFunction))]
        public void QueueBinaryDataFunction([QueueTrigger("input-queue")] BinaryData message)
        {
            _logger.LogInformation(message.ToString());
        }
    }

    public class Album
    {
        public string Id { get; set; }

        public string Name { get; set; }

        public List<string> Songs { get; set; }
    }
}

此示例还演示了在属性自身中设置连接字符串设置

批注

使用 QueueTrigger 注释可以访问触发函数的队列。 以下示例通过 message 参数向函数提供队列消息。

package com.function;
import com.microsoft.azure.functions.annotation.*;
import java.util.Queue;
import com.microsoft.azure.functions.*;

public class QueueTriggerDemo {
    @FunctionName("QueueTriggerDemo")
    public void run(
        @QueueTrigger(name = "message", queueName = "messages", connection = "MyStorageConnectionAppSetting") String message,
        final ExecutionContext context
    ) {
        context.getLogger().info("Queue message: " + message);
    }
}
属性 说明
name 在函数签名中声明参数名称。 触发函数时,此参数的值包含队列消息的内容。
queueName 在存储帐户中声明队列名称。
connection 指向存储帐户连接字符串。

修饰符

仅适用于 Python v2 编程模型。

对于使用修饰器定义的 Python v2 函数,queue_trigger 修饰器上的以下属性可定义队列存储触发器:

properties 说明
arg_name 在函数签名中声明参数名称。 触发函数时,此参数的值包含队列消息的内容。
queue_name 在存储帐户中声明队列名称。
connection 指向存储帐户连接字符串。

对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。

配置

仅适用于 Python v1 编程模型

下表说明了可以在传递给 app.storageQueue() 方法的 options 对象上设置的属性。

properties 说明
queueName 要轮询的队列的名称。
连接 指定如何连接到 Azure 队列的应用设置或设置集合的名称。 请参阅连接

下表解释了在 function.json 文件和 QueueTrigger 特性中设置的绑定配置属性。

function.json 属性 说明
type 必须设置为 queueTrigger。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 只能在 function.json 文件中设置。 必须设置为 in。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 函数代码中包含队列项有效负载的变量的名称。
queueName 要轮询的队列的名称。
连接 指定如何连接到 Azure 队列的应用设置或设置集合的名称。 请参阅连接

有关完整示例的信息,请参阅示例部分

在本地开发时,需要将应用程序设置添加到 Values 集合中的 local.settings.json 文件中。

使用情况

注意

函数需要 base64 编码字符串。 对编码类型进行的任何调整(若要将数据作为 base64 编码字符串进行准备)需要在调用服务中实现。

队列触发器的用法取决于扩展包版本,以及函数应用中使用的 C# 模态,可以是以下模式之一:

独立工作进程类库的已编译 C# 函数在独立于运行时的进程中运行。

选择一个版本以查看模式和版本的使用情况详细信息。

队列触发器可以绑定到以下类型:

类型 说明
string 消息内容即字符串。 当消息为简单文本时使用。
byte[] 消息的字节数。
JSON 可序列化类型 当队列消息包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。
QueueMessage (Preview1)
消息。
BinaryData (Preview1)
消息的字节数。

1 若要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues 5.1.3-preview1 或更高版本以及 SDK 类型绑定的常见依赖项

使用 QueueTrigger 注释可以访问触发函数的队列消息。

将队列项作为函数的第一个参数访问。 如果有效负载为 JSON,该值将反序列化为对象。

通过与 function.json 文件中绑定的 name 参数指定的名称匹配的字符串参数访问队列消息。

通过类型为 QueueMessage 的参数访问队列消息。

Metadata

队列触发器提供了数个元数据属性。 对于提供对消息元数据的访问的语言工作人员,这些属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。

消息元数据属性是 CloudQueueMessage 类的成员。

可以从 context.triggerMetadata 访问消息元数据属性。

可以从传递的 $TriggerMetadata 参数访问消息元数据属性。

properties 类型​​ 说明
QueueTrigger string 队列有效负载(如果是有效的字符串)。 如果队列消息有效负载是字符串,则 QueueTrigger 包含的值与 function.jsonname 属性命名的变量的值相同。
DequeueCount long 此消息取消排队的次数。
ExpirationTime DateTimeOffset 消息过期的时间。
Id string 队列消息 ID。
InsertionTime DateTimeOffset 消息添加到队列的时间。
NextVisibleTime DateTimeOffset 消息下一次可见的时间。
PopReceipt string 消息的 pop 接收方。

可以从传递的绑定参数(前面示例中的 msg)访问以下消息元数据属性。

properties 说明
body 作为字符串的队列有效负载。
dequeue_count 此消息取消排队的次数。
expiration_time 消息过期的时间。
id 队列消息 ID。
insertion_time 消息添加到队列的时间。
time_next_visible 消息下一次可见的时间。
pop_receipt 消息的 pop 接收方。

连接

connection 属性是对环境配置的引用,它指定应用应如何连接到 Azure 队列。 它可能指定:

如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。

连接字符串

若要获取连接字符串,请执行管理存储帐户访问密钥中显示的步骤。

此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection 属性指定的值匹配。

如果应用设置名称以“AzureWebJobs”开始,则只能在此处指定该名称的余下部分。 例如,如果将 connection 设置为“MyStorage”,Functions 运行时将会查找名为“AzureWebJobsMyStorage”的应用设置。如果将 connection 留空,Functions 运行时将使用应用设置中名为 AzureWebJobsStorage 的默认存储连接字符串。

基于标识的连接

如果使用 5.x 或更高版本的扩展,则无需将连接字符串与机密配合使用,可以让应用使用 Azure Active Directory 标识。 要使用标识,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection 属性。

如果将 connection 设置为“AzureWebJobsStorage”,请参阅使用标识连接到主机存储。 对于所有其他连接,扩展需要以下属性:

属性 环境变量模板 说明 示例值
队列服务 URI <CONNECTION_NAME_PREFIX>__queueServiceUri1 要连接到的队列服务的数据平面 URI,使用 HTTPS 方案。 https://<storage_account_name>.queue.core.chinacloudapi.cn

1 <CONNECTION_NAME_PREFIX>__serviceUri 可以用作别名。 如果同时提供了两个窗体,则会使用 queueServiceUri 窗体。 如果要跨 Blob、队列和/或表使用总体连接配置,则无法使用 serviceUri 窗体。

可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性

在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credentialclientID 属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发

向标识授予权限

无论使用何种标识,都必须具有执行所需操作的权限。 对于大多数 Azure 服务,这意味着你需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色

重要

某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。

你将需要创建一个角色分配,以便在运行时提供对队列的访问权限。 所有者等管理角色还不够。 下表显示了在正常操作中使用队列存储扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。

绑定类型 内置角色示例
触发器 存储队列数据读取者存储队列数据消息处理者
输出绑定 存储队列数据参与者存储队列数据消息发送方

有害消息

队列触发函数失败时,Azure Functions 会针对给定的队列消息重试该函数最多 5 次(包括第一次尝试)。 如果 5 次尝试全部失败,函数运行时会将一则消息添加到名为 <originalqueuename>-poison 的队列。 可以编写一个函数来处理有害队列中的消息,并记录这些消息,或者发送需要注意的通知。

若要手动处理有害消息,请检查队列消息的 dequeueCount

速览锁定

使用存储服务提供的可见性机制,队列触发器会自动执行扫视锁定模式。 当消息被触发的函数取消排队时,它们会被标记为不可见。 执行队列触发的函数可以对队列中的消息产生以下结果之一:

  • 函数执行成功完成,消息从队列中删除。
  • 函数执行失败,Functions 主机根据 host.json 文件中的 visibilityTimeout 设置更新消息的可见性。 默认可见性超时为零,这意味着消息会立即重新出现在队列中以供重新处理。 使用 visibilityTimeout 设置可延迟无法处理的消息的重新处理。 此超时设置适用于函数应用中的所有队列触发的函数。
  • Functions 主机在函数执行期间崩溃。 发生这种不常见的事件时,主机无法将 visibilityTimeout 应用于正在处理的消息。 取而代之的是,该消息将保留存储服务设置的默认超时时间 10 分钟。 10 分钟后,该消息会重新出现在队列中以供重新处理。 无法更改此服务定义的默认超时。

轮询算法

队列触发器实现了随机指数退让算法,以降低空闲队列轮询对存储交易成本造成的影响。

该算法使用以下逻辑:

  • 当找到一条消息时,运行时会等待 100 毫秒,然后检查另一条消息。
  • 如果未找到任何消息,它会等待大约 200 毫秒,然后重试。
  • 如果后续尝试获取队列消息失败,则等待时间会继续增加,直到达到最长等待时间(默认为 1 分钟)。
  • 可以通过 host.json 文件中的 maxPollingInterval 属性配置最大等待时间。

在本地开发期间,最大轮询间隔默认为两秒。

注意

关于在消耗计划中托管函数应用的计费问题,我们不会针对你在运行时所花费的轮询时间收取费用。

并发

当有多个队列消息在等待时,队列触发器会检索一批消息并以并发方式调用函数实例来处理它们。 默认情况下,批大小为 16。 当处理的数量下降到 8 时,运行时可获取另一个批,并开始处理这些消息。 因此,单个虚拟机 (VM) 上每个函数处理的最大并发消息数是 24。 此限制分别应用于每个 VM 上各个队列触发的函数。 如果函数应用横向扩展到多个 VM,则每个 VM 都会等待触发器并尝试运行函数。 例如,如果某个函数应用横向扩展到 3 个 VM,则每个队列触发的函数的默认最大并发实例数是 72。

可以在 host.json 文件中配置用于获取新批的批大小和阈值。 如果希望在函数应用中最大程度地降低查询触发的函数的并行执行,可以将批大小设置为 1。 只有当函数在单个虚拟机 (VM) 上运行时,此设置才可消除并发。

队列触发器会自动阻止函数同时多次处理队列消息。

host.json 属性

host.json 文件包含控制队列触发器行为的设置。 有关可用设置的详细信息,请参阅 host.json 设置部分。

后续步骤