如何通过 Python 使用服务总线队列

本文介绍了如何使用服务总线队列。 相关示例采用 Python 编写,并使用了 [Python Azure 服务总线包][Python Azure Service Bus package]。 涉及的任务包括创建队列、发送和接收消息以及删除队列

什么是 Service Bus 队列?

服务总线队列支持中转消息传送通信模型。 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。 消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。 创建方不必等待使用方的答复即可继续处理并发送更多消息。 队列为一个或多个竞争使用方提供先入先出 (FIFO)消息传递方式。 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。

利用队列,你可以更轻松地缩放应用程序,并增强体系结构的弹性。

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建命名空间:

  1. 登录到 Azure 门户
  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择用于放置该命名空间的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择应在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 。 系统现已创建命名空间并已将其启用。 可能需要等待几分钟,因为系统会为你的帐户配置资源。

获取管理凭据

创建新的命名空间时,会自动生成一项初始的共享访问签名 (SAS) 规则,将一对主密钥和辅助密钥关联到一起,向每个密钥授予对命名空间的所有资产的完全控制权限。 请参阅服务总线身份验证和授权,了解如何创建更多的规则,对常规的发送者和接收者的权限进行更多限制。 若要复制初始规则,请执行以下步骤:

  1. 单击“所有资源”,然后单击新创建的命名空间名称。
  2. 在命名空间窗口中,单击“共享访问策略”。
  3. 在“共享访问策略”屏幕中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”窗口中,单击“主连接字符串”旁边的“复制”按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    connection-string

  5. 重复上述步骤,将主键的值复制和粘贴到临时位置,以供稍后使用。

Note

若要安装 Python 或 [Python Azure 服务总线包][Python Azure Service Bus package],请参阅 Python 安装指南

创建队列

可以通过 ServiceBusService 对象处理队列。 将以下代码添加到任何 Python 文件的顶部附近,你希望在其中以编程方式访问服务总线:

    from azure.servicebus import ServiceBusService, Message, Queue

以下代码创建 ServiceBusService 对象。 将 mynamespacesharedaccesskeynamesharedaccesskey 替换为命名空间、共享访问签名 (SAS) 密钥名称和值。

    bus_service = ServiceBusService(
        service_namespace='mynamespace',
        shared_access_key_name='sharedaccesskeyname',
        shared_access_key_value='sharedaccesskey')

SAS 密钥名称的值和值可以在 Azure 门户连接信息中找到,也可以在服务器资源管理器中选择服务总线命名空间后,在 Visual Studio“属性”窗格中找到(如前一部分中所示)。

    bus_service.create_queue('taskqueue')

create_queue 方法还支持其他选项,通过这些选项可以重写默认队列设置,例如消息生存时间 (TTL) 或最大队列大小。 以下示例将最大队列大小设置为 5 GB,将 TTL 值设置为 1 分钟:

    queue_options = Queue()
    queue_options.max_size_in_megabytes = '5120'
    queue_options.default_message_time_to_live = 'PT1M'

    bus_service.create_queue('taskqueue', queue_options)

向队列发送消息

要向服务总线队列发送消息,应用程序需对 ServiceBusService 对象调用 send_queue_message 方法。

以下示例演示如何使用 send_queue_message 向名为 taskqueue 的队列发送一条测试消息:

    msg = Message(b'Test Message')
    bus_service.send_queue_message('taskqueue', msg)

在标准层,服务总线队列支持的最大消息大小为 256 KB。 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。 一个队列中包含的消息数量不受限制,但消息的总大小受限制。 此队列大小在创建时定义,上限为 5 GB。 有关配额的详细信息,请参阅 服务总线配额

从队列接收消息

对“ServiceBusService”对象使用“接收”_队列_消息的方法可从队列接收消息:

    msg = bus_service.receive_queue_message('taskqueue', peek_lock=False)
    print(msg.body)

“速览”_锁定参数设置为 False 时,将在读取消息后将其从队列中删除。 通过将参数 peek_lock 设置为“True”,可读取(扫视)并锁定消息而不会将其从队列中删除。

在接收过程中读取并删除消息的行为是最简单的模式,并且最适合应用程序允许出现故障时不处理消息的情况。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。

如果将 peek_lock 参数设置为“True”,则接收将变成一个两阶段操作,从而可支持无法容忍遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序处理完消息(或安全存储该消息以供将来处理)后,会通过对 Message 对象调用“删除”方法来完成接收过程的第二个阶段。 delete 方法会将消息标记为“已使用”并将其从队列中删除。

    msg = bus_service.receive_queue_message('taskqueue', peek_lock=True)
    print(msg.body)

    msg.delete()

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序因某种原因无法处理消息,则可对 Message 对象调用 unlock 方法。 这会导致 Service Bus 解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),Service Bus 会自动解锁该消息并使它可再次被接收。

如果应用程序在处理消息之后,但在调用 delete 方法之前崩溃,则在应用程序重新启动时,该消息会重新传送给应用程序。 此情况通常称作至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 这通常可以通过消息的 MessageId 属性来实现,该属性在多次传送尝试中保持不变。

后续步骤

现在,已了解有关服务总线队列的基础知识,请参阅以下文章了解详细信息。