如何使用服务总线队列

本指南介绍如何使用服务总线队列。 相关示例用 Ruby 编写且使用 Azure gem。 涉及的方案包括“创建队列、发送和接收消息”以及“删除队列”。 有关服务总线队列的详细信息,请参阅 后续步骤 部分。

什么是 Service Bus 队列?

服务总线队列支持中转消息传送通信模型。 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。 消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。 创建方不必等待使用方的答复即可继续处理并发送更多消息。 队列为一个或多个竞争使用方提供先入先出 (FIFO)消息传递方式。 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。

利用队列,你可以更轻松地缩放应用程序,并增强体系结构的弹性。

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建命名空间:

  1. 登录到 Azure 门户
  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择用于放置该命名空间的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择应在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 。 系统现已创建命名空间并已将其启用。 可能需要等待几分钟,因为系统会为你的帐户配置资源。

获取管理凭据

创建新的命名空间时,会自动生成一项初始的共享访问签名 (SAS) 规则,将一对主密钥和辅助密钥关联到一起,向每个密钥授予对命名空间的所有资产的完全控制权限。 请参阅服务总线身份验证和授权,了解如何创建更多的规则,对常规的发送者和接收者的权限进行更多限制。 若要复制初始规则,请执行以下步骤:

  1. 单击“所有资源”,然后单击新创建的命名空间名称。
  2. 在命名空间窗口中,单击“共享访问策略”。
  3. 在“共享访问策略”屏幕中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”窗口中,单击“连接字符串 - 主键”旁边的复制按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    connection-string

  5. 重复上述步骤,将主键的值复制和粘贴到临时位置,以供稍后使用。

创建 Ruby 应用程序

有关说明,请参阅在 Azure 上创建 Ruby 应用程序

配置应用程序以使用服务总线

若要使用服务总线,请下载并使用 Azure Ruby 包,其中包括一组便于与存储 REST 服务进行通信的库。

使用 RubyGems 获取该程序包

  1. 使用命令行接口,例如 PowerShell (Windows)、Terminal (Mac) 或 Bash (Unix)。
  2. 在命令窗口中键入“gem install azure”以安装 gem 和依赖项。

导入包

使用常用的文本编辑器将以下内容添加到要在其中使用存储的 Ruby 文件的顶部:

require "azure"

设置服务总线连接

使用以下代码设置命名空间、密钥名称、密钥、签名程序和主机的值:

Azure.configure do |config|
  config.sb_namespace = '<your azure service bus namespace>'
  config.sb_sas_key_name = '<your azure service bus access keyname>'
  config.sb_sas_key = '<your azure service bus access key>'
end
signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
sb_host = "https://#{Azure.sb_namespace}.servicebus.chinacloudapi.cn"

将命名空间值设置为你创建的值,而不是整个 URL 的值。 例如,使用 "yourexamplenamespace",而不是 "yourexamplenamespace.servicebus.chinacloudapi.cn"。

如何创建队列

可以通过 Azure::ServiceBusService 对象处理队列。 若要创建队列,请使用 create_queue() 方法。 以下示例创建一个队列或输出任何错误。

azure_service_bus_service = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer})
begin
  queue = azure_service_bus_service.create_queue("test-queue")
rescue
  puts $!
end

还可以通过其他选项传递 Azure::ServiceBus::Queue 对象,这些选项可用于重写默认队列设置,如消息生存时间或最大队列大小。 以下示例演示了如何将最大队列大小设置为 5 GB,将生存时间设置为 1 分钟:

queue = Azure::ServiceBus::Queue.new("test-queue")
queue.max_size_in_megabytes = 5120
queue.default_message_time_to_live = "PT1M"

queue = azure_service_bus_service.create_queue(queue)

如何向队列发送消息

要向服务总线队列发送消息,应用程序需对 Azure::ServiceBusService 对象调用 send_queue_message() 方法。 发往服务总线队列的消息以及从服务总线队列接收的消息是 Azure::ServiceBus::BrokeredMessage 对象,它们具有一组标准属性(如 labeltime_to_live)、一个用于保存自定义的应用程序特定属性的字典和大量任意应用程序数据。 应用程序可以通过将字符串值作为消息传送设置消息正文,任何必需的标准属性会用默认值填充。

以下示例演示如何使用 send_queue_message() 向名为 test-queue 的队列发送一条测试消息:

message = Azure::ServiceBus::BrokeredMessage.new("test queue message")
message.correlation_id = "test-correlation-id"
azure_service_bus_service.send_queue_message("test-queue", message)

服务总线队列在标准层中支持的最大消息大小为 256 KB。标头最大为 64 KB,其中包括标准和自定义应用程序属性。一个队列可包含的消息数不受限制,但消息的总大小受限。此队列大小是在创建时定义的,上限为 5 GB。

如何从队列接收消息

Azure::ServiceBusService 对象使用 receive_queue_message() 方法可从队列接收消息。 默认情况下,消息在被读取的同时会被锁定,从而无法从队列中删除。 但是,可以通过将 :peek_lock 选项设置为 false,在读取消息时将其从队列中删除。

默认行为使读取和删除变成一个两阶段操作,从而也有可能支持不允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序处理完该消息(或将它可靠地存储起来留待将来处理)后,通过调用 delete_queue_message() 方法并提供要删除的消息作为参数来完成接收过程的第二阶段。 delete_queue_message() 方法将此消息标记为“已使用”并将其从队列中删除。

如果 :peek_lock 参数设置为 false,读取并删除消息将是最简单的模式,并且最适合在发生故障时应用程序允许不处理消息的情况。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。

以下示例演示如何使用 receive_queue_message() 接收和处理消息。该示例先通过将 :peek_lock 设置为 false 接收并删除一条消息,然后再接收另一条消息,最后使用 delete_queue_message() 删除该消息:

message = azure_service_bus_service.receive_queue_message("test-queue", 
  { :peek_lock => false })
message = azure_service_bus_service.receive_queue_message("test-queue")
azure_service_bus_service.delete_queue_message(message)

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序因某种原因无法处理消息,则它可以对 Azure::ServiceBusService 对象调用 unlock_queue_message() 方法。 此调用会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),则服务总线将自动解锁该消息并使它可再次被接收。

如果应用程序在处理消息之后,调用 delete_queue_message() 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。 此过程通常称作“至少处理一次”,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 这通常可通过使用消息的 message_id 属性实现,该属性在多次传送尝试中保持不变。

后续步骤

现在,已了解有关服务总线队列的基础知识,请访问下面的链接以获取详细信息。

有关本文中讨论的 Azure 服务总线队列与如何通过 Ruby 使用队列存储一文中讨论的 Azure 队列的比较,请参阅 Azure 队列和 Azure 服务总线队列 - 比较与对照