Compartir a través de

消息传输、锁定和处置

消息代理(如服务总线)的最核心功能是将消息接受到队列或主题中以及保存它们以用于将来检索。 发送是常用于指消息传输到消息代理中的术语。 接收是常用于指将消息传输到检索客户端的术语。

当客户端发送消息时,它通常希望了解消息是否正确传输到代理并由代理接受,或是否发生某种形式的错误。 这种肯定或否定确认可以让客户端和中转站理解消息传输状态。 因此,它称为“结算”。

同样,当中转站向客户端传输消息时,中转站和客户端都希望了解消息是已成功处理(可以删除消息),还是消息传递或处理失败(可能需要再次传递消息)。

处置发送操作

使用任何支持的服务总线 API 客户端时,到服务总线中的发送操作会始终进行显式处置,这意味着 API 操作会等待来自服务总线的接受结果到达,然后完成发送操作。

如果服务总线拒绝消息,则拒绝会包含错误指示器以及具有“tracking-id”的文本。 拒绝还包含有关是否可以在期望成功的情况下重试操作的信息。 在客户端中,此信息会转变为异常并引发到发送操作的调用方。 如果接受了消息,则操作以无提示方式完成。

高级消息队列协议 (AMQP) 是唯一支持 .NET Standard、Java、JavaScript、Python 和 Go 客户端的协议。 对于 .NET Framework 客户端,你可以使用服务总线消息传递协议 (SBMP) 或 AMQP。 使用 AMQP 协议时,消息传输和结算会进行管道处理,并且是异步方式。 建议使用异步编程模型 API 变体。

注意

2026 年 9 月 30 日,我们将停用 Azure 服务总线 SDK 库 WindowsAzure.ServiceBus、Microsoft.Azure.ServiceBus 和 com.microsoft.azure.servicebus,这些库不符合 Azure SDK 准则。 我们还将结束对 SBMP 协议的支持,因此在 2026 年 9 月 30 日之后,你将无法再使用此协议。 请在该日期之前迁移到最新的 Azure SDK 库,新库提供了关键安全更新和改进功能。

尽管旧库在 2026 年 9 月 30 日之后仍可使用,但它们将不再获得 Azure 的官方支持和更新。 有关详细信息,请参阅支持停用公告

发送方可以快速连续地将多个消息置于线路上,而不必等待确认每个消息(使用 SBMP 协议或使用 HTTP 1.1 时会是这种情况)。 当在分区实体上接受和存储相应消息时,或是在到不同实体的发送操作重叠时,这些异步发送操作会完成。 完成也可能不会按原始发送顺序进行。

用于处理发送操作结果的策略可能会对应用程序形成直接的显著性能影响。 本部分的示例使用 c# 编写,适用于 Java Future、Java Mono、JavaScript Promise 和其他语言中的等效概念。

如果应用程序会产生消息突发(此处使用普通循环说明),并且会等待每个发送操作完成,然后再发送下一个消息,则同步或异步 API 的情况相似,发送 10 个消息只会在 10 个连续的完整处置往返之后完成。

假设从本地站点到服务总线有 70 毫秒的 传输控制协议 (TCP) 往返延迟,并且服务总线接受和存储每个消息只需 10 毫秒,则在不考虑有效负载传输时间或潜在路由拥塞影响的情况下,以下循环至少需要 8 秒时间:

for (int i = 0; i < 10; i++)
{
    // creating the message omitted for brevity
    await sender.SendMessageAsync(message);
}

如果应用程序即时连续地启动 10 个异步发送操作,并等待其分别完成,则会叠加这 10 个发送操作的往返时间。 10 个消息会即时连续传输(甚至可能会共享 TCP 帧),总体传输持续时间在很大程度上取决于使消息传输到代理的网络相关时间。

进行与前面循环相同的假设,以下循环的总重叠执行时间可能刚好低于一秒:

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    tasks.Add(sender.SendMessageAsync(message));
}
await Task.WhenAll(tasks);

请务必注意,所有异步编程模型都使用某种形式的基于内存的隐藏工作队列来保存挂起的操作。 当发送 API 操作返回时,发送任务会在该工作队列中排队,但协议操作在轮到任务运行后才会开始。 对于倾向于推送消息突发并且需要考虑可靠性的代码,应注意不要同时“发送”太多消息,因为所有发送的消息在实际置于线路上之前都会占用内存。

如以下 C# 代码片段中所示,信号灯是可在需要时启用这类应用程序级别限制的同步对象。 信号灯的这种用法允许同时最多发送 10 个消息。 10 个可用信号灯锁中的一个会在发送之前采用,然后在发送完成时释放。 循环的第 11 次执行会等待以前发送操作中的至少一个发送操作完成,然后再完成锁定:

var semaphore = new SemaphoreSlim(10);

var tasks = new List<Task>();
for (int i = 0; i < 10; i++)
{
    await semaphore.WaitAsync();

    tasks.Add(sender.SendMessageAsync(message).ContinueWith((t)=>semaphore.Release()));
}
await Task.WhenAll(tasks);

应用程序绝不应采用“发后不理”方式启动异步发送操作,而不检索操作结果。 这样做可以加载内部和不可见任务队列,直到内存耗尽,并阻止应用程序检测发送错误:

for (int i = 0; i < 10; i++)
{
    sender.SendMessageAsync(message); // DON'T DO THIS
}

对于低级别 AMQP 客户端,服务总线还接受“预处置”传输。 预处置传输是发后不理操作,其结果在任一情况下都不会报告回客户端并且消息在发送时被视为已处置。 不向客户端进行反馈还意味着没有任何可操作数据可用于诊断,这表示此模式没有资格通过 Azure 支持获得帮助。

处置接收操作

对于接收操作,服务总线 API 客户端启用两种不同的显式模式:接收并删除和扫视锁定

ReceiveAndDelete

接收并删除模式告知代理将它发送到接收客户端的所有消息都在发送时视为已处置。 这意味着在代理将消息置于线路上之后,会立即将消息视为已使用。 如果消息传输失败,则消息会丢失。

此模式的优点是接收方无需对消息执行进一步操作,也不会由于等待处置结果而减慢速度。 如果各个消息中包含的数据具有较低值并且/或者只在很短时间内才有意义,则此模式是合理选择。

PeekLock

扫视锁定模式告知代理接收客户端希望显式处置收到的消息。 消息可供接收方进行处理,同时在服务中保持在排他锁下,以便其他竞争接收方无法看到它。 该锁的持续时间最初在队列或订阅级别进行定义,可以由拥有该锁的客户端通过 RenewMessageLockAsync 操作进行延长。 有关续订锁定的详细信息,请参阅本文中的续订锁部分。

锁定消息时,从相同队列或订阅进行接收的其他客户端可以接受锁并检索不处于活动锁下的下一个可用消息。 显式释放对消息的锁定或是锁定过期时,消息会置于检索顺序前列或附近,以便重新传递。

当接收方重复释放消息或让锁在运行定义的次数后 (maxDeliveryCount) 结束时,系统会自动将消息从队列或订阅中删除,并将其放入关联死信队列中。

当接收客户端调用消息的完成 API 时,它会通过正面确认来开始处置收到的消息。 这会向中转站指出消息已得到成功处理,并且会将消息从队列或订阅中删除。 代理会使用指示是否可以执行处置的回复来回复接收者的处置意向。

如果接收客户端未能处理消息,但是希望重新传递消息,则可以通过为消息调用弃用 API 来显式要求立即释放并解锁消息,或者,也可以不执行任何操作,让锁结束。

如果接收客户端未能处理消息,并且知道重新传递消息和重试操作不起作用,则可以拒绝消息,这会通过为消息调用死信 API 而将其移动到死信队列中,因此还允许设置自定义属性(包括可以从死信队列中的消息检索的原因代码)。

注意

仅当你为队列或订阅启用了死信功能时,队列或主题订阅才会存在死信子队列。

处置的一种特殊情况是延迟,我们将在专文中单独讨论。

CompleteDeadLetterRenewLock 操作可能因网络问题而失败,例如,锁定状态过期,或者存在其他阻止处置的服务端条件。 在后面一种情况下,服务会发送否定确认,该确认会在 API 客户端中表现为异常。 如果原因是网络连接中断,则会丢弃锁,因为服务总线不支持在不同连接上恢复现有 AMQP 链接。

如果 Complete 失败(这通常在消息处理结束时发生,某些情况下会在处理工作进行几分钟之后发生),则接收应用程序可以决定是否在第二次传递时保留工作状态并忽略相同的消息,或是否在重新传递消息时丢弃工作结果并重试。

用于标识重复消息传递的典型机制是检查消息 ID,它可以并且应该由发送方设置为唯一值(可能与来自发起进程的标识符一致)。 作业计划程序可能会将消息 ID 设置为它尝试通过给定辅助进程分配给辅助进程的作业的标识符,并且该辅助进程会在作业已完成时忽略该作业的第二次出现。

重要

请务必注意,PeekLock 或 SessionLock 在消息上获取的锁是易失的,在以下情况下可能会丢失

  • 服务更新
  • OS 更新
  • 在持有锁时更改实体(队列、主题、订阅)的属性。

当锁丢失时,Azure 服务总线将生成一个 MessageLockLostException 或 SessionLockLostException,该异常将出现在客户端应用程序上。 在这种情况下,客户端的默认重试逻辑应自动启动,并重试该操作。

续订锁

锁定持续时间的默认值为“1 分钟”。 你可以在队列或订阅级别为锁定持续时间指定不同的值。 拥有该锁的客户端可以使用接收方对象上的方法来续订消息锁。 相反,你可以使用自动锁定续订功能,在该功能中,你可以指定要保持续订锁的持续时间。

最好将锁定持续时间设置为高于正常处理时间的值,这样就不必延长锁定。 最大值为 5 分钟,因此,如果需要更长的时间,则需要延长锁定。 锁定持续时间超过所需时间也会产生一些影响。 例如,当客户端停止工作时,消息只会在锁定持续时间过后才能再次可用。

后续步骤