适用于 Azure Functions 的 Azure 队列存储触发器

当消息添加到 Azure 队列存储时,队列存储触发器将运行函数。

示例

在队列中收到新项时,可以使用队列触发器启动一个函数。 队列消息作为输入提供给该函数。

可以使用以下 C# 模式之一创建 C# 函数:

  • 进程内类库:编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。
  • 独立进程类库:编译的 C# 函数,该函数在独立于运行时的进程中运行。 支持在 .NET 5.0 上运行的 C# 函数需要独立的进程。
  • C# 脚本:主要在 Azure 门户中创建 C# 函数时使用。

以下示例演示可在每次处理某个队列项之后轮询 myqueue-items 队列并写入日志的 C# 函数

public static class QueueFunctions
{
    [FunctionName("QueueTrigger")]
    public static void QueueTrigger(
        [QueueTrigger("myqueue-items")] string myQueueItem, 
        ILogger log)
    {
        log.LogInformation($"C# function processed: {myQueueItem}");
    }
}

以下 Java 示例显示了一个存储队列触发器函数,该函数用于记录放入队列 myqueuename 的触发消息。

@FunctionName("queueprocessor")
public void run(
    @QueueTrigger(name = "msg",
                queueName = "myqueuename",
                connection = "myconnvarname") String message,
    final ExecutionContext context
) {
    context.getLogger().info(message);
}

以下示例演示 function.json 文件中的一个队列触发器绑定以及使用该绑定的 JavaScript 函数。 每次处理某个队列项之后,该函数会轮询 myqueue-items 队列并写入日志。

function.json 文件如下所示:

{
    "disabled": false,
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "myqueue-items",
            "connection":"MyStorageConnectionAppSetting"
        }
    ]
}

配置部分解释了这些属性。

注意

name 参数在 JavaScript 代码中反映为 context.bindings.<name>,其中包含队列项有效负载。 此有效负载也作为第二个参数传递给函数。

JavaScript 代码如下所示:

module.exports = async function (context, message) {
    context.log('Node.js queue trigger function processed work item', message);
    // OR access using context.bindings.<name>
    // context.log('Node.js queue trigger function processed work item', context.bindings.myQueueItem);
    context.log('expirationTime =', context.bindingData.expirationTime);
    context.log('insertionTime =', context.bindingData.insertionTime);
    context.log('nextVisibleTime =', context.bindingData.nextVisibleTime);
    context.log('id =', context.bindingData.id);
    context.log('popReceipt =', context.bindingData.popReceipt);
    context.log('dequeueCount =', context.bindingData.dequeueCount);
};

用法部分解释了 function.json 中的 name 属性命名的 myQueueItem消息元数据部分解释了所有其他所示变量。

下面的示例演示如何通过触发器读取传递给函数的队列消息。

存储队列触发器在 function.json 文件中定义,其中的 type 设置为 queueTrigger

{
  "bindings": [
    {
      "name": "QueueItem",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "MyStorageConnectionAppSetting"
    }
  ]
}

Run.ps1 文件中的代码将参数声明为 $QueueItem,以允许你在函数中读取队列消息。

# Input bindings are passed in via param block.
param([string] $QueueItem, $TriggerMetadata)

# Write out the queue message and metadata to the information log.
Write-Host "PowerShell queue trigger function processed work item: $QueueItem"
Write-Host "Queue item expiration time: $($TriggerMetadata.ExpirationTime)"
Write-Host "Queue item insertion time: $($TriggerMetadata.InsertionTime)"
Write-Host "Queue item next visible time: $($TriggerMetadata.NextVisibleTime)"
Write-Host "ID: $($TriggerMetadata.Id)"
Write-Host "Pop receipt: $($TriggerMetadata.PopReceipt)"
Write-Host "Dequeue count: $($TriggerMetadata.DequeueCount)"

下面的示例演示如何通过触发器读取传递给函数的队列消息。

存储队列触发器在 function.json 中定义,其中 type 设置为 queueTrigger

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "messages",
      "connection": "AzureStorageQueuesConnectionString"
    }
  ]
}

代码 _init_.py 将参数声明为 func.QueueMessage,它允许你在函数中读取队列消息。

import logging
import json

import azure.functions as func

def main(msg: func.QueueMessage):
    logging.info('Python queue trigger function processed a queue item.')

    result = json.dumps({
        'id': msg.id,
        'body': msg.get_body().decode('utf-8'),
        'expiration_time': (msg.expiration_time.isoformat()
                            if msg.expiration_time else None),
        'insertion_time': (msg.insertion_time.isoformat()
                           if msg.insertion_time else None),
        'time_next_visible': (msg.time_next_visible.isoformat()
                              if msg.time_next_visible else None),
        'pop_receipt': msg.pop_receipt,
        'dequeue_count': msg.dequeue_count
    })

    logging.info(result)

特性

进程内独立进程 C# 库使用 QueueTriggerAttribute 来定义函数。 C# 脚本改为使用 function.json 配置文件。

C# 类库中,特性的构造函数采用要监视的队列的名称,如以下示例中所示:

[FunctionName("QueueTrigger")]
public static void Run(
    [QueueTrigger("myqueue-items")] string myQueueItem, 
    ILogger log)
{
    ...
}

可以设置 Connection 属性来指定要使用的存储帐户连接字符串,如以下示例中所示:

[FunctionName("QueueTrigger")]
public static void Run(
    [QueueTrigger("myqueue-items", Connection = "StorageConnectionAppSetting")] string myQueueItem, 
    ILogger log)
{
    ....
}

批注

使用 QueueTrigger 注释可以访问触发函数的队列。 以下示例通过 message 参数向函数提供队列消息。

package com.function;
import com.microsoft.azure.functions.annotation.*;
import java.util.Queue;
import com.microsoft.azure.functions.*;

public class QueueTriggerDemo {
    @FunctionName("QueueTriggerDemo")
    public void run(
        @QueueTrigger(name = "message", queueName = "messages", connection = "MyStorageConnectionAppSetting") String message,
        final ExecutionContext context
    ) {
        context.getLogger().info("Queue message: " + message);
    }
}
属性 说明
name 在函数签名中声明参数名称。 触发函数时,此参数的值包含队列消息的内容。
queueName 在存储帐户中声明队列名称。
connection 指向存储帐户连接字符串。

配置

下表解释了在 function.json 文件和 QueueTrigger 特性中设置的绑定配置属性。

function.json 属性 说明
type 必须设置为 queueTrigger。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 只能在 function.json 文件中设置。 必须设置为 in。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 函数代码中包含队列项有效负载的变量的名称。
queueName 要轮询的队列的名称。
连接 指定如何连接到 Azure 队列的应用设置或设置集合的名称。 请参阅连接

有关完整示例的信息,请参阅示例部分

在本地开发时,需要将应用程序设置添加到 Values 集合中的 local.settings.json 文件中。

使用情况

注意

函数需要 base64 编码字符串。 对编码类型进行的任何调整(若要将数据作为 base64 编码字符串进行准备)需要在调用服务中实现。

队列触发器的用法取决于扩展包版本,以及函数应用中使用的 C# 形式,可以是以下形式之一:

进程内类库是编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。

选择一个版本以查看模式和版本的使用情况详细信息。

使用 string paramName 等方法参数访问消息数据。 paramName 是在 QueueTriggerAttribute 中指定的值。 可以绑定到以下任何类型:

绑定到对象时,Functions 运行时会尝试将 JSON 有效负载反序列化为代码中定义的任意类的实例。 有关使用 QueueMessage 的示例,请参阅扩展的 GitHub 存储库

尽管该特性采用 Connection 属性,但你也可以使用 StorageAccountAttribute 来指定存储帐户连接。 如果需要使用一个不同于库中的其他函数的存储帐户,可以执行此操作。 构造函数采用包含存储连接字符串的应用设置的名称。 可以在参数、方法或类级别应用该特性。 以下示例演示类级别和方法级别:

[StorageAccount("ClassLevelStorageAppSetting")]
public static class AzureFunctions
{
    [FunctionName("StorageTrigger")]
    [StorageAccount("FunctionLevelStorageAppSetting")]
    public static void Run( //...
{
    ...
}

要使用的存储帐户按以下顺序确定:

  • 触发器或绑定特性的 Connection 属性。
  • 作为触发器或绑定特性应用到同一参数的 StorageAccount 特性。
  • 应用到函数的 StorageAccount 特性。
  • 应用到类的 StorageAccount 特性。
  • 函数应用的默认存储帐户,在 AzureWebJobsStorage 应用程序设置中定义。

使用 QueueTrigger 注释可以访问触发函数的队列消息。

可通过 context.bindings.<NAME> 使用队列项有效负载,其中,<NAME>function.json 中定义的名称相匹配。 如果有效负载为 JSON,该值将反序列化为对象。

通过与 function.json 文件中绑定的 name 参数指定的名称匹配的字符串参数访问队列消息。

通过类型为 QueueMessage 的参数访问队列消息。

Metadata

队列触发器提供了数个元数据属性。 这些属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。

以下属性是 CloudQueueMessage 类的成员。

属性 类型 说明
QueueTrigger string 队列有效负载(如果是有效的字符串)。 如果队列消息有效负载是字符串,则 QueueTrigger 包含的值与 function.jsonname 属性命名的变量的值相同。
DequeueCount int 此消息取消排队的次数。
ExpirationTime DateTimeOffset 消息过期的时间。
Id string 队列消息 ID。
InsertionTime DateTimeOffset 消息添加到队列的时间。
NextVisibleTime DateTimeOffset 消息下一次可见的时间。
PopReceipt string 消息的 pop 接收方。

连接

connection 属性是对环境配置的引用,它指定应用应如何连接到 Azure 队列。 它可能指定:

如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。

连接字符串

若要获取连接字符串,请执行管理存储帐户访问密钥中显示的步骤。

此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection 属性指定的值匹配。

如果应用设置名称以“AzureWebJobs”开始,则只能在此处指定该名称的余下部分。 例如,如果将 connection 设置为“MyStorage”,Functions 运行时将会查找名为“AzureWebJobsMyStorage”的应用设置。如果将 connection 留空,Functions 运行时将使用应用设置中名为 AzureWebJobsStorage 的默认存储连接字符串。

基于标识的连接

如果使用 5.x 版或更高版本的扩展,则无需将连接字符串与机密一起使用,而是可以使应用使用 Azure Active Directory 标识。 为此,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection 属性。

如果将 connection 设置为“AzureWebJobsStorage”,请参阅使用标识连接到主机存储。 对于所有其他连接,扩展需要以下属性:

属性 环境变量模板 说明 示例值
队列服务 URI <CONNECTION_NAME_PREFIX>__queueServiceUri1 要连接到的队列服务的数据平面 URI,使用 HTTPS 方案。 https://<storage_account_name>.queue.core.chinacloudapi.cn

1<CONNECTION_NAME_PREFIX>__serviceUri 可以用作别名。 如果同时提供了两个窗体,则将使用 queueServiceUri 窗体。 如果跨 Blob、队列和/或表使用总体连接配置,则无法使用 serviceUri 窗体。

可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性

在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credentialclientID 属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发

向标识授予权限

无论使用何种标识,都必须具有执行所需操作的权限。 需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色

重要

某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。

你将需要创建一个角色分配,以便在运行时提供对队列的访问权限。 所有者等管理角色还不够。 下表显示了在正常操作中使用队列存储扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。

绑定类型 内置角色示例
触发器 存储队列数据读取者存储队列数据消息处理者
输出绑定 存储队列数据参与者存储队列数据消息发送方

有害消息

队列触发函数失败时,Azure Functions 会针对给定的队列消息重试该函数最多 5 次(包括第一次尝试)。 如果 5 次尝试全部失败,函数运行时会将一则消息添加到名为 <originalqueuename>-poison 的队列。 可以编写一个函数来处理有害队列中的消息,并记录这些消息,或者发送需要注意的通知。

若要手动处理有害消息,请检查队列消息的 dequeueCount

速览锁定

速览锁定模式将在触发队列时自动发生。 当取消消息排队时,这些消息将被标记为不可见,并与存储服务管理的超时关联。

启用函数后,其便会在满足下列条件时开始处理消息。

  • 如果函数运行成功,则函数执行完成,消息被删除。
  • 如果函数运行失败,则重置消息可见性。 重置后,系统将在函数下次请求新消息时重新处理该消息。
  • 如果函数一直因故障问题未完成,则消息可见性将过期,而消息也将重新出现在队列中。

所有可见性机制都由存储服务而不是函数运行时处理。

轮询算法

队列触发器实现了随机指数退让算法,以降低空闲队列轮询对存储交易成本造成的影响。

该算法使用以下逻辑:

  • 当找到一条消息时,运行时会等待 100 毫秒,然后检查另一条消息
  • 如果未找到任何消息,它会等待大约 200 毫秒,然后重试。
  • 如果后续尝试获取队列消息失败,则等待时间会继续增加,直到达到最长等待时间(默认为 1 分钟)。
  • 可以通过 host.json 文件中的 maxPollingInterval 属性配置最大等待时间。

对于本地开发,最大轮询间隔默认为两秒。

注意

关于在消耗计划中托管函数应用的计费问题,我们不会针对你在运行时所花费的轮询时间收取费用。

并发

当有多个队列消息在等待时,队列触发器会检索一批消息并以并发方式调用函数实例来处理它们。 默认情况下,批大小为 16。 当处理的数量下降到 8 时,运行时可获取另一个批,并开始处理这些消息。 因此,单个虚拟机 (VM) 上每个函数处理的最大并发消息数是 24。 此限制分别应用于每个 VM 上各个队列触发的函数。 如果你的函数应用横向扩展到多个 VM,则每个 VM 将等待触发器并尝试运行函数。 例如,如果某个函数应用横向扩展到 3 个 VM,则每个队列触发的函数的默认最大并发实例数是 72。

可以在 host.json 文件中配置用于获取新批的批大小和阈值。 如果希望在函数应用中最大程度地降低查询触发的函数的并行执行,可以将批大小设置为 1。 只有当函数在单个虚拟机 (VM) 上运行时,此设置才可消除并发。

队列触发器会自动阻止函数同时多次处理队列消息。

host.json 属性

host.json 文件包含控制队列触发器行为的设置。 有关可用设置的详细信息,请参阅 host.json 设置部分。

后续步骤