如何通过 Python 使用服务总线主题和订阅How to use Service Bus topics and subscriptions with Python

本文介绍如何将 Python 与服务总线主题和订阅配合使用。This article describes how to use Python with Azure Service Bus topics and subscriptions. 示例使用 Azure Python SDK 包执行以下操作:The samples use the Azure Python SDK package to:

  • 创建主题以及对主题的订阅Create topics and subscriptions to topics
  • 创建订阅筛选器和规则Create subscription filters and rules
  • 将消息发送到主题Send messages to topics
  • 从订阅接收消息Receive messages from subscriptions
  • 删除主题和订阅Delete topics and subscriptions

先决条件Prerequisites

  1. Azure 订阅。An Azure subscription. 若要完成本教程,需要一个 Azure 帐户。To complete this tutorial, you need an Azure account. 可以激活 Visual Studio 或 MSDN 订阅者权益或者注册试用帐户You can activate your Visual Studio or MSDN subscriber benefitsor sign-up for a trial account.

创建 ServiceBusService 对象Create a ServiceBusService object

可以通过某个 ServiceBusService 对象来使用主题以及对主题的订阅。A ServiceBusService object lets you work with topics and subscriptions to topics. 若要以编程方式访问服务总线,请将以下行添加到 Python 文件的顶部附近:To programmatically access Service Bus, add the following line near the top of your Python file:

from azure.servicebus.control_client import ServiceBusService, Message, Topic, Rule, DEFAULT_RULE_NAME

添加以下代码以创建 ServiceBusService 对象。Add the following code to create a ServiceBusService object. 请将 <namespace><sharedaccesskeyname><sharedaccesskeyvalue> 替换为你的服务总线命名空间名称、共享访问签名 (SAS) 密钥名称和主密钥值。Replace <namespace>, <sharedaccesskeyname>, and <sharedaccesskeyvalue> with your Service Bus namespace name, Shared Access Signature (SAS) key name, and primary key value. 可以在 Azure 门户上服务总线命名空间中的“共享访问策略”下找到这些值。 You can find these values under Shared access policies in your Service Bus namespace in the Azure portal.

bus_service = ServiceBusService(
    service_namespace='<namespace>',
    shared_access_key_name='<sharedaccesskeyname>',
    shared_access_key_value='<sharedaccesskeyvalue>')

创建主题Create a topic

以下代码使用 create_topic 方法创建名为 mytopic 且采用默认设置的服务总线主题:The following code uses the create_topic method to create a Service Bus topic called mytopic, with default settings:

bus_service.create_topic('mytopic')

可以使用主题选项来重写默认主题设置,例如消息生存时间 (TTL) 或最大主题大小。You can use topic options to override default topic settings, such as message time to live (TTL) or maximum topic size. 以下示例创建名为 mytopic 的主题,其最大主题大小为 5 GB,默认消息 TTL 为 1 分钟:The following example creates a topic named mytopic with a maximum topic size of 5 GB and default message TTL of one minute:

topic_options = Topic()
topic_options.max_size_in_megabytes = '5120'
topic_options.default_message_time_to_live = 'PT1M'

bus_service.create_topic('mytopic', topic_options)

创建订阅Create subscriptions

也可以使用 ServiceBusService 对象创建对主题的订阅。You also use the ServiceBusService object to create subscriptions to topics. 订阅可以包含筛选器,用于限制传送至其虚拟队列的消息集。A subscription can have a filter to restrict the message set delivered to its virtual queue. 如果未指定筛选器,新订阅将使用默认的 MatchAll 筛选器,该筛选器会将发布到主题的所有消息置于订阅的虚拟队列中。If you don't specify a filter, new subscriptions use the default MatchAll filter, which places all messages published to the topic into the subscription's virtual queue. 以下示例创建名为 AllMessages 的对 mytopic 的订阅,该订阅使用 MatchAll 筛选器:The following example creates a subscription to mytopic named AllMessages that uses the MatchAll filter:

bus_service.create_subscription('mytopic', 'AllMessages')

对订阅使用筛选器Use filters with subscriptions

使用 ServiceBusService 对象的 create_rule 方法来筛选订阅中显示的消息。Use the create_rule method of the ServiceBusService object to filter the messages that appear in a subscription. 可以在创建订阅时指定规则,或将规则添加到现有订阅。You can specify rules when you create the subscription, or add rules to existing subscriptions.

最灵活的筛选器类型是 SqlFilter,它使用 SQL-92 的子集。The most flexible type of filter is a SqlFilter, which uses a subset of SQL-92. SQL 筛选器基于发布到主题的消息的属性运行。SQL filters operate based on the properties of messages published to the topic. 有关可用于 SQL 筛选器的表达式的详细信息,请参阅 SqlFilter.SqlExpression 语法。For more information about the expressions you can use with a SQL filter, see the SqlFilter.SqlExpression syntax.

由于 MatchAll 默认筛选器会自动应用到所有新订阅,因此,必须先从要筛选的订阅中删除该筛选器,否则 MatchAll 会重写指定的任何其他筛选器。Because the MatchAll default filter applies automatically to all new subscriptions, you must remove it from subscriptions you want to filter, or MatchAll will override any other filters you specify. 可以使用 ServiceBusService 对象的 delete_rule 方法删除默认规则 。You can remove the default rule by using the delete_rule method of the ServiceBusService object.

以下示例创建名为 HighMessages 的对 mytopic 的订阅,该订阅使用名为 HighMessageFilterSqlFilter 规则。The following example creates a subscription to mytopic named HighMessages, with a SqlFilter rule named HighMessageFilter. HighMessageFilter 规则仅选择自定义 messageposition 属性大于 3 的消息:The HighMessageFilter rule selects only messages with a custom messageposition property greater than 3:

bus_service.create_subscription('mytopic', 'HighMessages')

rule = Rule()
rule.filter_type = 'SqlFilter'
rule.filter_expression = 'messageposition > 3'

bus_service.create_rule('mytopic', 'HighMessages', 'HighMessageFilter', rule)
bus_service.delete_rule('mytopic', 'HighMessages', DEFAULT_RULE_NAME)

以下示例创建名为 LowMessages 的对 mytopic 的订阅,该订阅使用名为 LowMessageFilterSqlFilter 规则。The following example creates a subscription to mytopic named LowMessages, with a SqlFilter rule named LowMessageFilter. LowMessageFilter 规则仅选择 messageposition 属性小于或等于 3 的消息:The LowMessageFilter rule selects only messages with a messageposition property less than or equal to 3:

bus_service.create_subscription('mytopic', 'LowMessages')

rule = Rule()
rule.filter_type = 'SqlFilter'
rule.filter_expression = 'messageposition <= 3'

bus_service.create_rule('mytopic', 'LowMessages', 'LowMessageFilter', rule)
bus_service.delete_rule('mytopic', 'LowMessages', DEFAULT_RULE_NAME)

AllMessagesHighMessagesLowMessages 都生效后,发送到 mytopic 的消息始终会传送到 AllMessages 订阅的接收方。With AllMessages, HighMessages, and LowMessages all in effect, messages sent to mytopic are always delivered to receivers of the AllMessages subscription. 也可以根据消息的 messageposition 属性值,选择性地将消息传送到 HighMessagesLowMessages 订阅。Messages are also selectively delivered to the HighMessages or LowMessages subscription, depending on the message's messageposition property value.

将消息发送到主题Send messages to a topic

应用程序使用 ServiceBusService 对象的 send_topic_message 方法将消息发送到服务总线主题。Applications use the send_topic_message method of the ServiceBusService object to send messages to a Service Bus topic.

以下示例将 5 条测试消息发送到 mytopic 主题。The following example sends five test messages to the mytopic topic. 自定义 messageposition 属性值取决于循环的迭代,确定哪些订阅接收消息。The custom messageposition property value depends on the iteration of the loop, and determines which subscriptions receive the messages.

for i in range(5):
    msg = Message('Msg {0}'.format(i).encode('utf-8'),
                  custom_properties={'messageposition': i})
    bus_service.send_topic_message('mytopic', msg)

消息大小限制和配额Message size limits and quotas

服务总线主题在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus topics support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 主题中可以包含的消息数量不受限制,但主题包含的消息总大小有上限。There's no limit on the number of messages a topic can hold, but there's a cap on the total size of the messages the topic holds. 可以在创建时定义主题大小,上限为 5 GB。You can define topic size at creation time, with an upper limit of 5 GB.

有关配额的详细信息,请参阅 服务总线配额For more information about quotas, see Service Bus quotas.

从订阅接收消息Receive messages from a subscription

应用程序对 ServiceBusService 对象使用 receive_subscription_message 方法来从订阅接收消息。Applications use the receive_subscription_message method on the ServiceBusService object to receive messages from a subscription. 以下示例从 LowMessages 订阅接收消息,并在读取消息后删除消息:The following example receives messages from the LowMessages subscription and deletes them as they're read:

msg = bus_service.receive_subscription_message('mytopic', 'LowMessages', peek_lock=False)
print(msg.body)

receive_subscription_messagepeek_lock 可选参数确定服务总线在从订阅读取消息后是否删除消息。The optional peek_lock parameter of receive_subscription_message determines whether Service Bus deletes messages from the subscription as they're read. 默认的消息接收模式是 PeekLock 或设置为 Truepeek_lock,后者在读取(扫视)后锁定消息,而不会从订阅中删除消息。The default mode for message receiving is PeekLock, or peek_lock set to True, which reads (peeks) and locks messages without deleting them from the subscription. 然后,必须显式完成每个消息以将其从订阅中删除。Each message must then be explicitly completed to remove it from the subscription.

若要在从订阅读取消息后删除消息,可将 peek_lock 参数设置为 False,如前面的示例所示。To delete messages from the subscription as they're read, you can set the peek_lock parameter to False, as in the preceding example. 在执行接收操作过程中删除消息是最简单的模型,仅当应用程序在发生失败的情况下能够容许消息缺失时,该模型才能正常工作。Deleting messages as part of the receive operation is the simplest model, and works fine if the application can tolerate missing messages if there's a failure. 为了理解此行为,可以设想这样一种情形:应用程序发出接收请求,但在处理该请求之前发生崩溃。To understand this behavior, consider a scenario in which the application issues a receive request and then crashes before processing it. 如果在接收消息时它已被删除,当应用程序重启并重新开始使用消息时,它便缺少了在发生崩溃之前收到的消息。If the message was deleted on being received, when the application restarts and begins consuming messages again, it has missed the message it received before the crash.

如果应用程序不能容许消息缺失,则接收过程将变成由两个阶段组成的操作。If your application can't tolerate missed messages, the receive becomes a two-stage operation. PeekLock 查找要使用的下一个消息,将其锁定以防其他使用方接收它,然后将该消息返回给应用程序。PeekLock finds the next message to be consumed, locks it to prevent other consumers from receiving it, and returns it to the application. 处理或存储消息后,应用程序通过对 Message 对象调用 complete 方法完成接收过程的第二个阶段。After processing or storing the message, the application completes the second stage of the receive process by calling the complete method on the Message object. complete 方法会将消息标记为已使用,并将其从订阅中删除。The complete method marks the message as being consumed and removes it from the subscription.

以下示例演示了 peek lock 方案:The following example demonstrates a peek lock scenario:

msg = bus_service.receive_subscription_message('mytopic', 'LowMessages', peek_lock=True)
if msg.body is not None:
    print(msg.body)
    msg.complete()

处理应用程序崩溃和不可读消息Handle application crashes and unreadable messages

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序因某种原因无法处理消息,则可对 Message 对象调用 unlock 方法。If a receiver application can't process a message for some reason, it can call the unlock method on the Message object. 服务总线解锁订阅中的消息,并使其能够重新被同一个或另一个使用方应用程序接收。Service Bus unlocks the message within the subscription and makes it available to be received again, either by the same or another consuming application.

订阅中锁定的消息还存在超时。There's also a timeout for messages locked within the subscription. 如果应用程序无法在锁定超时期满前处理消息(例如,如果应用程序崩溃),服务总线会自动解锁消息,让它再次可供接收。If an application fails to process a message before the lock timeout expires, for example if the application crashes, Service Bus unlocks the message automatically and makes it available to be received again.

如果应用程序在处理消息之后,但在调用 complete 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。If an application crashes after processing a message but before calling the complete method, the message will be redelivered to the application when it restarts. 此行为通常称为“至少处理一次”。 This behavior is often called At-least-once Processing. 每条消息将至少处理一次,但在某些情况下,可能会重新传送同一消息。Each message is processed at least once, but in certain situations the same message may be redelivered. 如果方案无法容许重复处理,可以使用消息的 MessageId 属性(多次尝试传送时,该属性保持不变)来处理重复消息传送。If your scenario can't tolerate duplicate processing, you can use the MessageId property of the message, which remains constant across delivery attempts, to handle duplicate message delivery.

删除主题和订阅Delete topics and subscriptions

若要删除主题和订阅,请使用 Azure 门户delete_topic 方法。To delete topics and subscriptions, use the Azure portal or the delete_topic method. 以下代码删除名为 mytopic 的主题:The following code deletes the topic named mytopic:

bus_service.delete_topic('mytopic')

删除主题会删除对该主题的所有订阅。Deleting a topic deletes all subscriptions to the topic. 也可以单独删除订阅。You can also delete subscriptions independently. 以下代码从 mytopic 主题删除名为 HighMessages 的订阅:The following code deletes the subscription named HighMessages from the mytopic topic:

bus_service.delete_subscription('mytopic', 'HighMessages')

默认情况下,主题和订阅是持久性的,在删除之前会一直存在。By default, topics and subscriptions are persistent, and exist until you delete them. 若要在特定的时间段过后自动删除订阅,可对订阅设置 auto_delete_on_idle 参数。To automatically delete subscriptions after a certain time period elapses, you can set the auto_delete_on_idle parameter on the subscription.

提示

可以使用服务总线资源管理器管理服务总线资源。You can manage Service Bus resources with Service Bus Explorer. 可以使用服务总线资源管理器连接到服务总线命名空间并轻松管理消息传送实体。Service Bus Explorer lets you connect to a Service Bus namespace and easily administer messaging entities. 该工具提供高级功能,例如导入/导出功能,以及用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。The tool provides advanced features like import/export functionality and the ability to test topics, queues, subscriptions, relay services, notification hubs, and event hubs.

后续步骤Next steps

了解有关服务总线主题的基础知识后,请单击以下链接获取更多信息:Now that you've learned the basics of Service Bus topics, follow these links to learn more: