快速入门:通过 Python 使用 Azure 服务总线队列Quickstart: Use Azure Service Bus queues with Python

本文介绍如何使用 Python 创建 Azure 服务总线队列,并向其发送消息和从中接收消息。This article shows you how to use Python to create, send messages to, and receive messages from Azure Service Bus queues.

有关 Python Azure 服务总线库的详细信息,请参阅适用于 Python 的服务总线库For more information about the Python Azure Service Bus libraries, see Service Bus libraries for Python.

先决条件Prerequisites

创建队列Create a queue

可通过某个 ServiceBusService 对象来使用队列。A ServiceBusClient object lets you work with queues. 若要以编程方式访问服务总线,请将以下行添加到 Python 文件的顶部附近:To programmatically access Service Bus, add the following line near the top of your Python file:

from azure.servicebus import ServiceBusClient

添加以下代码以创建 ServiceBusClient 对象。Add the following code to create a ServiceBusClient object. 请将 <connectionstring> 替换为服务总线的主连接字符串值。Replace <connectionstring> with your Service Bus primary connection string value. 可以在 Azure 门户上服务总线命名空间中的“共享访问策略”下找到此值。You can find this value under Shared access policies in your Service Bus namespace in the Azure portal.

sb_client = ServiceBusClient.from_connection_string('<connectionstring>')

以下代码使用 ServiceBusClientcreate_queue 方法创建名为 taskqueue 且采用默认设置的队列:The following code uses the create_queue method of the ServiceBusClient to create a queue named taskqueue with default settings:

sb_client.create_queue("taskqueue")

可以使用相应的选项来重写默认队列设置,例如消息生存时间 (TTL) 或最大主题大小。You can use options to override default queue settings, such as message time to live (TTL) or maximum topic size. 以下代码创建名为 taskqueue 的队列,其最大队列大小为 5 GB,TTL 值为 1 分钟:The following code creates a queue called taskqueue with a maximum queue size of 5 GB and TTL value of 1 minute:

sb_client.create_queue("taskqueue", max_size_in_megabytes=5120,
                       default_message_time_to_live=datetime.timedelta(minutes=1))

向队列发送消息Send messages to a queue

若要向服务总线队列发送消息,应用程序需对 ServiceBusService 对象调用 send 方法。To send a message to a Service Bus queue, an application calls the send method on the ServiceBusClient object. 以下代码示例创建一个队列客户端,并将测试消息发送到 taskqueue 队列。The following code example creates a queue client and sends a test message to the taskqueue queue. 请将 <connectionstring> 替换为服务总线的主连接字符串值。Replace <connectionstring> with your Service Bus primary connection string value.

from azure.servicebus import QueueClient, Message

# Create the QueueClient
queue_client = QueueClient.from_connection_string("<connectionstring>", "taskqueue")

# Send a test message to the queue
msg = Message(b'Test Message')
queue_client.send(msg)

消息大小限制和配额Message size limits and quotas

服务总线队列在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus queues support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 队列中可以包含的消息数量不受限制,但队列包含的消息总大小有上限。There's no limit on the number of messages a queue can hold, but there's a cap on the total size of the messages the queue holds. 可以在创建时定义队列大小,上限为 5 GB。You can define queue size at creation time, with an upper limit of 5 GB.

有关配额的详细信息,请参阅 服务总线配额For more information about quotas, see Service Bus quotas.

从队列接收消息Receive messages from a queue

队列客户端通过对 ServiceBusClient 对象使用 get_receiver 方法来从队列接收消息。The queue client receives messages from a queue by using the get_receiver method on the ServiceBusClient object. 以下代码示例创建一个队列客户端,并从 taskqueue 队列接收消息。The following code example creates a queue client and receives a message from the taskqueue queue. 请将 <connectionstring> 替换为服务总线的主连接字符串值。Replace <connectionstring> with your Service Bus primary connection string value.

from azure.servicebus import QueueClient, Message

# Create the QueueClient
queue_client = QueueClient.from_connection_string("<connectionstring>", "taskqueue")

# Receive the message from the queue
with queue_client.get_receiver() as queue_receiver:
    messages = queue_receiver.fetch_next(timeout=3)
    for message in messages:
        print(message)
        message.complete()

使用 peek_lock 参数Use the peek_lock parameter

get_receiverpeek_lock 可选参数确定服务总线在从队列读取消息后是否删除消息。The optional peek_lock parameter of get_receiver determines whether Service Bus deletes messages from the queue as they're read. 默认的消息接收模式是 PeekLock 或设置为 Truepeek_lock,后者在读取(扫视)后锁定消息,而不会从队列中删除消息。The default mode for message receiving is PeekLock, or peek_lock set to True, which reads (peeks) and locks messages without deleting them from the queue. 然后,必须显式完成每个消息以将其从队列中删除。Each message must then be explicitly completed to remove it from the queue.

若要在读取消息后将其从队列中删除,可将 get_receiverpeek_lock 参数设置为 FalseTo delete messages from the queue as they're read, you can set the peek_lock parameter of get_receiver to False. 在执行接收操作过程中删除消息是最简单的模型,但仅当应用程序在发生失败的情况下能够容许消息缺失时,该模型才能正常工作。Deleting messages as part of the receive operation is the simplest model, but only works if the application can tolerate missing messages if there's a failure. 为了理解此行为,可以设想这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。To understand this behavior, consider a scenario in which the consumer issues a receive request and then crashes before processing it. 如果在接收消息时它已被删除,当应用程序重启并重新开始使用消息时,它便缺少了在发生崩溃之前收到的消息。If the message was deleted on being received, when the application restarts and begins consuming messages again, it has missed the message it received before the crash.

如果应用程序不能容许消息缺失,则接收过程是由两个阶段组成的操作。If your application can't tolerate missed messages, receive is a two-stage operation. PeekLock 查找要使用的下一个消息,将其锁定以防其他使用方接收它,然后将该消息返回给应用程序。PeekLock finds the next message to be consumed, locks it to prevent other consumers from receiving it, and returns it to the application. 处理或存储消息后,应用程序通过对 Message 对象调用 complete 方法完成接收过程的第二个阶段。After processing or storing the message, the application completes the second stage of the receive process by calling the complete method on the Message object. complete 方法会将消息标记为已使用,并将其从队列中删除。The complete method marks the message as being consumed and removes it from the queue.

处理应用程序崩溃和不可读消息Handle application crashes and unreadable messages

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序因某种原因无法处理消息,则可对 Message 对象调用 unlock 方法。If a receiver application can't process a message for some reason, it can call the unlock method on the Message object. 服务总线解锁队列中的消息,并使其能够重新被同一个或另一个使用方应用程序接收。Service Bus unlocks the message within the queue and makes it available to be received again, either by the same or another consuming application.

队列中锁定的消息还存在超时。There's also a timeout for messages locked within the queue. 如果应用程序无法在锁定超时期满前处理消息(例如,如果应用程序崩溃),服务总线会自动解锁消息,让它再次可供接收。If an application fails to process a message before the lock timeout expires, for example if the application crashes, Service Bus unlocks the message automatically and makes it available to be received again.

如果应用程序在处理消息之后,但在调用 complete 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。If an application crashes after processing a message but before calling the complete method, the message is redelivered to the application when it restarts. 此行为通常称为“至少处理一次”。This behavior is often called At-least-once Processing. 每条消息将至少处理一次,但在某些情况下,可能会重新传送同一消息。Each message is processed at least once, but in certain situations the same message may be redelivered. 如果方案无法容许重复处理,可以使用消息的 MessageId 属性(多次尝试传送时,该属性保持不变)来处理重复消息传送。If your scenario can't tolerate duplicate processing, you can use the MessageId property of the message, which remains constant across delivery attempts, to handle duplicate message delivery.

Tip

可以使用服务总线资源管理器管理服务总线资源。You can manage Service Bus resources with Service Bus Explorer. 可以使用服务总线资源管理器连接到服务总线命名空间并轻松管理消息传送实体。Service Bus Explorer lets you connect to a Service Bus namespace and easily administer messaging entities. 该工具提供高级功能,例如导入/导出功能,以及用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。The tool provides advanced features like import/export functionality and the ability to test topics, queues, subscriptions, relay services, notification hubs, and event hubs.

后续步骤Next steps

了解服务总线队列的基本信息后,请参阅队列、主题和订阅以获取更多信息。Now that you've learned the basics of Service Bus queues, see Queues, topics, and subscriptions to learn more.