Azure Functions 的 Azure 队列存储绑定

本文介绍如何在 Azure Functions 中使用 Azure 队列存储绑定。 Azure Functions 支持对队列使用触发器和输出绑定。

此参考信息面向 Azure Functions 开发人员。 Azure Functions 的新手请从以下资源入手:

包 - Functions 1.x

Microsoft.Azure.WebJobs NuGet 包 2.x 版中提供了队列存储绑定。 azure-webjobs-sdk GitHub 存储库中提供了此包的源代码。

Functions 2.x 中的绑定扩展

对于 Azure Functions 2.x 版中的本地开发,该包自动注册为绑定扩展

Functions 1.x 中的 Azure 存储 SDK 版本

在 Functions 1.x 中,存储触发器和绑定使用 7.2.1 版的 Azure 存储 SDK(WindowsAzure.Storage NuGet 包)。 如果引用另一版本的存储 SDK,而且在函数签名中绑定到某个存储 SDK 类型,则 Functions 运行时可能会报告它不能绑定到该类型。 此解决方案是为了确保项目引用 WindowsAzure.Storage 7.2.1

包 - Functions 2.x

Microsoft.Azure.WebJobs.Extensions.Storage NuGet 包 3.x 版中提供了队列存储绑定。 azure-webjobs-sdk GitHub 存储库中提供了此包的源代码。

下表说明了如何在每个开发环境中添加对此绑定的支持。

开发环境 添加支持
Functions 2.x
本地开发 - C# 类库 安装包
本地开发 - C# 脚本、JavaScript、F#、Java 和 Python 注册扩展
门户开发 添加输出绑定时安装

若要了解如何更新门户中的现有绑定扩展而不必重新发布函数应用项目,请参阅更新扩展

编码

函数需要 base64 编码字符串。 对编码类型进行的任何调整(若要将数据作为 base64 编码字符串进行准备)需要在调用服务中实现。

触发器

在队列中收到新项时,可以使用队列触发器启动一个函数。 队列消息作为输入提供给该函数。

触发器 - 示例

参阅语言特定的示例:

触发器 - C# 示例

以下示例演示可在每次处理某个队列项之后轮询 myqueue-items 队列并写入日志的 C# 函数

public static class QueueFunctions
{
    [FunctionName("QueueTrigger")]
    public static void QueueTrigger(
        [QueueTrigger("myqueue-items")] string myQueueItem, 
        ILogger log)
    {
        log.LogInformation($"C# function processed: {myQueueItem}");
    }
}

触发器 - C# 脚本示例

以下示例演示 function.json 文件中的一个队列触发器绑定以及使用该绑定的 C# 脚本 (.csx) 代码。 每次处理某个队列项之后,该函数会轮询 myqueue-items 队列并写入日志。

function.json 文件如下所示:

{
    "disabled": false,
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "myqueue-items",
            "connection":"MyStorageConnectionAppSetting"
        }
    ]
}

配置部分解释了这些属性。

C# 脚本代码如下所示:

#r "Microsoft.WindowsAzure.Storage"

using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
using System;

public static void Run(CloudQueueMessage myQueueItem, 
    DateTimeOffset expirationTime, 
    DateTimeOffset insertionTime, 
    DateTimeOffset nextVisibleTime,
    string queueTrigger,
    string id,
    string popReceipt,
    int dequeueCount,
    ILogger log)
{
    log.LogInformation($"C# Queue trigger function processed: {myQueueItem.AsString}\n" +
        $"queueTrigger={queueTrigger}\n" +
        $"expirationTime={expirationTime}\n" +
        $"insertionTime={insertionTime}\n" +
        $"nextVisibleTime={nextVisibleTime}\n" +
        $"id={id}\n" +
        $"popReceipt={popReceipt}\n" + 
        $"dequeueCount={dequeueCount}");
}

用法部分解释了 function.json 中的 name 属性命名的 myQueueItem消息元数据部分解释了所有其他所示变量。

触发器 - JavaScript 示例

以下示例演示 function.json 文件中的一个队列触发器绑定以及使用该绑定的 JavaScript 函数。 每次处理某个队列项之后,该函数会轮询 myqueue-items 队列并写入日志。

function.json 文件如下所示:

{
    "disabled": false,
    "bindings": [
        {
            "type": "queueTrigger",
            "direction": "in",
            "name": "myQueueItem",
            "queueName": "myqueue-items",
            "connection":"MyStorageConnectionAppSetting"
        }
    ]
}

配置部分解释了这些属性。

Note

name 参数在 JavaScript 代码中反映为 context.bindings.<name>,其中包含队列项有效负载。 此有效负载也作为第二个参数传递给函数。

JavaScript 代码如下所示:

module.exports = async function (context, message) {
    context.log('Node.js queue trigger function processed work item', message);
    // OR access using context.bindings.<name>
    // context.log('Node.js queue trigger function processed work item', context.bindings.myQueueItem);
    context.log('queueTrigger =', context.bindingData.queueTrigger);
    context.log('expirationTime =', context.bindingData.expirationTime);
    context.log('insertionTime =', context.bindingData.insertionTime);
    context.log('nextVisibleTime =', context.bindingData.nextVisibleTime);
    context.log('id =', context.bindingData.id);
    context.log('popReceipt =', context.bindingData.popReceipt);
    context.log('dequeueCount =', context.bindingData.dequeueCount);
    context.done();
};

用法部分解释了 function.json 中的 name 属性命名的 myQueueItem消息元数据部分解释了所有其他所示变量。

触发器 - Java 示例

以下 Java 示例演示了一个存储队列触发器函数,该函数记录放置到队列 myqueuename 中的已触发消息。

@FunctionName("queueprocessor")
public void run(
   @QueueTrigger(name = "msg",
                  queueName = "myqueuename",
                  connection = "myconnvarname") String message,
    final ExecutionContext context
) {
    context.getLogger().info(message);
}

触发器 - 特性

C# 类库,请使用以下属性来配置队列触发器:

  • QueueTriggerAttribute

    该特性的构造函数采用要监视的队列的名称,如以下示例中所示:

    [FunctionName("QueueTrigger")]
    public static void Run(
        [QueueTrigger("myqueue-items")] string myQueueItem, 
        ILogger log)
    {
        ...
    }
    

    可以设置 Connection 属性来指定要使用的存储帐户,如以下示例中所示:

    [FunctionName("QueueTrigger")]
    public static void Run(
        [QueueTrigger("myqueue-items", Connection = "StorageConnectionAppSetting")] string myQueueItem, 
        ILogger log)
    {
        ....
    }
    

    有关完整示例,请参阅触发器 - C# 示例

  • StorageAccountAttribute

    提供另一种方式来指定要使用的存储帐户。 构造函数采用包含存储连接字符串的应用设置的名称。 可以在参数、方法或类级别应用该特性。 以下示例演示类级别和方法级别:

    [StorageAccount("ClassLevelStorageAppSetting")]
    public static class AzureFunctions
    {
        [FunctionName("QueueTrigger")]
        [StorageAccount("FunctionLevelStorageAppSetting")]
        public static void Run( //...
    {
        ...
    }
    

要使用的存储帐户按以下顺序确定:

  • QueueTrigger 特性的 Connection 属性。
  • 作为 QueueTrigger 特性应用到同一参数的 StorageAccount 特性。
  • 应用到函数的 StorageAccount 特性。
  • 应用到类的 StorageAccount 特性。
  • “AzureWebJobsStorage”应用设置。

触发器 - 配置

下表解释了在 function.json 文件和 QueueTrigger 特性中设置的绑定配置属性。

function.json 属性 Attribute 属性 说明
类型 不适用 必须设置为 queueTrigger。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 不适用 只能在 function.json 文件中设置。 必须设置为 in。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 不适用 函数代码中包含队列项有效负载的变量的名称。
queueName QueueName 要轮询的队列的名称。
连接 Connection 包含要用于此绑定的存储连接字符串的应用设置的名称。 如果应用设置名称以“AzureWebJobs”开始,则只能在此处指定该名称的余下部分。 例如,如果将 connection 设置为“MyStorage”,函数运行时将会查找名为“AzureWebJobsMyStorage”的应用设置。 如果将 connection 留空,函数运行时将使用名为 AzureWebJobsStorage 的应用设置中的默认存储连接字符串。

在本地进行开发时,应用设置将取 local.settings.json 文件的值。

触发器 - 用法

在 C# 和 C# 脚本中,可以使用 string paramName 等方法参数访问消息数据。 在 C# 脚本中,paramName 是在 function.jsonname 属性中指定的值。 可以绑定到以下任何类型:

  • 对象 - Functions 运行时将 JSON 负载反序列化为代码中定义的任意类的实例。
  • string
  • byte[]
  • CloudQueueMessage

如果在尝试绑定到 CloudQueueMessage 时出现错误消息,请确保引用正确的存储 SDK 版本

在 JavaScript 中,可以使用 context.bindings.<name> 访问队列项有效负载。 如果有效负载为 JSON,则会将它反序列化为对象。

触发器 - 消息元数据

队列触发器提供了数个元数据属性。 这些属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。 以下是 CloudQueueMessage 类的属性。

属性 类型 说明
QueueTrigger string 队列有效负载(如果是有效的字符串)。 如果队列消息有效负载是字符串,则 QueueTrigger 包含的值与 function.jsonname 属性命名的变量的值相同。
DequeueCount int 此消息取消排队的次数。
ExpirationTime DateTimeOffset 消息过期的时间。
Id string 队列消息 ID。
InsertionTime DateTimeOffset 消息添加到队列的时间。
NextVisibleTime DateTimeOffset 消息下一次可见的时间。
PopReceipt string 消息的 pop 接收方。

触发器 - 有害消息

队列触发函数失败时,Azure Functions 会针对给定的队列消息重试该函数最多 5 次(包括第一次尝试)。 如果 5 次尝试全部失败,函数运行时会将一条消息添加到名为 <originalqueuename>-poison 的队列。 可以编写一个函数来处理有害队列中的消息,并记录这些消息,或者发送需要注意的通知。

若要手动处理有害消息,请检查队列消息的 dequeueCount

触发器 - 轮询算法

队列触发器实现了随机指数退让算法,以降低空闲队列轮询对存储交易成本造成的影响。 当找到消息时,运行时将等待两秒钟,然后检查另一条消息;如果未找到消息,它将等待大约四秒,然后重试。 如果后续尝试获取队列消息失败,则等待时间会继续增加,直到达到最长等待时间(默认为 1 分钟)。 可以通过 host.json 文件中的 maxPollingInterval 属性配置最大等待时间。

触发器 - 并发

当有多个队列消息在等待时,队列触发器会检索一批消息并以并发方式调用函数实例来处理它们。 默认情况下,批大小为 16。 当处理的数量下降到 8 时,运行时可获取另一个批,并开始处理这些消息。 因此,单个虚拟机 (VM) 上每个函数处理的最大并发消息数是 24。 此限制分别应用于每个 VM 上各个队列触发的函数。 如果你的函数应用横向扩展到多个 VM,则每个 VM 将等待触发器并尝试运行函数。 例如,如果某个函数应用横向扩展到 3 个 VM,则每个队列触发的函数的默认最大并发实例数是 72。

可以在 host.json 文件中配置用于获取新批的批大小和阈值。 如果希望在函数应用中最大程度地降低查询触发的函数的并行执行,可以将批大小设置为 1。 只有当函数在单个虚拟机 (VM) 上运行时,此设置才可消除并发。

队列触发器会自动阻止函数多次处理队列消息;无需将函数编写为幂等函数。

触发器 - host.json 属性

host.json 文件包含控制队列触发器行为的设置。

{
    "queues": {
      "maxPollingInterval": 2000,
      "visibilityTimeout" : "00:00:30",
      "batchSize": 16,
      "maxDequeueCount": 5,
      "newBatchThreshold": 8
    }
}
属性 默认 说明
maxPollingInterval 60000 队列轮询的最大间隔时间,以毫秒为单位。
visibilityTimeout 0 消息处理失败时的重试间隔时间。
batchSize 16 Functions 运行时同时检索并并行处理的队列消息数。 当处理的数量下降到 newBatchThreshold 时,运行时可获取另一个批,并开始处理这些消息。 因此,每个函数处理的最大并发消息数是 batchSize 加上 newBatchThreshold。 此限制分别应用于各个队列触发的函数。

如果要避免对队列上收到的消息并行执行,可以将 batchSize 设置为 1。 但是,只有在函数于单个虚拟机 (VM) 上运行时,此设置才可消除并发。 如果函数应用横向扩展到多个 VM,每个 VM 可运行每个队列触发的函数的一个实例。

batchSize 的最大值为 32。
maxDequeueCount 5 在将某个消息移到有害队列之前,尝试处理该消息的次数。
newBatchThreshold batchSize/2 只要同时处理的消息数下降到此数值,运行时即检索另一个批次。

输出

使用 Azure 队列存储输出绑定可将消息写入队列。

输出 - 示例

参阅语言特定的示例:

输出 - C# 示例

以下示例演示针对收到的每个 HTTP 请求创建队列消息的C# 函数

[StorageAccount("AzureWebJobsStorage")]
public static class QueueFunctions
{
    [FunctionName("QueueOutput")]
    [return: Queue("myqueue-items")]
    public static string QueueOutput([HttpTrigger] dynamic input,  ILogger log)
    {
        log.LogInformation($"C# function processed: {input.Text}");
        return input.Text;
    }
}

输出 - C# 脚本示例

以下示例演示 function.json 文件中的一个 HTTP 触发器绑定以及使用该绑定的 C# 脚本 (.csx) 代码。 该函数针对收到的每个 HTTP 请求创建一个包含 CustomQueueMessage 对象有效负载的队列项。

function.json 文件如下所示:

{
  "bindings": [
    {
      "type": "httpTrigger",
      "direction": "in",
      "authLevel": "function",
      "name": "input"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "return"
    },
    {
      "type": "queue",
      "direction": "out",
      "name": "$return",
      "queueName": "outqueue",
      "connection": "MyStorageConnectionAppSetting",
    }
  ]
}

配置部分解释了这些属性。

下面是可创建一条队列消息的 C# 脚本代码:

public class CustomQueueMessage
{
    public string PersonName { get; set; }
    public string Title { get; set; }
}

public static CustomQueueMessage Run(CustomQueueMessage input, ILogger log)
{
    return input;
}

可以使用 ICollectorIAsyncCollector 参数一次性发送多条消息。 以下 C# 脚本代码发送多条消息,其中一条消息包含 HTTP 请求数据,另一条消息包含硬编码值:

public static void Run(
    CustomQueueMessage input, 
    ICollector<CustomQueueMessage> myQueueItems, 
    ILogger log)
{
    myQueueItems.Add(input);
    myQueueItems.Add(new CustomQueueMessage { PersonName = "You", Title = "None" });
}

输出 - JavaScript 示例

以下示例演示 function.json 文件中的一个 HTTP 触发器绑定以及使用该绑定的 JavaScript 函数。 该函数针对收到的每个 HTTP 请求创建一个队列项。

function.json 文件如下所示:

{
  "bindings": [
    {
      "type": "httpTrigger",
      "direction": "in",
      "authLevel": "function",
      "name": "input"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "return"
    },
    {
      "type": "queue",
      "direction": "out",
      "name": "$return",
      "queueName": "outqueue",
      "connection": "MyStorageConnectionAppSetting",
    }
  ]
}

配置部分解释了这些属性。

JavaScript 代码如下所示:

module.exports = function (context, input) {
    context.done(null, input.body);
};

可以通过定义 myQueueItem 输出绑定的消息数组,一次性发送多条消息。 以下 JavaScript 代码针对收到的每个 HTTP 请求发送两条包含硬编码值的队列消息。

module.exports = function(context) {
    context.bindings.myQueueItem = ["message 1","message 2"];
    context.done();
};

输出 - Java 示例

以下示例演示一个 Java 函数,该函数在受到 HTTP 请求触发时创建一个队列消息。

@FunctionName("httpToQueue")
@QueueOutput(name = "item", queueName = "myqueue-items", connection = "AzureWebJobsStorage")
 public String pushToQueue(
     @HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
     final String message,
     @HttpOutput(name = "response") final OutputBinding&lt;String&gt; result) {
       result.setValue(message + " has been added.");
       return message;
 }

Java 函数运行时库中,对其值将写入队列存储的参数使用 @QueueOutput 注释。 参数类型应为 OutputBinding<T>,其中 T 是 POJO 的任何本机 Java 类型。

输出 - 特性

C# 类库中,使用 QueueAttribute。

该特性将应用到 out 参数,或应用到函数的返回值。 该特性的构造函数采用队列的名称,如以下示例中所示:

[FunctionName("QueueOutput")]
[return: Queue("myqueue-items")]
public static string Run([HttpTrigger] dynamic input,  ILogger log)
{
    ...
}

可以设置 Connection 属性来指定要使用的存储帐户,如以下示例中所示:

[FunctionName("QueueOutput")]
[return: Queue("myqueue-items", Connection = "StorageConnectionAppSetting")]
public static string Run([HttpTrigger] dynamic input,  ILogger log)
{
    ...
}

有关完整示例,请参阅输出 - C# 示例

可以使用 StorageAccount 特性在类、方法或参数级别指定存储帐户。 有关详细信息,请参阅“触发器 - 特性”。

输出 - 配置

下表解释了在 function.json 文件和 Queue 特性中设置的绑定配置属性。

function.json 属性 Attribute 属性 说明
类型 不适用 必须设置为 queue。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 不适用 必须设置为 out。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 不适用 表示函数代码中的队列的变量的名称。 设置为 $return 可引用函数返回值。
queueName QueueName 队列的名称。
连接 Connection 包含要用于此绑定的存储连接字符串的应用设置的名称。 如果应用设置名称以“AzureWebJobs”开始,则只能在此处指定该名称的余下部分。 例如,如果将 connection 设置为“MyStorage”,函数运行时将会查找名为“AzureWebJobsMyStorage”的应用设置。 如果将 connection 留空,函数运行时将使用名为 AzureWebJobsStorage 的应用设置中的默认存储连接字符串。

在本地进行开发时,应用设置将取 local.settings.json 文件的值。

输出 - 用法

在 C# 和 C# 脚本中,可以使用 out T paramName 等方法参数编写一条队列消息。 在 C# 脚本中,paramName 是在 function.jsonname 属性中指定的值。 可以使用方法返回类型而不使用 out 参数,T 可为以下任何类型:

如果在尝试绑定到 CloudQueueMessage 时出现错误消息,请确保引用正确的存储 SDK 版本

在 C# 和 C# 脚本中,可使用以下类型之一编写多条队列消息:

在 JavaScript 函数中,可以使用 context.bindings.<name> 访问输出队列消息。 可对队列项有效负载使用字符串或 JSON 可序列化对象。

异常和返回代码

绑定 参考
队列 队列错误代码
Blob、表、队列 存储错误代码
Blob、表、队列 故障排除

host.json 设置

本部分介绍版本 2.x 中可用于此绑定的全局配置设置。 下面的示例 host.json 文件仅包含此绑定的 2.x 版本设置。 有关版本 2.x 中的全局配置设置的详细信息,请参阅 Azure Functions 版本 2.x 的 host.json 参考

Note

有关 Functions 1.x 中 host.json 的参考,请参阅 Azure Functions 1.x 的 host.json 参考

{
    "version": "2.0",
    "extensions": {
        "queues": {
            "maxPollingInterval": "00:00:02",
            "visibilityTimeout" : "00:00:30",
            "batchSize": 16,
            "maxDequeueCount": 5,
            "newBatchThreshold": 8
        }
    }
}
属性 默认 说明
maxPollingInterval 00:00:02 队列轮询的最大间隔时间。 最小值为 00:00:00.100(100 毫秒)。
visibilityTimeout 00:00:00 消息处理失败时的重试间隔时间。
batchSize 16 Functions 运行时同时检索并并行处理的队列消息数。 当处理的数量下降到 newBatchThreshold 时,运行时可获取另一个批,并开始处理这些消息。 因此,每个函数处理的最大并发消息数是 batchSize 加上 newBatchThreshold。 此限制分别应用于各个队列触发的函数。

如果要避免对队列上收到的消息并行执行,可以将 batchSize 设置为 1。 但是,只有在函数于单个虚拟机 (VM) 上运行时,此设置才可消除并发。 如果函数应用横向扩展到多个 VM,每个 VM 可运行每个队列触发的函数的一个实例。

batchSize 的最大值为 32。
maxDequeueCount 5 在将某个消息移到有害队列之前,尝试处理该消息的次数。
newBatchThreshold batchSize/2 只要同时处理的消息数下降到此数值,运行时即检索另一个批次。

后续步骤