快速入门:如何通过 Ruby 使用服务总线主题和订阅

本文介绍如何从 Ruby 应用程序使用服务总线主题和订阅。 涉及的方案包括:

  • 创建主题和订阅
  • 创建订阅筛选器
  • 将消息发送到主题
  • 从订阅接收消息
  • 删除主题和订阅

先决条件

  1. Azure 订阅。 若要完成本教程,需要一个 Azure 帐户。 可以激活你的 Visual Studio 或 MSDN 订阅者权益或者注册试用版订阅

  2. 按照 快速入门:使用 Azure 门户创建一个服务总线主题和对此主题的订阅来创建服务总线 命名空间 并获取 连接字符串

    备注

    在本快速入门中,你将使用 Ruby 创建一个 主题 和对此主题的 订阅

创建 Ruby 应用程序

有关说明,请参阅在 Azure 上创建 Ruby 应用程序

配置应用程序以使用服务总线

若要使用服务总线,请下载并使用 Azure Ruby 包,其中包括一组便于与存储 REST 服务进行通信的库。

使用 RubyGems 获取该程序包

  1. 使用命令行接口,例如 PowerShell (Windows)、Terminal (Mac) 或 Bash (Unix)。
  2. 在命令窗口中键入“gem install azure”以安装 gem 和依赖项。

导入包

使用常用的文本编辑器将以下内容添加到要在其中使用存储的 Ruby 文件的顶部:

require "azure"

设置服务总线连接

使用以下代码,设置命名空间、密钥名称、密钥、签名程序和主机的值:

Azure.configure do |config|
  config.sb_namespace = '<your azure service bus namespace>'
  config.sb_sas_key_name = '<your azure service bus access keyname>'
  config.sb_sas_key = '<your azure service bus access key>'
end
signer = Azure::ServiceBus::Auth::SharedAccessSigner.new
sb_host = "https://#{Azure.sb_namespace}.servicebus.chinacloudapi.cn"

将命名空间值设置为创建的值,而不是整个 URL 的值。 例如,使用 "yourexamplenamespace" ,而不是 "yourexamplenamespace.servicebus.chinacloudapi.cn"。

使用多个命名空间时,可以在创建 SharedAccessSigner 对象时将密钥及其名称传递到构造函数

sb_namespace = '<your azure service bus namespace>'
sb_sas_key_name = '<your azure service bus access keyname>'
sb_sas_key = '<your azure service bus access key>'

signer = Azure::ServiceBus::Auth::SharedAccessSigner.new(sb_sas_key_name, sb_sas_key)
sb_host = "https://#{sb_namespace}.servicebus.chinacloudapi.cn"

创建主题

可以通过 Azure::ServiceBusService 对象处理主题。 以下代码创建 Azure::ServiceBusService 对象。 要创建主题,请使用 create_topic() 方法。 以下示例将创建一个主题或输出任何错误。

azure_service_bus_service = Azure::ServiceBus::ServiceBusService.new(sb_host, { signer: signer})
begin
  topic = azure_service_bus_service.create_topic("test-topic")
rescue
  puts $!
end

还可以通过其他选项传递 Azure::ServiceBus::Topic 对象,借助这些选项,可以重写默认主题设置,如消息生存时间或最大队列大小。 下面的示例演示如何将最大队列大小设置为 5 GB,将生存时间设置为 1 分钟:

topic = Azure::ServiceBus::Topic.new("test-topic")
topic.max_size_in_megabytes = 5120
topic.default_message_time_to_live = "PT1M"

topic = azure_service_bus_service.create_topic(topic)

创建订阅

主题订阅也是使用 Azure::ServiceBusService 对象创建的。 订阅已命名,并且具有一个限制传递到订阅的虚拟队列的消息集的可选筛选器。

默认情况下,订阅是永久性的。 除非删除它或与之相关的主题,否则订阅将继续存在。 如果应用程序包含创建订阅的逻辑,则它应首先使用 getSubscription 方法检查该订阅是否已经存在。

可以通过设置 AutoDeleteOnIdle 属性来自动删除订阅。

创建具有默认 (MatchAll) 筛选器的订阅

如果在创建新订阅时未指定任何筛选器,则将使用默认的 MatchAll 筛选器。 使用 MatchAll 筛选器时,发布到主题的所有消息都会置于订阅的虚拟队列中。 以下示例创建了名为“all-messages”的订阅并使用了默认的 MatchAll 筛选器。

subscription = azure_service_bus_service.create_subscription("test-topic", "all-messages")

创建具有筛选器的订阅

还可以定义筛选器,以指定发送到主题的哪些消息应该在特定订阅中显示。

订阅支持的最灵活的筛选器类型是 Azure::ServiceBus::SqlFilter,它实现了部分 SQL92 功能。 SQL 筛选器对发布到主题的消息的属性进行操作。 有关可用于 SQL 筛选器的表达式的更多详细信息,请参阅 SqlFilter 语法。

可以使用 Azure::ServiceBusService 对象的 create_rule() 方法向订阅中添加筛选器。 此方法允许用户向现有订阅中添加新筛选器。

由于默认筛选器会自动应用到所有新订阅,因此,必须首先删除默认筛选器,否则 MatchAll 会替代可能指定的任何其他筛选器。 可以通过对 Azure::ServiceBusService 对象使用 delete_rule() 方法删除默认规则。

以下示例将创建一个名为“high-messages”的订阅,该订阅包含一个 Azure::ServiceBus::SqlFilter,它仅选择自定义 message_number 属性大于 3 的消息:

subscription = azure_service_bus_service.create_subscription("test-topic", "high-messages")
azure_service_bus_service.delete_rule("test-topic", "high-messages", "$Default")

rule = Azure::ServiceBus::Rule.new("high-messages-rule")
rule.topic = "test-topic"
rule.subscription = "high-messages"
rule.filter = Azure::ServiceBus::SqlFilter.new({
  :sql_expression => "message_number > 3" })
rule = azure_service_bus_service.create_rule(rule)

类似地,以下示例创建一个名为 low-messages 的订阅,该订阅包含一个 Azure::ServiceBus::SqlFilter,它仅选择 message_number 属性小于或等于 3 的消息:

subscription = azure_service_bus_service.create_subscription("test-topic", "low-messages")
azure_service_bus_service.delete_rule("test-topic", "low-messages", "$Default")

rule = Azure::ServiceBus::Rule.new("low-messages-rule")
rule.topic = "test-topic"
rule.subscription = "low-messages"
rule.filter = Azure::ServiceBus::SqlFilter.new({
  :sql_expression => "message_number <= 3" })
rule = azure_service_bus_service.create_rule(rule)

现在,当消息发送到 test-topic 时,它始终会传送给订阅了 all-messages 主题订阅的接收者,并且选择性地传送给订阅了 high-messageslow-messages 主题订阅的接收者(具体取决于消息内容)。

将消息发送到主题

要将消息发送到服务总线主题,应用程序必须对 Azure::ServiceBusService 对象使用 send_topic_message() 方法。 发送到服务总线主题的消息是 Azure::ServiceBus::BrokeredMessage 对象的实例。 Azure::ServiceBus::BrokeredMessage 对象具有一组标准属性(如 labeltime_to_live)、一个用于保存自定义的应用程序特定属性的字典,以及大量字符串数据。 应用程序可以通过将字符串值传递给 send_topic_message() 方法设置消息正文,并且任何必需的标准属性将用默认值填充。

以下示例演示如何向 test-topic发送五条测试消息。 每条消息的 message_number 自定义属性值因循环迭代而异(这会确定接收消息的订阅):

5.times do |i|
  message = Azure::ServiceBus::BrokeredMessage.new("test message " + i,
    { :message_number => i })
  azure_service_bus_service.send_topic_message("test-topic", message)
end

服务总线主题在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。 一个主题中包含的消息数量不受限制,但消息的总大小受限制。 此主题大小是在创建时定义的,上限为 5 GB。

从订阅接收消息

对 Azure::ServiceBusService 对象使用 receive_subscription_message() 方法可从订阅接收消息。 默认情况下,消息在被读取(查看)的同时会被锁定,从而无法从订阅中删除。 但是,可以通过将 peek_lock 选项设置为“false”,读取消息并将其从订阅中删除。

默认行为使读取和删除变成一个两阶段操作,从而也有可能支持不允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序处理完该消息(或将它可靠地存储起来留待将来处理)后,通过调用 delete_subscription_message() 方法并提供要删除的消息作为参数,完成接收过程的第二阶段。 delete_subscription_message() 方法会将消息标记为已使用,并将其从订阅中删除。

如果 :peek_lock 参数设置为“false”,读取并删除消息将是最简单的模式,并且最适合在发生故障时应用程序可以容忍不处理消息的情况。 可以考虑这样一种情形:使用者发出接收请求,但在处理该请求前发生了故障。 由于服务总线已将消息标记为“已使用”,因此当应用程序重新启动并重新开始使用消息时,它会漏掉在发生崩溃前使用的消息。

以下示例演示如何使用 receive_subscription_message() 接收和处理消息。 该示例先通过将 :peek_lock 设置为“false”从 low-messages 订阅接收一条消息并将其删除,然后再从 high-messages 接收另一条消息,最后使用 delete_subscription_message() 删除该消息:

message = azure_service_bus_service.receive_subscription_message(
  "test-topic", "low-messages", { :peek_lock => false })
message = azure_service_bus_service.receive_subscription_message(
  "test-topic", "high-messages")
azure_service_bus_service.delete_subscription_message(message)

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序因某种原因无法处理消息,则它可以对 Azure::ServiceBusService 对象调用 unlock_subscription_message() 方法。 这会导致服务总线解锁订阅中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

另外,还存在与订阅中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),则服务总线会自动解锁该消息并使其可再次被接收。

如果应用程序在处理消息之后,但在调用 delete_subscription_message() 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。 此情况通常称作“至少处理一次”,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 此逻辑通常可通过使用消息的 message_id 属性实现,该属性在多次传送尝试中保持不变。

删除主题和订阅

除非设置 AutoDeleteOnIdle 属性,否则主题和订阅是永久性的。 可以通过 Azure 门户或以编程方式删除这些主题和订阅。 下面的示例演示如何删除名为 test-topic 的主题。

azure_service_bus_service.delete_topic("test-topic")

删除某个主题也会删除向该主题注册的所有订阅。 也可以单独删除订阅。 以下代码演示如何从 test-topic 主题删除名为 high-messages 的订阅:

azure_service_bus_service.delete_subscription("test-topic", "high-messages")

备注

可以使用服务总线资源管理器管理服务总线资源。 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。

后续步骤

现在,已了解有关 Service Bus 主题的基础知识,单击下面的链接可了解更多信息。