Azure Functions 的 Azure 服务总线绑定

本文介绍如何在 Azure Functions 中使用 Azure 服务总线绑定。 Azure Functions 支持对服务总线队列和主题的触发器和输出绑定。

此参考信息面向 Azure Functions 开发人员。 Azure Functions 的新手请从以下资源入手:

包 - Functions 1.x

Microsoft.Azure.WebJobs.ServiceBus NuGet 包 2.x 版中提供了服务总线绑定。

下表说明了如何在每个开发环境中添加对此绑定的支持。

开发环境 添加支持
Functions 1.x
本地开发 - C# 类库 安装包
本地开发 - C# 脚本、JavaScript、F# 自动
门户开发 自动

包 - Functions 2.x

Microsoft.Azure.WebJobs.Extensions.ServiceBus NuGet 包 3.x 版中提供了服务总线绑定。 azure-webjobs-sdk GitHub 存储库中提供了此包的源代码。

下表说明了如何在每个开发环境中添加对此绑定的支持。

开发环境 添加支持
Functions 2.x
本地开发 - C# 类库 安装包
本地开发 - C# 脚本、JavaScript、F#、Java 和 Python 注册扩展
门户开发 添加输出绑定时安装

若要了解如何更新门户中的现有绑定扩展而不必重新发布函数应用项目,请参阅更新扩展

触发器

使用服务总线触发器响应来自服务总线队列或主题的消息。

触发器 - 示例

参阅语言特定的示例:

触发器 - C# 示例

以下示例演示了读取消息元数据并记录服务总线队列消息的 C# 函数

[FunctionName("ServiceBusQueueTriggerCSharp")]                    
public static void Run(
    [ServiceBusTrigger("myqueue", AccessRights.Manage, Connection = "ServiceBusConnection")] 
    string myQueueItem,
    Int32 deliveryCount,
    DateTime enqueuedTimeUtc,
    string messageId,
    ILogger log)
{
    log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
    log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
    log.LogInformation($"DeliveryCount={deliveryCount}");
    log.LogInformation($"MessageId={messageId}");
}

此示例适用于 Azure Functions 版本 1.x。 要使此代码适用于 2.x:

  • 省略访问权限参数
  • 将日志参数的类型从 TraceWriter 更改为 ILogger
  • log.Info 更改为 log.LogInformation

触发器 - C# 脚本示例

以下示例演示 function.json 文件中的一个服务总线触发器绑定以及使用该绑定的 C# 脚本函数。 此函数将读取消息元数据并记录服务总线队列消息。

下面是 function.json 文件中的绑定数据:

{
"bindings": [
    {
    "queueName": "testqueue",
    "connection": "MyServiceBusConnection",
    "name": "myQueueItem",
    "type": "serviceBusTrigger",
    "direction": "in"
    }
],
"disabled": false
}

C# 脚本代码如下所示:

using System;

public static void Run(string myQueueItem,
    Int32 deliveryCount,
    DateTime enqueuedTimeUtc,
    string messageId,
    TraceWriter log)
{
    log.Info($"C# ServiceBus queue trigger function processed message: {myQueueItem}");

    log.Info($"EnqueuedTimeUtc={enqueuedTimeUtc}");
    log.Info($"DeliveryCount={deliveryCount}");
    log.Info($"MessageId={messageId}");
}

触发器 - F# 示例

以下示例演示 function.json 文件中的一个服务总线触发器绑定以及使用该绑定的 F# 函数。 该函数记录服务总线队列消息。

下面是 function.json 文件中的绑定数据:

{
"bindings": [
    {
    "queueName": "testqueue",
    "connection": "MyServiceBusConnection",
    "name": "myQueueItem",
    "type": "serviceBusTrigger",
    "direction": "in"
    }
],
"disabled": false
}

F# 脚本代码如下所示:

let Run(myQueueItem: string, log: ILogger) =
    log.LogInformation(sprintf "F# ServiceBus queue trigger function processed message: %s" myQueueItem)

触发器 - JavaScript 示例

以下示例演示 function.json 文件中的一个服务总线触发器绑定以及使用该绑定的 JavaScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。

下面是 function.json 文件中的绑定数据:

{
"bindings": [
    {
    "queueName": "testqueue",
    "connection": "MyServiceBusConnection",
    "name": "myQueueItem",
    "type": "serviceBusTrigger",
    "direction": "in"
    }
],
"disabled": false
}

JavaScript 脚本代码如下所示:

module.exports = function(context, myQueueItem) {
    context.log('Node.js ServiceBus queue trigger function processed message', myQueueItem);
    context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
    context.log('DeliveryCount =', context.bindingData.deliveryCount);
    context.log('MessageId =', context.bindingData.messageId);
    context.done();
};

触发器 - Java 示例

以下示例演示了 function.json 文件中的服务总线触发器绑定以及使用该绑定的 Java 函数。 该函数由置于服务总线队列上的消息触发,还会记录队列消息。

下面是 function.json 文件中的绑定数据:

{
"bindings": [
    {
    "queueName": "myqueuename",
    "connection": "MyServiceBusConnection",
    "name": "msg",
    "type": "ServiceBusQueueTrigger",
    "direction": "in"
    }
],
"disabled": false
}

下面是 Java 代码:

@FunctionName("sbprocessor")
 public void serviceBusProcess(
    @ServiceBusQueueTrigger(name = "msg",
                             queueName = "myqueuename",
                             connection = "myconnvarname") String message,
   final ExecutionContext context
 ) {
     context.getLogger().info(message);
 }

触发器 - 特性

C# 类库中,请使用以下属性来配置服务总线触发器:

  • ServiceBusTriggerAttribute

    该特性的构造函数采用队列或者主题和订阅的名称。 在 Azure Functions 版本 1.x 中,还可以指定连接的访问权限。 如果未指定访问权限,则默认为 Manage。 有关详细信息,请参阅触发器 - 配置部分。

    下面是一个示例,演示了用于字符串参数的属性:

    [FunctionName("ServiceBusQueueTriggerCSharp")]                    
    public static void Run(
        [ServiceBusTrigger("myqueue")] string myQueueItem, ILogger log)
    {
        ...
    }
    

    可以设置 Connection 属性来指定要使用的服务总线帐户,如以下示例中所示:

    [FunctionName("ServiceBusQueueTriggerCSharp")]                    
    public static void Run(
        [ServiceBusTrigger("myqueue", Connection = "ServiceBusConnection")] 
        string myQueueItem, ILogger log)
    {
        ...
    }
    

    有关完整示例,请参阅触发器 - C# 示例

  • ServiceBusAccountAttribute

    提供另一种方式来指定要使用的服务总线帐户。 构造函数采用包含服务总线连接字符串的应用设置的名称。 可以在参数、方法或类级别应用该特性。 以下示例演示类级别和方法级别:

    [ServiceBusAccount("ClassLevelServiceBusAppSetting")]
    public static class AzureFunctions
    {
        [ServiceBusAccount("MethodLevelServiceBusAppSetting")]
        [FunctionName("ServiceBusQueueTriggerCSharp")]
        public static void Run(
            [ServiceBusTrigger("myqueue", AccessRights.Manage)] 
            string myQueueItem, ILogger log)
    {
        ...
    }
    

要使用的服务总线帐户按以下顺序确定:

  • ServiceBusTrigger 特性的 Connection 属性。
  • 作为 ServiceBusTrigger 特性应用到同一参数的 ServiceBusAccount 特性。
  • 应用到函数的 ServiceBusAccount 特性。
  • 应用到类的 ServiceBusAccount 特性。
  • “AzureWebJobsServiceBus”应用设置。

触发器 - 配置

下表解释了在 function.json 文件和 ServiceBusTrigger 特性中设置的绑定配置属性。

function.json 属性 Attribute 属性 说明
类型 不适用 必须设置为“serviceBusTrigger”。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 不适用 必须设置为“in”。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 不适用 变量的名称,表示函数代码中的队列或主题消息。 设置为“$return”可引用函数返回值。
queueName QueueName 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。
topicName TopicName 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。
subscriptionName SubscriptionName 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。
连接 Connection 应用设置的名称,包含要用于此绑定的服务总线连接字符串。 如果应用设置名称以“AzureWebJobs”开头,则只能指定该名称的余下部分。 例如,如果将 connection 设置为“MyServiceBus”,函数运行时将会查找名为“AzureWebJobsMyServiceBus”的应用设置。 如果将 connection 留空,函数运行时将使用名为“AzureWebJobsServiceBus”的应用设置中的默认服务总线连接字符串。

若要获取连接字符串,请执行获取管理凭据中显示的步骤。 必须是服务总线命名空间的连接字符串,不限于特定的队列或主题。
accessRights Access 连接字符串的访问权限。 可用值为 managelisten。 默认值是 manage,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 accessRights 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 中,此属性不可用,因为存储 SDK 不支持管理操作。

在本地进行开发时,应用设置将取 local.settings.json 文件的值。

触发器 - 用法

在 C# 和 C# 脚本中,可以为队列或主题消息使用以下参数类型:

  • string - 如果消息是文本。
  • byte[] - 适用于二进制数据。
  • 自定义类型 - 如果消息包含 JSON,Azure Functions 会尝试反序列化 JSON 数据。
  • BrokeredMessage - 提供带 BrokeredMessage.GetBody() 方法的反序列化消息。

这些参数仅适用于 Azure Functions 版本 1.x;对于 2.x,请使用 Message 而非 BrokeredMessage

在 JavaScript 中通过 context.bindings.<name from function.json> 访问队列或主题消息。 服务总线消息作为字符串或 JSON 对象传递到函数中。

触发器 - 有害消息

无法在 Azure Functions 中控制或配置有害消息处理。 服务总线处理有害消息本身。

触发器 - PeekLock 行为

Functions 运行时以 PeekLock 模式接收消息。 如果函数成功完成,则对此消息调用 Complete;如果函数失败,则调用 Abandon。 如果函数的运行时间长于 PeekLock 超时时间,则只要该函数正在运行,就会自动续订锁定。

maxAutoRenewDuration 可在 host.json 中配置,它映射到 OnMessageOptions.MaxAutoRenewDuration。 根据服务总线文档,此设置所允许的最长时间为 5 分钟,而你可以将 Functions 时间限制从默认值 5 分钟增加到 10 分钟。 对于服务总线函数,你不会那么做,因为会超出服务总线续订限制。

触发器 - 消息元数据

服务总线触发器提供了几个元数据属性。 这些属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。 以下是 BrokeredMessage 类的属性。

属性 类型 说明
DeliveryCount Int32 传递次数。
DeadLetterSource string 死信源。
ExpiresAtUtc DateTime 到期时间 (UTC)。
EnqueuedTimeUtc DateTime 排队时间 (UTC)。
MessageId string 服务总线可用于标识重复消息的用户定义值(如果启用)。
ContentType string 发送方和接收方用于实现应用程序特定逻辑的内容类型标识符。
ReplyTo string 对队列地址的回复。
SequenceNumber Int64 服务总线分配给消息的唯一编号。
To string 发送到地址。
Label string 应用程序特定标签。
CorrelationId string 相关 ID。
UserProperties IDictionary<String,Object> 应用程序特定的消息属性。

请参阅在本文的前面部分使用这些属性的代码示例

触发器 - host.json 属性

host.json 文件包含控制服务总线触发器行为的设置。

{
    "serviceBus": {
      "maxConcurrentCalls": 16,
      "prefetchCount": 100,
      "autoRenewTimeout": "00:05:00"
    }
}
属性 默认 说明
maxConcurrentCalls 16 消息泵应该对回调发起的最大并发调用数。 默认情况下,Functions 运行时同时处理多条消息。 若要指示运行时一次只处理单个队列或主题消息,请将 maxConcurrentCalls 设置为 1。
prefetchCount 不适用 基础 MessageReceiver 将要使用的默认 PrefetchCount。
autoRenewTimeout 00:05:00 自动续订消息锁的最长持续时间。

输出

使用 Azure 服务总线输出绑定发送队列或主题消息。

输出 - 示例

参阅语言特定的示例:

输出 - C# 示例

以下示例演示发送服务总线队列消息的 C# 函数

[FunctionName("ServiceBusOutput")]
[return: ServiceBus("myqueue", Connection = "ServiceBusConnection")]
public static string ServiceBusOutput([HttpTrigger] dynamic input, ILogger log)
{
    log.LogInformation($"C# function processed: {input.Text}");
    return input.Text;
}

输出 - C# 脚本示例

以下示例演示 function.json 文件中的一个服务总线输出绑定以及使用该绑定的 C# 脚本函数。 该函数使用计时器触发器,每隔 15 秒发送一条队列消息。

下面是 function.json 文件中的绑定数据:

{
    "bindings": [
        {
            "schedule": "0/15 * * * * *",
            "name": "myTimer",
            "runsOnStartup": true,
            "type": "timerTrigger",
            "direction": "in"
        },
        {
            "name": "outputSbQueue",
            "type": "serviceBus",
            "queueName": "testqueue",
            "connection": "MyServiceBusConnection",
            "direction": "out"
        }
    ],
    "disabled": false
}

下面是可创建一条消息的 C# 脚本代码:

public static void Run(TimerInfo myTimer, ILogger log, out string outputSbQueue)
{
    string message = $"Service Bus queue message created at: {DateTime.Now}";
    log.LogInformation(message); 
    outputSbQueue = message;
}

下面是可创建多条消息的 C# 脚本代码:

public static void Run(TimerInfo myTimer, ILogger log, ICollector<string> outputSbQueue)
{
    string message = $"Service Bus queue messages created at: {DateTime.Now}";
    log.LogInformation(message); 
    outputSbQueue.Add("1 " + message);
    outputSbQueue.Add("2 " + message);
}

输出 - F# 示例

以下示例演示 function.json 文件中的一个服务总线输出绑定以及使用该绑定的 F# 脚本函数。 该函数使用计时器触发器,每隔 15 秒发送一条队列消息。

下面是 function.json 文件中的绑定数据:

{
    "bindings": [
        {
            "schedule": "0/15 * * * * *",
            "name": "myTimer",
            "runsOnStartup": true,
            "type": "timerTrigger",
            "direction": "in"
        },
        {
            "name": "outputSbQueue",
            "type": "serviceBus",
            "queueName": "testqueue",
            "connection": "MyServiceBusConnection",
            "direction": "out"
        }
    ],
    "disabled": false
}

下面是可创建一条消息的 F# 脚本代码:

let Run(myTimer: TimerInfo, log: ILogger, outputSbQueue: byref<string>) =
    let message = sprintf "Service Bus queue message created at: %s" (DateTime.Now.ToString())
    log.LogInformation(message)
    outputSbQueue = message

输出 - JavaScript 示例

以下示例演示 function.json 文件中的一个服务总线输出绑定以及使用该绑定的 JavaScript 函数。 该函数使用计时器触发器,每隔 15 秒发送一条队列消息。

下面是 function.json 文件中的绑定数据:

{
    "bindings": [
        {
            "schedule": "0/15 * * * * *",
            "name": "myTimer",
            "runsOnStartup": true,
            "type": "timerTrigger",
            "direction": "in"
        },
        {
            "name": "outputSbQueue",
            "type": "serviceBus",
            "queueName": "testqueue",
            "connection": "MyServiceBusConnection",
            "direction": "out"
        }
    ],
    "disabled": false
}

下面是可创建一条消息的 JavaScript 脚本代码:

module.exports = function (context, myTimer) {
    var message = 'Service Bus queue message created at ' + timeStamp;
    context.log(message);   
    context.bindings.outputSbQueue = message;
    context.done();
};

下面是可创建多条消息的 JavaScript 脚本代码:

module.exports = function (context, myTimer) {
    var message = 'Service Bus queue message created at ' + timeStamp;
    context.log(message);   
    context.bindings.outputSbQueue = [];
    context.bindings.outputSbQueue.push("1 " + message);
    context.bindings.outputSbQueue.push("2 " + message);
    context.done();
};

输出 - Java 示例

以下示例演示了一个 Java 函数,该函数在由 HTTP 请求触发时将消息发送到服务总线队列 myqueue

@FunctionName("httpToServiceBusQueue")
@ServiceBusQueueOutput(name = "message", queueName = "myqueue", connection = "AzureServiceBusConnection")
public String pushToQueue(
  @HttpTrigger(name = "request", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
  final String message,
  @HttpOutput(name = "response") final OutputBinding<T> result ) {
      result.setValue(message + " has been sent.");
      return message;
 }

Java 函数运行时库中,对其值将写入服务总线队列的函数参数使用 @QueueOutput 注释。 参数类型应为 OutputBinding<T>,其中 T 是 POJO 的任何本机 Java 类型。

输出 - 特性

C# 类库中,使用 ServiceBusAttribute

该特性的构造函数采用队列或者主题和订阅的名称。 也可指定连接的访问权限。 输出 - 配置部分介绍了如何选择访问权限设置。 下面是一个示例,说明了应用于该函数的返回值的属性:

[FunctionName("ServiceBusOutput")]
[return: ServiceBus("myqueue")]
public static string Run([HttpTrigger] dynamic input, ILogger log)
{
    ...
}

可以设置 Connection 属性来指定要使用的服务总线帐户,如以下示例中所示:

[FunctionName("ServiceBusOutput")]
[return: ServiceBus("myqueue", Connection = "ServiceBusConnection")]
public static string Run([HttpTrigger] dynamic input, ILogger log)
{
    ...
}

有关完整示例,请参阅输出 - C# 示例

可以使用 ServiceBusAccount 特性在类、方法或参数级别指定要使用的服务总线帐户。 有关详细信息,请参阅触发器 - 特性

输出 - 配置

下表解释了在 function.json 文件和 ServiceBus 特性中设置的绑定配置属性。

function.json 属性 Attribute 属性 说明
类型 不适用 必须设置为“serviceBus”。 在 Azure 门户中创建触发器时,会自动设置此属性。
direction 不适用 必须设置为“out”。 在 Azure 门户中创建触发器时,会自动设置此属性。
name 不适用 变量的名称,表示函数代码中的队列或主题。 设置为“$return”可引用函数返回值。
queueName QueueName 队列名称。 仅在发送队列消息的情况下设置,不为主题设置。
topicName TopicName 要监视的主题的名称。 仅在发送主题消息的情况下设置,不为队列设置。
连接 Connection 应用设置的名称,包含要用于此绑定的服务总线连接字符串。 如果应用设置名称以“AzureWebJobs”开头,则只能指定该名称的余下部分。 例如,如果将 connection 设置为“MyServiceBus”,函数运行时将会查找名为“AzureWebJobsMyServiceBus”的应用设置。 如果将 connection 留空,函数运行时将使用名为“AzureWebJobsServiceBus”的应用设置中的默认服务总线连接字符串。

若要获取连接字符串,请执行获取管理凭据中显示的步骤。 必须是服务总线命名空间的连接字符串,不限于特定的队列或主题。
accessRights Access 连接字符串的访问权限。 可用值为 managelisten。 默认值是 manage,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 accessRights 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 中,此属性不可用,因为存储 SDK 不支持管理操作。

在本地进行开发时,应用设置将取 local.settings.json 文件的值。

输出 - 用法

在 Azure Functions 1.x 中,如果队列尚不存在并且 accessRights 设置为 manage,则运行时会创建队列。 在 Functions 版本 2.x 中,队列或主题必须已存在,如果指定了不存在的队列或主题,则函数将失败。

在 C# 和 C# 脚本中,可以为输出绑定使用以下参数类型:

  • out T paramName - T 可以是任何可 JSON 序列化的类型。 如果函数退出时参数值为 null,Functions 将创建具有 null 对象的消息。
  • out string - 如果函数退出时参数值为 null,Functions 不创建消息。
  • out byte[] - 如果函数退出时参数值为 null,Functions 不创建消息。
  • out BrokeredMessage - 如果函数退出时参数值为 null,Functions 不创建消息。
  • ICollector<T>IAsyncCollector<T> - 用于创建多条消息。 调用 Add 方法时创建了一条消息。

在异步函数中,请使用返回值或 IAsyncCollector 而非 out 参数。

这些参数仅适用于 Azure Functions 版本 1.x;对于 2.x,请使用 Message 而非 BrokeredMessage

在 JavaScript 中通过 context.bindings.<name from function.json> 访问队列或主题。 可以向 context.binding.<name> 分配一个字符串、字节数组或 Javascript 对象(反序列化为 JSON)。

异常和返回代码

绑定 参考
服务总线 服务总线错误代码
服务总线 服务总线限制

host.json 设置

本部分介绍版本 2.x 中可用于此绑定的全局配置设置。 下面的示例 host.json 文件仅包含此绑定的 2.x 版本设置。 有关版本 2.x 中的全局配置设置的详细信息,请参阅 Azure Functions 版本 2.x 的 host.json 参考

Note

有关 Functions 1.x 中 host.json 的参考,请参阅 Azure Functions 1.x 的 host.json 参考

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "prefetchCount": 100,
            "messageHandlerOptions": {
                "autoComplete": false,
                "maxConcurrentCalls": 32,
                "maxAutoRenewDuration": "00:55:00"
            }
        }
    }
}
属性 默认 说明
maxAutoRenewDuration 00:05:00 自动续订消息锁的最长持续时间。
autoComplete false 触发器应立即标记为已完成(自动完成),还是等待调用完成的处理。
maxConcurrentCalls 16 消息泵应该对回调发起的最大并发调用数。 默认情况下,Functions 运行时同时处理多条消息。 若要指示运行时一次只处理单个队列或主题消息,请将 maxConcurrentCalls 设置为 1。
prefetchCount 不适用 基础 MessageReceiver 将要使用的默认 PrefetchCount。

后续步骤