适用于 Azure Functions 的 Azure 队列存储触发器
当消息添加到 Azure 队列存储时,队列存储触发器将运行函数。
面向消耗计划和高级计划的 Azure 队列存储缩放决策是通过基于目标的缩放完成的。 有关详细信息,请参阅基于目标的缩放。
重要
本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型目前处于预览状态,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 在升级指南中详细了解 v3 和 v4 之间的差异。
Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。
使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南。
本文同时支持两个编程模型。
示例
在队列中收到新项时,可以使用队列触发器启动一个函数。 队列消息作为输入提供给该函数。
可以使用以下 C# 模式之一创建 C# 函数:
- 进程内类库:编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。
- 独立进程类库:编译的 C# 函数,该函数在独立于运行时的进程中运行。 支持在 .NET 5.0 上运行的 C# 函数需要独立的进程。
- C# 脚本:主要在 Azure 门户中创建 C# 函数时使用。
重要
对进程内模型的支持将于 2026 年 11 月 10 日结束。 为获得完全支持,强烈建议将应用迁移到独立工作模型。
以下示例显示了一个 C# 函数,该函数在每次处理队列项目时轮询 input-queue
队列并将多条消息写入输出队列。
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp
{
public class QueueFunction
{
private readonly ILogger<QueueFunction> _logger;
public QueueFunction(ILogger<QueueFunction> logger)
{
_logger = logger;
}
//<docsnippet_queue_output_binding>
//<docsnippet_queue_trigger>
[Function(nameof(QueueFunction))]
[QueueOutput("output-queue")]
public string[] Run([QueueTrigger("input-queue")] Album myQueueItem, FunctionContext context)
//</docsnippet_queue_trigger>
{
// Use a string array to return more than one message.
string[] messages = {
$"Album name = {myQueueItem.Name}",
$"Album songs = {myQueueItem.Songs.ToString()}"};
_logger.LogInformation("{msg1},{msg2}", messages[0], messages[1]);
// Queue Output messages
return messages;
}
//</docsnippet_queue_output_binding>
/// <summary>
/// This function demonstrates binding to a single <see cref="QueueMessage"/>.
/// </summary>
[Function(nameof(QueueMessageFunction))]
public void QueueMessageFunction([QueueTrigger("input-queue")] QueueMessage message)
{
_logger.LogInformation(message.MessageText);
}
/// <summary>
/// This function demonstrates binding to a single <see cref="BinaryData"/>.
/// </summary>
[Function(nameof(QueueBinaryDataFunction))]
public void QueueBinaryDataFunction([QueueTrigger("input-queue")] BinaryData message)
{
_logger.LogInformation(message.ToString());
}
}
public class Album
{
public string Id { get; set; }
public string Name { get; set; }
public List<string> Songs { get; set; }
}
}
以下 Java 示例显示了一个存储队列触发器函数,该函数用于记录放入队列 myqueuename
的触发消息。
@FunctionName("queueprocessor")
public void run(
@QueueTrigger(name = "msg",
queueName = "myqueuename",
connection = "myconnvarname") String message,
final ExecutionContext context
) {
context.getLogger().info(message);
}
以下示例显示队列触发器 TypeScript 函数。 每次处理某个队列项之后,该函数会轮询 myqueue-items
队列并写入日志。
import { app, InvocationContext } from '@azure/functions';
export async function storageQueueTrigger1(queueItem: unknown, context: InvocationContext): Promise<void> {
context.log('Storage queue function processed work item:', queueItem);
context.log('expirationTime =', context.triggerMetadata.expirationTime);
context.log('insertionTime =', context.triggerMetadata.insertionTime);
context.log('nextVisibleTime =', context.triggerMetadata.nextVisibleTime);
context.log('id =', context.triggerMetadata.id);
context.log('popReceipt =', context.triggerMetadata.popReceipt);
context.log('dequeueCount =', context.triggerMetadata.dequeueCount);
}
app.storageQueue('storageQueueTrigger1', {
queueName: 'myqueue-items',
connection: 'MyStorageConnectionAppSetting',
handler: storageQueueTrigger1,
});
以下示例显示队列触发器 JavaScript 函数。 每次处理某个队列项之后,该函数会轮询 myqueue-items
队列并写入日志。
const { app } = require('@azure/functions');
app.storageQueue('storageQueueTrigger1', {
queueName: 'myqueue-items',
connection: 'MyStorageConnectionAppSetting',
handler: (queueItem, context) => {
context.log('Storage queue function processed work item:', queueItem);
context.log('expirationTime =', context.triggerMetadata.expirationTime);
context.log('insertionTime =', context.triggerMetadata.insertionTime);
context.log('nextVisibleTime =', context.triggerMetadata.nextVisibleTime);
context.log('id =', context.triggerMetadata.id);
context.log('popReceipt =', context.triggerMetadata.popReceipt);
context.log('dequeueCount =', context.triggerMetadata.dequeueCount);
},
});
下面的示例演示如何通过触发器读取传递给函数的队列消息。
存储队列触发器在 function.json 文件中定义,其中的 type
设置为 queueTrigger
。
{
"bindings": [
{
"name": "QueueItem",
"type": "queueTrigger",
"direction": "in",
"queueName": "messages",
"connection": "MyStorageConnectionAppSetting"
}
]
}
Run.ps1 文件中的代码将参数声明为 $QueueItem
,以允许你在函数中读取队列消息。
# Input bindings are passed in via param block.
param([string] $QueueItem, $TriggerMetadata)
# Write out the queue message and metadata to the information log.
Write-Host "PowerShell queue trigger function processed work item: $QueueItem"
Write-Host "Queue item expiration time: $($TriggerMetadata.ExpirationTime)"
Write-Host "Queue item insertion time: $($TriggerMetadata.InsertionTime)"
Write-Host "Queue item next visible time: $($TriggerMetadata.NextVisibleTime)"
Write-Host "ID: $($TriggerMetadata.Id)"
Write-Host "Pop receipt: $($TriggerMetadata.PopReceipt)"
Write-Host "Dequeue count: $($TriggerMetadata.DequeueCount)"
下面的示例演示如何通过触发器读取传递给函数的队列消息。 该示例取决于是使用 v1 还是 v2 Python 编程模型。
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="QueueFunc")
@app.queue_trigger(arg_name="msg", queue_name="inputqueue",
connection="storageAccountConnectionString") # Queue trigger
@app.queue_output(arg_name="outputQueueItem", queue_name="outqueue",
connection="storageAccountConnectionString") # Queue output binding
def test_function(msg: func.QueueMessage,
outputQueueItem: func.Out[str]) -> None:
logging.info('Python queue trigger function processed a queue item: %s',
msg.get_body().decode('utf-8'))
outputQueueItem.set('hello')
特性
进程内和独立工作进程 C# 库使用 QueueTriggerAttribute 来定义函数。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。
在 C# 类库中,特性的构造函数采用要监视的队列的名称,如以下示例中所示:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp
{
public class QueueFunction
{
private readonly ILogger<QueueFunction> _logger;
public QueueFunction(ILogger<QueueFunction> logger)
{
_logger = logger;
}
//<docsnippet_queue_output_binding>
//<docsnippet_queue_trigger>
[Function(nameof(QueueFunction))]
[QueueOutput("output-queue")]
public string[] Run([QueueTrigger("input-queue")] Album myQueueItem, FunctionContext context)
//</docsnippet_queue_trigger>
{
// Use a string array to return more than one message.
string[] messages = {
$"Album name = {myQueueItem.Name}",
$"Album songs = {myQueueItem.Songs.ToString()}"};
_logger.LogInformation("{msg1},{msg2}", messages[0], messages[1]);
// Queue Output messages
return messages;
}
//</docsnippet_queue_output_binding>
/// <summary>
/// This function demonstrates binding to a single <see cref="QueueMessage"/>.
/// </summary>
[Function(nameof(QueueMessageFunction))]
public void QueueMessageFunction([QueueTrigger("input-queue")] QueueMessage message)
{
_logger.LogInformation(message.MessageText);
}
/// <summary>
/// This function demonstrates binding to a single <see cref="BinaryData"/>.
/// </summary>
[Function(nameof(QueueBinaryDataFunction))]
public void QueueBinaryDataFunction([QueueTrigger("input-queue")] BinaryData message)
{
_logger.LogInformation(message.ToString());
}
}
public class Album
{
public string Id { get; set; }
public string Name { get; set; }
public List<string> Songs { get; set; }
}
}
此示例还演示了在属性自身中设置连接字符串设置。
批注
使用 QueueTrigger
注释可以访问触发函数的队列。 以下示例通过 message
参数向函数提供队列消息。
package com.function;
import com.microsoft.azure.functions.annotation.*;
import java.util.Queue;
import com.microsoft.azure.functions.*;
public class QueueTriggerDemo {
@FunctionName("QueueTriggerDemo")
public void run(
@QueueTrigger(name = "message", queueName = "messages", connection = "MyStorageConnectionAppSetting") String message,
final ExecutionContext context
) {
context.getLogger().info("Queue message: " + message);
}
}
属性 | 说明 |
---|---|
name |
在函数签名中声明参数名称。 触发函数时,此参数的值包含队列消息的内容。 |
queueName |
在存储帐户中声明队列名称。 |
connection |
指向存储帐户连接字符串。 |
修饰符
仅适用于 Python v2 编程模型。
对于使用修饰器定义的 Python v2 函数,queue_trigger
修饰器上的以下属性可定义队列存储触发器:
properties | 说明 |
---|---|
arg_name |
在函数签名中声明参数名称。 触发函数时,此参数的值包含队列消息的内容。 |
queue_name |
在存储帐户中声明队列名称。 |
connection |
指向存储帐户连接字符串。 |
对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。
配置
仅适用于 Python v1 编程模型。
下表解释了在 function.json 文件和 QueueTrigger
特性中设置的绑定配置属性。
function.json 属性 | 说明 |
---|---|
type | 必须设置为 queueTrigger 。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
direction | 只能在 function.json 文件中设置。 必须设置为 in 。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
name | 函数代码中包含队列项有效负载的变量的名称。 |
queueName | 要轮询的队列的名称。 |
连接 | 指定如何连接到 Azure 队列的应用设置或设置集合的名称。 请参阅连接。 |
有关完整示例的信息,请参阅示例部分。
在本地开发时,需要将应用程序设置添加到 Values
集合中的 local.settings.json 文件中。
使用情况
注意
函数需要 base64 编码字符串。 对编码类型进行的任何调整(若要将数据作为 base64 编码字符串进行准备)需要在调用服务中实现。
队列触发器的用法取决于扩展包版本,以及函数应用中使用的 C# 模态,可以是以下模式之一:
选择一个版本以查看模式和版本的使用情况详细信息。
队列触发器可以绑定到以下类型:
类型 | 说明 |
---|---|
string |
消息内容即字符串。 当消息为简单文本时使用。 |
byte[] |
消息的字节数。 |
JSON 可序列化类型 | 当队列消息包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。 |
QueueMessage | (Preview1) 消息。 |
BinaryData | (Preview1) 消息的字节数。 |
1 若要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues 5.1.3-preview1 或更高版本以及 SDK 类型绑定的常见依赖项。
使用 QueueTrigger 注释可以访问触发函数的队列消息。
通过与 function.json 文件中绑定的 name
参数指定的名称匹配的字符串参数访问队列消息。
通过类型为 QueueMessage 的参数访问队列消息。
Metadata
队列触发器提供了数个元数据属性。 对于提供对消息元数据的访问的语言工作人员,这些属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。
消息元数据属性是 CloudQueueMessage 类的成员。
可以从 context.triggerMetadata
访问消息元数据属性。
可以从传递的 $TriggerMetadata
参数访问消息元数据属性。
properties | 类型 | 说明 |
---|---|---|
QueueTrigger |
string |
队列有效负载(如果是有效的字符串)。 如果队列消息有效负载是字符串,则 QueueTrigger 包含的值与 function.json 中 name 属性命名的变量的值相同。 |
DequeueCount |
long |
此消息取消排队的次数。 |
ExpirationTime |
DateTimeOffset |
消息过期的时间。 |
Id |
string |
队列消息 ID。 |
InsertionTime |
DateTimeOffset |
消息添加到队列的时间。 |
NextVisibleTime |
DateTimeOffset |
消息下一次可见的时间。 |
PopReceipt |
string |
消息的 pop 接收方。 |
可以从传递的绑定参数(前面示例中的 msg
)访问以下消息元数据属性。
properties | 说明 |
---|---|
body |
作为字符串的队列有效负载。 |
dequeue_count |
此消息取消排队的次数。 |
expiration_time |
消息过期的时间。 |
id |
队列消息 ID。 |
insertion_time |
消息添加到队列的时间。 |
time_next_visible |
消息下一次可见的时间。 |
pop_receipt |
消息的 pop 接收方。 |
连接
connection
属性是对环境配置的引用,它指定应用应如何连接到 Azure 队列。 它可能指定:
如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。
连接字符串
若要获取连接字符串,请执行管理存储帐户访问密钥中显示的步骤。
此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection
属性指定的值匹配。
如果应用设置名称以“AzureWebJobs”开始,则只能在此处指定该名称的余下部分。 例如,如果将 connection
设置为“MyStorage”,Functions 运行时将会查找名为“AzureWebJobsMyStorage”的应用设置。如果将 connection
留空,Functions 运行时将使用应用设置中名为 AzureWebJobsStorage
的默认存储连接字符串。
基于标识的连接
如果使用 5.x 或更高版本的扩展,则无需将连接字符串与机密配合使用,可以让应用使用 Azure Active Directory 标识。 要使用标识,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection
属性。
如果将 connection
设置为“AzureWebJobsStorage”,请参阅使用标识连接到主机存储。 对于所有其他连接,扩展需要以下属性:
属性 | 环境变量模板 | 说明 | 示例值 |
---|---|---|---|
队列服务 URI | <CONNECTION_NAME_PREFIX>__queueServiceUri 1 |
要连接到的队列服务的数据平面 URI,使用 HTTPS 方案。 | https://<storage_account_name>.queue.core.chinacloudapi.cn |
1 <CONNECTION_NAME_PREFIX>__serviceUri
可以用作别名。 如果同时提供了两个窗体,则会使用 queueServiceUri
窗体。 如果要跨 Blob、队列和/或表使用总体连接配置,则无法使用 serviceUri
窗体。
可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性。
在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credential
和 clientID
属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发。
向标识授予权限
无论使用何种标识,都必须具有执行所需操作的权限。 需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色。
重要
某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。
你将需要创建一个角色分配,以便在运行时提供对队列的访问权限。 所有者等管理角色还不够。 下表显示了在正常操作中使用队列存储扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。
绑定类型 | 内置角色示例 |
---|---|
触发器 | 存储队列数据读取者、存储队列数据消息处理者 |
输出绑定 | 存储队列数据参与者、存储队列数据消息发送方 |
有害消息
队列触发函数失败时,Azure Functions 会针对给定的队列消息重试该函数最多 5 次(包括第一次尝试)。 如果 5 次尝试全部失败,函数运行时会将一则消息添加到名为 <originalqueuename>-poison 的队列。 可以编写一个函数来处理有害队列中的消息,并记录这些消息,或者发送需要注意的通知。
若要手动处理有害消息,请检查队列消息的 dequeueCount。
速览锁定
使用存储服务提供的可见性机制,队列触发器会自动执行扫视锁定模式。 当消息被触发的函数取消排队时,它们会被标记为不可见。 执行队列触发的函数可以对队列中的消息产生以下结果之一:
- 函数执行成功完成,消息从队列中删除。
- 函数执行失败,Functions 主机根据 host.json 文件中的
visibilityTimeout
设置更新消息的可见性。 默认可见性超时为零,这意味着消息会立即重新出现在队列中以供重新处理。 使用visibilityTimeout
设置可延迟无法处理的消息的重新处理。 此超时设置适用于函数应用中的所有队列触发的函数。 - Functions 主机在函数执行期间崩溃。 发生这种不常见的事件时,主机无法将
visibilityTimeout
应用于正在处理的消息。 取而代之的是,该消息将保留存储服务设置的默认超时时间 10 分钟。 10 分钟后,该消息会重新出现在队列中以供重新处理。 无法更改此服务定义的默认超时。
轮询算法
队列触发器实现了随机指数退让算法,以降低空闲队列轮询对存储交易成本造成的影响。
该算法使用以下逻辑:
- 当找到一条消息时,运行时会等待 100 毫秒,然后检查另一条消息。
- 如果未找到任何消息,它会等待大约 200 毫秒,然后重试。
- 如果后续尝试获取队列消息失败,则等待时间会继续增加,直到达到最长等待时间(默认为 1 分钟)。
- 可以通过 host.json 文件中的
maxPollingInterval
属性配置最大等待时间。
在本地开发期间,最大轮询间隔默认为两秒。
注意
关于在消耗计划中托管函数应用的计费问题,我们不会针对你在运行时所花费的轮询时间收取费用。
并发
当有多个队列消息在等待时,队列触发器会检索一批消息并以并发方式调用函数实例来处理它们。 默认情况下,批大小为 16。 当处理的数量下降到 8 时,运行时可获取另一个批,并开始处理这些消息。 因此,单个虚拟机 (VM) 上每个函数处理的最大并发消息数是 24。 此限制分别应用于每个 VM 上各个队列触发的函数。 如果函数应用横向扩展到多个 VM,则每个 VM 都会等待触发器并尝试运行函数。 例如,如果某个函数应用横向扩展到 3 个 VM,则每个队列触发的函数的默认最大并发实例数是 72。
可以在 host.json 文件中配置用于获取新批的批大小和阈值。 如果希望在函数应用中最大程度地降低查询触发的函数的并行执行,可以将批大小设置为 1。 只有当函数在单个虚拟机 (VM) 上运行时,此设置才可消除并发。
队列触发器会自动阻止函数同时多次处理队列消息。
host.json 属性
host.json 文件包含控制队列触发器行为的设置。 有关可用设置的详细信息,请参阅 host.json 设置部分。