Azure Functions 的 Azure 服务总线触发器
使用服务总线触发器响应来自服务总线队列或主题的消息。 从扩展版本 3.1.0 开始,可以在启用会话的队列或主题上触发。
有关设置和配置详细信息,请参阅概述。
消耗计划和高级计划的服务总线缩放决策取决于基于目标的缩放。 有关详细信息,请参阅基于目标的缩放。
重要
本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型目前处于预览状态,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 在升级指南中详细了解 v3 和 v4 之间的差异。
Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。
使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南。
本文同时支持两个编程模型。
示例
可以使用以下 C# 模式之一创建 C# 函数:
- 进程内类库:编译的 C# 函数,该函数在与 Functions 运行时相同的进程中运行。
- 独立进程类库:编译的 C# 函数,该函数在独立于运行时的进程中运行。 支持在 .NET 5.0 上运行的 C# 函数需要独立的进程。
- C# 脚本:主要在 Azure 门户中创建 C# 函数时使用。
重要
对进程内模型的支持将于 2026 年 11 月 10 日结束。 为获得完全支持,强烈建议将应用迁移到独立工作模型。
此代码定义并初始化 ILogger
:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to the <see cref="ServiceBusReceivedMessage"/> type.
/// </summary>
public class ServiceBusReceivedMessageFunctions
{
//<docsnippet_servicebusmessage_createlogger>
private readonly ILogger<ServiceBusReceivedMessageFunctions> _logger;
public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunctions> logger)
{
_logger = logger;
}
//</docsnippet_servicebusmessage_createlogger>
/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
/// </summary>
//<docsnippet_servicebus_readmessage>
[Function(nameof(ServiceBusReceivedMessageFunction))]
[ServiceBusOutput("outputQueue", Connection = "ServiceBusConnection")]
public string ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}
//</docsnippet_servicebus_readmessage>
/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
/// Note that when doing so, you must also set the <see cref="ServiceBusTriggerAttribute.IsBatched"/> property
/// to <value>true</value>.
/// </summary>
//<docsnippet_servicebus_readbatch>
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}
//</docsnippet_servicebus_readbatch>
/// <summary>
/// This functions demonstrates that it is possible to bind to both the ServiceBusReceivedMessage and any of the supported binding contract
/// properties at the same time. If attempting this, the ServiceBusReceivedMessage must be the first parameter. There is not
/// much benefit to doing this as all of the binding contract properties are available as properties on the ServiceBusReceivedMessage.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageWithStringProperties))]
public void ServiceBusReceivedMessageWithStringProperties(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message, string messageId, int deliveryCount)
{
// The MessageId property and the messageId parameter are the same.
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message ID: {id}", messageId);
// Similarly the DeliveryCount property and the deliveryCount parameter are the same.
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
//<docsnippet_servicebus_message_actions>
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
// Complete the message
await messageActions.CompleteMessageAsync(message);
}
//</docsnippet_servicebus_message_actions>
}
}
此示例演示接收单个服务总线队列消息并将其写入日志的 C# 函数:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to the <see cref="ServiceBusReceivedMessage"/> type.
/// </summary>
public class ServiceBusReceivedMessageFunctions
{
//<docsnippet_servicebusmessage_createlogger>
private readonly ILogger<ServiceBusReceivedMessageFunctions> _logger;
public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunctions> logger)
{
_logger = logger;
}
//</docsnippet_servicebusmessage_createlogger>
/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
/// </summary>
//<docsnippet_servicebus_readmessage>
[Function(nameof(ServiceBusReceivedMessageFunction))]
[ServiceBusOutput("outputQueue", Connection = "ServiceBusConnection")]
public string ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}
//</docsnippet_servicebus_readmessage>
/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
/// Note that when doing so, you must also set the <see cref="ServiceBusTriggerAttribute.IsBatched"/> property
/// to <value>true</value>.
/// </summary>
//<docsnippet_servicebus_readbatch>
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}
//</docsnippet_servicebus_readbatch>
/// <summary>
/// This functions demonstrates that it is possible to bind to both the ServiceBusReceivedMessage and any of the supported binding contract
/// properties at the same time. If attempting this, the ServiceBusReceivedMessage must be the first parameter. There is not
/// much benefit to doing this as all of the binding contract properties are available as properties on the ServiceBusReceivedMessage.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageWithStringProperties))]
public void ServiceBusReceivedMessageWithStringProperties(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message, string messageId, int deliveryCount)
{
// The MessageId property and the messageId parameter are the same.
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message ID: {id}", messageId);
// Similarly the DeliveryCount property and the deliveryCount parameter are the same.
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
//<docsnippet_servicebus_message_actions>
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
// Complete the message
await messageActions.CompleteMessageAsync(message);
}
//</docsnippet_servicebus_message_actions>
}
}
此示例演示一个 C# 函数,该函数接收单个批处理中的多个服务总线队列消息,并将每个消息写入日志:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to the <see cref="ServiceBusReceivedMessage"/> type.
/// </summary>
public class ServiceBusReceivedMessageFunctions
{
//<docsnippet_servicebusmessage_createlogger>
private readonly ILogger<ServiceBusReceivedMessageFunctions> _logger;
public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunctions> logger)
{
_logger = logger;
}
//</docsnippet_servicebusmessage_createlogger>
/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
/// </summary>
//<docsnippet_servicebus_readmessage>
[Function(nameof(ServiceBusReceivedMessageFunction))]
[ServiceBusOutput("outputQueue", Connection = "ServiceBusConnection")]
public string ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}
//</docsnippet_servicebus_readmessage>
/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
/// Note that when doing so, you must also set the <see cref="ServiceBusTriggerAttribute.IsBatched"/> property
/// to <value>true</value>.
/// </summary>
//<docsnippet_servicebus_readbatch>
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}
//</docsnippet_servicebus_readbatch>
/// <summary>
/// This functions demonstrates that it is possible to bind to both the ServiceBusReceivedMessage and any of the supported binding contract
/// properties at the same time. If attempting this, the ServiceBusReceivedMessage must be the first parameter. There is not
/// much benefit to doing this as all of the binding contract properties are available as properties on the ServiceBusReceivedMessage.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageWithStringProperties))]
public void ServiceBusReceivedMessageWithStringProperties(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message, string messageId, int deliveryCount)
{
// The MessageId property and the messageId parameter are the same.
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message ID: {id}", messageId);
// Similarly the DeliveryCount property and the deliveryCount parameter are the same.
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
//<docsnippet_servicebus_message_actions>
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
// Complete the message
await messageActions.CompleteMessageAsync(message);
}
//</docsnippet_servicebus_message_actions>
}
}
此示例演示一个 C# 函数,该函数接收多个服务总线队列消息,将其写入日志,然后将消息处置为已完成:
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to the <see cref="ServiceBusReceivedMessage"/> type.
/// </summary>
public class ServiceBusReceivedMessageFunctions
{
//<docsnippet_servicebusmessage_createlogger>
private readonly ILogger<ServiceBusReceivedMessageFunctions> _logger;
public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunctions> logger)
{
_logger = logger;
}
//</docsnippet_servicebusmessage_createlogger>
/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
/// </summary>
//<docsnippet_servicebus_readmessage>
[Function(nameof(ServiceBusReceivedMessageFunction))]
[ServiceBusOutput("outputQueue", Connection = "ServiceBusConnection")]
public string ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}
//</docsnippet_servicebus_readmessage>
/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
/// Note that when doing so, you must also set the <see cref="ServiceBusTriggerAttribute.IsBatched"/> property
/// to <value>true</value>.
/// </summary>
//<docsnippet_servicebus_readbatch>
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}
//</docsnippet_servicebus_readbatch>
/// <summary>
/// This functions demonstrates that it is possible to bind to both the ServiceBusReceivedMessage and any of the supported binding contract
/// properties at the same time. If attempting this, the ServiceBusReceivedMessage must be the first parameter. There is not
/// much benefit to doing this as all of the binding contract properties are available as properties on the ServiceBusReceivedMessage.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageWithStringProperties))]
public void ServiceBusReceivedMessageWithStringProperties(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message, string messageId, int deliveryCount)
{
// The MessageId property and the messageId parameter are the same.
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message ID: {id}", messageId);
// Similarly the DeliveryCount property and the deliveryCount parameter are the same.
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
//<docsnippet_servicebus_message_actions>
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
// Complete the message
await messageActions.CompleteMessageAsync(message);
}
//</docsnippet_servicebus_message_actions>
}
}
以下 Java 函数使用 @ServiceBusQueueTrigger
中的 @ServiceBusQueueTrigger
注释来说明服务总线队列触发器的配置。 此函数获取放置在队列上的消息,然后将其添加到日志。
@FunctionName("sbprocessor")
public void serviceBusProcess(
@ServiceBusQueueTrigger(name = "msg",
queueName = "myqueuename",
connection = "myconnvarname") String message,
final ExecutionContext context
) {
context.getLogger().info(message);
}
将消息添加到服务总线主题时,也可触发 Java 函数。 以下示例使用 @ServiceBusTopicTrigger
注释来说明触发器配置。
@FunctionName("sbtopicprocessor")
public void run(
@ServiceBusTopicTrigger(
name = "message",
topicName = "mytopicname",
subscriptionName = "mysubscription",
connection = "ServiceBusConnection"
) String message,
final ExecutionContext context
) {
context.getLogger().info(message);
}
以下示例显示了服务总线触发器 TypeScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。
import { app, InvocationContext } from '@azure/functions';
export async function serviceBusQueueTrigger1(message: unknown, context: InvocationContext): Promise<void> {
context.log('Service bus queue function processed message:', message);
context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
context.log('DeliveryCount =', context.triggerMetadata.deliveryCount);
context.log('MessageId =', context.triggerMetadata.messageId);
}
app.serviceBusQueue('serviceBusQueueTrigger1', {
connection: 'MyServiceBusConnection',
queueName: 'testqueue',
handler: serviceBusQueueTrigger1,
});
以下示例显示了服务总线触发器 JavaScript 函数。 此函数将读取消息元数据并记录服务总线队列消息。
const { app } = require('@azure/functions');
app.serviceBusQueue('serviceBusQueueTrigger1', {
connection: 'MyServiceBusConnection',
queueName: 'testqueue',
handler: (message, context) => {
context.log('Service bus queue function processed message:', message);
context.log('EnqueuedTimeUtc =', context.triggerMetadata.enqueuedTimeUtc);
context.log('DeliveryCount =', context.triggerMetadata.deliveryCount);
context.log('MessageId =', context.triggerMetadata.messageId);
},
});
以下示例演示了 function.json 文件中的服务总线触发器绑定以及使用该绑定的 PowerShell 函数。
下面是 function.json 文件中的绑定数据:
{
"bindings": [
{
"name": "mySbMsg",
"type": "serviceBusTrigger",
"direction": "in",
"topicName": "mytopic",
"subscriptionName": "mysubscription",
"connection": "AzureServiceBusConnectionString"
}
]
}
下面是发送服务总线消息时运行的函数。
param([string] $mySbMsg, $TriggerMetadata)
Write-Host "PowerShell ServiceBus queue trigger function processed message: $mySbMsg"
下面的示例演示如何通过触发器读取服务总线队列消息。 该示例取决于使用的是 v1 还是 v2 Python 编程模型。
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="ServiceBusQueueTrigger1")
@app.service_bus_queue_trigger(arg_name="msg",
queue_name="<QUEUE_NAME>",
connection="<CONNECTION_SETTING>")
def test_function(msg: func.ServiceBusMessage):
logging.info('Python ServiceBus queue trigger processed message: %s',
msg.get_body().decode('utf-8'))
下面的示例演示了如何通过触发器读取服务总线队列主题。
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="ServiceBusTopicTrigger1")
@app.service_bus_topic_trigger(arg_name="message",
topic_name="TOPIC_NAME",
connection="CONNECTION_SETTING",
subscription_name="SUBSCRIPTION_NAME")
def test_function(message: func.ServiceBusMessage):
message_body = message.get_body().decode("utf-8")
logging.info("Python ServiceBus topic trigger processed message.")
logging.info("Message Body: " + message_body)
特性
进程内和独立工作进程 C# 库都使用 ServiceBusTriggerAttribute 特性来定义函数触发器。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。
下表说明了可使用此触发器特性设置的属性:
属性 | 说明 |
---|---|
QueueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
TopicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
SubscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
Connection | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
IsBatched | 消息分批传送。 需要数组或集合类型。 |
IsSessionsEnabled | 如果连接到会话感知队列或订阅,则为 true 。 否则为 false (默认值)。 |
AutoCompleteMessages | 如果触发器应在成功调用后自动完成消息,则为 true 。 如果不应,例如在代码中处理消息结算时,则为 false 。 如果未显式设置,则行为将基于 host.json 中的 autoCompleteMessages 配置。 |
在本地开发时,需要将应用程序设置添加到 Values
集合中的 local.settings.json 文件中。
修饰符
仅适用于 Python v2 编程模型。
对于使用修饰器定义的 Python v2 功能,支持 service_bus_queue_trigger
上的以下属性:
properties | 说明 |
---|---|
arg_name |
变量的名称,表示函数代码中的队列或主题消息。 |
queue_name |
要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
connection |
指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。
批注
使用 ServiceBusQueueTrigger
注释可以创建在创建服务总线队列消息时要运行的函数。 可用的配置选项包括以下属性:
属性 | 说明 |
---|---|
name | 变量的名称,表示函数代码中的队列或主题消息。 |
queueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
topicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
subscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
连接 | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
使用 ServiceBusTopicTrigger
注释可以指定主题和订阅,以便以触发函数的数据为目标。
在本地开发时,请将应用程序设置添加到 Values
集合的 local.settings.json 文件。
有关更多详细信息,请参阅触发器示例。
配置
仅适用于 Python v1 编程模型。
下表说明了可以在传递给方法“app.serviceBusQueue()
”或“app.serviceBusTopic()
”的对象“options
”上设置的属性。
properties | 说明 |
---|---|
queueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
topicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
subscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
连接 | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
accessRights | 连接字符串的访问权限。 可用值为 manage 和 listen 。 默认值是 manage ,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 accessRights 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 及更高版本中,此属性不可用,因为最新版本的服务总线 SDK 不支持管理操作。 |
isSessionsEnabled | 如果连接到会话感知队列或订阅,则为 true 。 否则为 false (默认值)。 |
autoComplete | 对于非 C# 函数,此项必须为 true ,这意味着触发器应在处理后自动调用 complete,或者你会通过函数代码手动调用 complete。设置为 true 时,触发器会在函数执行成功完成的情况下自动完成该消息,否则会放弃该消息。函数中的异常会导致运行时在后台调用 abandonAsync 。 如果未发生异常,则在后台调用 completeAsync 。 此属性仅在 Azure Functions 2.x 和更高版本中可用。 |
在本地开发时,请将应用程序设置添加到 Values
集合的 local.settings.json 文件。
下表解释了在 function.json 文件中设置的绑定配置属性。
“function.json”属性 | 说明 |
---|---|
type | 必须设置为 serviceBusTrigger 。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
direction | 必须设置为“in”。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
name | 变量的名称,表示函数代码中的队列或主题消息。 |
queueName | 要监视的队列的名称。 仅在监视队列的情况下设置,不为主题设置。 |
topicName | 要监视的主题的名称。 仅在监视主题的情况下设置,不为队列设置。 |
subscriptionName | 要监视的订阅的名称。 仅在监视主题的情况下设置,不为队列设置。 |
连接 | 指定如何连接到服务总线的应用设置或设置集合的名称。 请参阅连接。 |
accessRights | 连接字符串的访问权限。 可用值为 manage 和 listen 。 默认值是 manage ,其指示 connection 具有“管理”权限。 如果使用不具有“管理”权限的连接字符串,请将 accessRights 设置为“listen”。 否则,Functions 运行时可能会在尝试执行需要管理权限的操作时失败。 在 Azure Functions 版本 2.x 及更高版本中,此属性不可用,因为最新版本的服务总线 SDK 不支持管理操作。 |
isSessionsEnabled | 如果连接到会话感知队列或订阅,则为 true 。 否则为 false (默认值)。 |
autoComplete | 对于非 C# 函数,此项必须为 true ,这意味着触发器应在处理后自动调用 complete,或者你会通过函数代码手动调用 complete。设置为 true 时,触发器会在函数执行成功完成的情况下自动完成该消息,否则会放弃该消息。函数中的异常会导致运行时在后台调用 abandonAsync 。 如果未发生异常,则在后台调用 completeAsync 。 此属性仅在 Azure Functions 2.x 和更高版本中可用。 |
在本地开发时,请将应用程序设置添加到 Values
集合的 local.settings.json 文件。
有关完整示例,请参阅示例部分。
使用情况
所有 C# 形式和扩展版本都支持以下参数类型:
类型 | 说明 |
---|---|
System.String | 当消息为简单文本时使用。 |
byte[] | 用于二进制数据消息。 |
Object | 当消息包含 JSON 时,Functions 会尝试将 JSON 数据反序列化为已知的普通旧 CLR 对象类型。 |
特定于消息的参数类型包含其他消息元数据。 服务总线触发器支持的特定类型取决于 Functions 运行时版本、扩展包版本和所使用的 C# 形式。
如果希望函数处理单条消息,服务总线触发器可以绑定到以下类型:
类型 | 说明 |
---|---|
string |
字符串格式的消息。 当消息为简单文本时使用。 |
byte[] |
消息的字节数。 |
JSON 可序列化类型 | 当事件包含 JSON 数据时,Functions 会尝试将 JSON 数据反序列化为普通的旧 CLR 对象 (POCO) 类型。 |
ServiceBusReceivedMessage | (预览版1) 消息对象。 |
如果希望函数处理一批消息,服务总线触发器可以绑定到以下类型:
类型 | 说明 |
---|---|
T[] ,其中 T 是单消息类型之一 |
批处理中的事件数组。 每个条目表示一个事件。 |
独立进程模型尚不支持服务总线触发器的消息解决方案。
1 若要使用这些类型,需要引用 Microsoft.Azure.Functions.Worker.Extensions.ServiceBus 5.10.0-preview2 或更高版本以及 SDK 类型绑定的常见依赖项。
未定义 Connection
属性时,Functions 会查找名为 AzureWebJobsServiceBus
的应用设置,这是服务总线连接字符串的默认名称。 还可以设置 Connection
属性来指定应用程序设置(其中包含要使用的服务总线连接字符串)的名称。
可以通过 ServiceBusQueueMessage
或 ServiceBusTopicMessage
参数获取传入的服务总线消息。
可以通过在 function.json 文件的名称属性中配置的参数来使用服务总线实例。
队列消息可通过类型为 func.ServiceBusMessage
的参数提供给函数。 服务总线消息作为字符串或 JSON 对象传递到函数中。
有关完整示例,请参阅示例部分。
连接
connection
属性是对环境配置的引用,它指定应用应该如何连接到服务总线。 它可能指定:
如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。
连接字符串
若要获取连接字符串,请执行获取管理凭据中显示的步骤。 必须是服务总线命名空间的连接字符串,不限于特定的队列或主题。
此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection
属性指定的值匹配。
如果应用设置名称以“AzureWebJobs”开头,则只能指定该名称的余下部分。 例如,如果将 connection
设为“MyServiceBus”,Functions 运行时会查找名为“AzureWebJobsMyServiceBus”的应用设置。 如果将 connection
留空,函数运行时将使用名为“AzureWebJobsServiceBus”的应用设置中的默认服务总线连接字符串。
基于标识的连接
如果使用 5.x 版或更高版本的扩展,则无需将连接字符串与机密一起使用,而是可以使应用使用 Azure Active Directory 标识。 为此,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection
属性。
在此模式下,扩展需要以下属性:
属性 | 环境变量模板 | 说明 | 示例值 |
---|---|---|---|
完全限定的命名空间 | <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace |
完全限定的服务总线命名空间。 | <service_bus_namespace>.servicebus.chinacloudapi.cn |
可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性。
注意
使用 Azure 应用程序配置或 Key Vault 为托管标识连接提供设置时,设置名称应使用有效的键分隔符(例如 :
或 /
)替代 __
,以确保正确解析名称。
例如 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace
。
在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credential
和 clientID
属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发。
向标识授予权限
无论使用何种标识,都必须具有执行所需操作的权限。 需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色。
重要
某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。
你将需要创建一个角色分配,以便在运行时提供对主题和队列的访问权限。 所有者等管理角色还不够。 下表显示了在正常操作中使用服务总线扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。
绑定类型 | 内置角色示例 |
---|---|
触发器1 | Azure 服务总线数据接收方、Azure 服务总线数据所有者 |
输出绑定 | Azure 服务总线数据发送方 |
1 若要从服务总线主题触发,角色分配需要对服务总线订阅资源具有有效范围。 如果仅包含主题,将发生错误。 某些客户端(例如 Azure 门户)不会将服务总线订阅资源公开为角色分配的范围。 在这种情况下,可以改用 Azure CLI。 若要了解详细信息,请参阅适用于 Azure 服务总线的 Azure 内置角色。
有害消息
无法在 Azure Functions 中控制或配置有害消息处理。 服务总线处理有害消息本身。
PeekLock 行为
Functions 运行时以 PeekLock 模式接收消息。
默认情况下,如果函数成功完成,则运行时会对此消息调用 Complete
;如果函数失败,则调用 Abandon
。 可以通过 host.json
中的 autoCompleteMessages
属性禁用自动完成。
默认情况下,如果函数成功完成,则运行时会对此消息调用 Complete
;如果函数失败,则调用 Abandon
。 可以通过 host.json
中的 autoCompleteMessages
属性或通过触发器特性上的一个属性来禁用自动完成。 如果你的函数代码处理消息结算,则应禁用自动完成。
如果函数的运行时间长于 PeekLock
超时时间,则只要该函数正在运行,就会自动续订锁定。 maxAutoRenewDuration
在 host.json 中可配置,它映射到 serviceBusProcessor.MaxAutoLockRenewalDuration。 此设置的默认值为 5 分钟。
消息元数据
使用特定于消息传送的类型,你可以轻松地检索作为对象属性的元数据。 这些属性取决于 Functions 运行时版本、扩展包版本和所使用的 C# 形式。
这些属性是 ServiceBusReceivedMessage 类的成员。
属性 | 类型 | 说明 |
---|---|---|
ApplicationProperties |
ApplicationProperties |
由发送方设置的属性。 |
ContentType |
string |
发送方和接收方用于实现应用程序特定逻辑的内容类型标识符。 |
CorrelationId |
string |
相关 ID。 |
DeliveryCount |
Int32 |
传递次数。 |
EnqueuedTime |
DateTime |
排队时间 (UTC)。 |
ScheduledEnqueueTimeUtc |
DateTime |
计划的排队时间 (UTC)。 |
ExpiresAt |
DateTime |
到期时间 (UTC)。 |
MessageId |
string |
服务总线可用于标识重复消息的用户定义值(如果启用)。 |
ReplyTo |
string |
对队列地址的回复。 |
Subject |
string |
特定于应用程序的标签,可用于替代 Label 元数据属性。 |
To |
string |
发送到地址。 |