快速入门:如何通过 Ruby 使用服务总线队列

本教程介绍如何创建 Ruby 应用程序,以便向服务总线队列发送消息以及从中接收消息。 相关示例用 Ruby 编写且使用 Azure gem。

先决条件

  1. Azure 订阅。 若要完成本教程,需要一个 Azure 帐户。 你可以激活 MSDN 订阅者权益或注册试用版订阅
  2. 按照使用 Azure 门户创建服务总线队列一文中的步骤操作。
    1. 阅读服务总线队列的快速概述

    2. 创建一个服务总线命名空间

    3. 获取连接字符串

      注意

      在本教程中,需使用 Ruby 在服务总线命名空间中创建一个队列

创建 Ruby 应用程序

有关说明,请参阅在 Azure 上创建 Ruby 应用程序

配置应用程序以使用服务总线

若要使用服务总线,请下载并使用 Azure Ruby 包,其中包括一组便于与存储 REST 服务进行通信的库。

使用 RubyGems 获取该程序包

  1. 使用命令行接口,例如 PowerShell (Windows)、Terminal (Mac) 或 Bash (Unix)。
  2. 在命令窗口中键入“gem install azure”以安装 gem 和依赖项。

导入包

使用常用的文本编辑器将以下内容添加到要在其中使用存储的 Ruby 文件的顶部:

require "azure"

设置服务总线连接

使用以下代码,设置命名空间、密钥名称、密钥、签名程序和主机的值:

Azure.configure do |config|
  config.sb_namespace = '<your azure service bus namespace>'
  config.sb_sas_key_name = '<your azure service bus access keyname>'
  config.sb_sas_key = '<your azure service bus access key>'
end
signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
sb_host = "https://#{Azure.sb_namespace}.servicebus.chinacloudapi.cn"

将命名空间值设置为创建的值,而不是整个 URL 的值。 例如,使用 "yourexamplenamespace",而不是 "yourexamplenamespace.servicebus.chinacloudapi.cn"。

使用多个命名空间时,可以在创建 SharedAccessSigner 对象时将密钥及其名称传递到构造函数

sb_namespace = '<your azure service bus namespace>'
sb_sas_key_name = '<your azure service bus access keyname>'
sb_sas_key = '<your azure service bus access key>'

signer = Azure::ServiceBus::Auth::SharedAccessSigner.new(sb_sas_key_name, sb_sas_key)
sb_host = "https://#{sb_namespace}.servicebus.chinacloudapi.cn"

如何创建队列

可以通过 Azure::ServiceBusService 对象处理队列。 若要创建队列,请使用 create_queue() 方法。 以下示例创建一个队列或输出任何错误。

azure_service_bus_service = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer})
begin
  queue = azure_service_bus_service.create_queue("test-queue")
rescue
  puts $!
end

还可以通过其他选项传递 Azure::ServiceBus::Queue 对象,这些选项可用于重写默认队列设置,如消息生存时间或最大队列大小。 以下示例演示如何将最大队列大小设置为 5 GB,将生存时间设置为 1 分钟:

queue = Azure::ServiceBus::Queue.new("test-queue")
queue.max_size_in_megabytes = 5120
queue.default_message_time_to_live = "PT1M"

queue = azure_service_bus_service.create_queue(queue)

如何向队列发送消息

要向服务总线队列发送消息,应用程序需对 Azure::ServiceBusService 对象调用 send_queue_message() 方法。 发往服务总线队列的消息以及从服务总线队列接收的消息是 Azure::ServiceBus::BrokeredMessage 对象,它们具有一组标准属性(如 labeltime_to_live)、一个用于保存自定义的应用程序特定属性的字典和大量任意应用程序数据。 应用程序可以通过将字符串值作为消息传送设置消息正文,任何必需的标准属性会用默认值填充。

以下示例演示如何使用 send_queue_message() 向名为 test-queue 的队列发送一条测试消息:

message = Azure::ServiceBus::BrokeredMessage.new("test queue message")
message.correlation_id = "test-correlation-id"
azure_service_bus_service.send_queue_message("test-queue", message)

服务总线队列在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。 一个队列中包含的消息数量不受限制,但消息的总大小受限制。 此队列大小在创建时定义,上限为 5 GB。

如何从队列接收消息

对 Azure::ServiceBusService 对象使用 receive_queue_message() 方法可从队列接收消息。 默认情况下,消息在被读取的同时会被锁定,从而无法从队列中删除。 但是,可以通过将 :peek_lock 选项设置为“false”,在读取消息时将其从队列中删除。

默认行为使读取和删除变成一个两阶段操作,从而也有可能支持不允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序处理完该消息(或将它可靠地存储起来留待将来处理)后,通过调用 delete_queue_message() 方法并提供要删除的消息作为参数,完成接收过程的第二阶段。 delete_queue_message() 方法会将消息标记为已使用,并从队列中将其删除。

如果将 :peek_lock 参数设置为 false,则读取和删除消息将成为最简单的模型,并且最适用于应用程序可以容忍在发生故障时不处理消息的情况。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线已将消息标记为“已使用”,因此当应用程序重新启动并重新开始使用消息时,它会漏掉在发生崩溃前使用的消息。

以下示例演示如何使用 receive_queue_message() 接收和处理消息。 该示例先通过将 :peek_lock 设置为“false”接收并删除一条消息,然后再接收另一条消息,最后使用 delete_queue_message() 删除该消息:

message = azure_service_bus_service.receive_queue_message("test-queue",
  { :peek_lock => false })
message = azure_service_bus_service.receive_queue_message("test-queue")
azure_service_bus_service.delete_queue_message(message)

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序因某种原因无法处理消息,则它可以对 Azure::ServiceBusService 对象调用 unlock_queue_message() 方法。 此调用会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),则服务总线将自动解锁该消息并使它可再次被接收。

如果应用程序在处理消息之后,但在调用 delete_queue_message() 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。 此过程通常称作“至少处理一次”,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 这通常可通过使用消息的 message_id 属性实现,该属性在多次传送尝试中保持不变。

注意

可以使用服务总线资源管理器管理服务总线资源。 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。

后续步骤

现在,已了解有关服务总线队列的基础知识,请访问下面的链接以获取详细信息。

有关本文中讨论的 Azure 服务总线队列与如何通过 Ruby 使用队列存储一文中讨论的 Azure 队列的比较,请参阅 Azure 队列和 Azure 服务总线队列 - 比较与对照