如何通过 Python 使用服务总线队列How to use Service Bus queues with Python

本教程介绍如何创建 Python 应用程序,以便向服务总线队列发送消息以及从中接收消息。In this tutorial, you learn how to create Python applications to send messages to and receive messages from a Service Bus queue.

先决条件Prerequisites

  1. Azure 订阅。An Azure subscription. 若要完成本教程,需要一个 Azure 帐户。To complete this tutorial, you need an Azure account. 可以激活 MSDN 订阅者权益注册试用帐户You can activate your MSDN subscriber benefits or sign up for a trial account.
  2. 按照使用 Azure 门户创建服务总线队列一文中的步骤操作。Follow steps in the Use Azure portal to create a Service Bus queue article.
    1. 阅读服务总线队列的快速概述Read the quick overview of Service Bus queues.

    2. 创建一个服务总线命名空间Create a Service Bus namespace.

    3. 获取连接字符串Get the connection string.

      Note

      在本教程中,需使用 Python 在服务总线命名空间中创建一个队列You will create a queue in the Service Bus namespace by using Python in this tutorial.

  3. 若要安装 Python 或 Python Azure 服务总线包,请参阅 Python 安装指南Install Python or the Python Azure Service Bus package, see the Python Installation Guide. 此处查看服务总线 Python SDK 的完整文档。See full documentation of Service Bus Python SDK here.

创建队列Create a queue

可以通过 ServiceBusClient 对象处理队列。The ServiceBusClient object enables you to work with queues. 将以下代码添加到任何 Python 文件的顶部附近,你希望在其中以编程方式访问服务总线:Add the following code near the top of any Python file in which you wish to programmatically access Service Bus:

from azure.servicebus import ServiceBusClient

以下代码创建 ServiceBusClient 对象。The following code creates a ServiceBusClient object. mynamespacesharedaccesskeynamesharedaccesskey 替换为命名空间、共享访问签名 (SAS) 密钥名称和值。Replace mynamespace, sharedaccesskeyname, and sharedaccesskey with your namespace, shared access signature (SAS) key name, and value.

sb_client = ServiceBusClient.from_connection_string('<CONNECTION STRING>')

SAS 密钥名称的值和值可以在 Azure 门户连接信息中找到,也可以在服务器资源管理器中选择服务总线命名空间后,在 Visual Studio“属性”窗格中找到(如前一部分中所示)。The values for the SAS key name and value can be found in the Azure portal connection information, or in the Visual Studio Properties pane when selecting the Service Bus namespace in Server Explorer (as shown in the previous section).

sb_client.create_queue("taskqueue")

create_queue 方法还支持其他选项,通过这些选项可以重写默认队列设置,例如消息生存时间 (TTL) 或最大队列大小。The create_queue method also supports additional options, which enable you to override default queue settings such as message time to live (TTL) or maximum queue size. 以下示例将最大队列大小设置为 5 GB,将 TTL 值设置为 1 分钟:The following example sets the maximum queue size to 5 GB, and the TTL value to 1 minute:

queue_options = Queue()
queue_options.max_size_in_megabytes = '5120'
queue_options.default_message_time_to_live = 'PT1M'

sb_client.create_queue("taskqueue", queue_options)

向队列发送消息Send messages to a queue

要将消息发送到服务总线队列,应用程序需对 ServiceBusClient 对象调用 send 方法。To send a message to a Service Bus queue, your application calls the send method on the ServiceBusClient object.

以下示例演示如何使用 send_queue_message 向名为 taskqueue 的队列发送一条测试消息:The following example demonstrates how to send a test message to the queue named taskqueue using send_queue_message:

from azure.servicebus import QueueClient, Message

# Create the QueueClient 
queue_client = QueueClient.from_connection_string("<CONNECTION STRING>", "<QUEUE NAME>")

# Send a test message to the queue
msg = Message(b'Test Message')
queue_client.send(Message("Message"))

服务总线队列在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus queues support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 一个队列可包含的消息数不受限制,但消息的总大小受限。There is no limit on the number of messages held in a queue but there is a cap on the total size of the messages held by a queue. 此队列大小是在创建时定义的,上限为 5 GB。This queue size is defined at creation time, with an upper limit of 5 GB. 有关配额的详细信息,请参阅服务总线配额For more information about quotas, see Service Bus quotas.

从队列接收消息Receive messages from a queue

ServiceBusService 对象使用 get_receiver 方法可从队列接收消息:Messages are received from a queue using the get_receiver method on the ServiceBusService object:

from azure.servicebus import QueueClient, Message

# Create the QueueClient 
queue_client = QueueClient.from_connection_string("<CONNECTION STRING>", "<QUEUE NAME>")

## Receive the message from the queue
with queue_client.get_receiver() as queue_receiver:
    messages = queue_receiver.fetch_next(timeout=3)
    for message in messages:
        print(message)
        message.complete()

peek_lock 参数设置为“False”时,会在读取消息后将其从队列中删除。Messages are deleted from the queue as they are read when the parameter peek_lock is set to False. 通过将参数 peek_lock 设置为“True”,可读取(速览)并锁定消息而不会将其从队列中删除。You can read (peek) and lock the message without deleting it from the queue by setting the parameter peek_lock to True.

在接收过程中读取并删除消息的行为是最简单的模式,并且最适合应用程序允许出现故障时不处理消息的情况。The behavior of reading and deleting the message as part of the receive operation is the simplest model, and works best for scenarios in which an application can tolerate not processing a message in the event of a failure. 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。To understand this, consider a scenario in which the consumer issues the receive request and then crashes before processing it. 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。Because Service Bus will have marked the message as being consumed, then when the application restarts and begins consuming messages again, it will have missed the message that was consumed prior to the crash.

如果将 peek_lock 参数设置为“True”,则接收会变成一个两阶段操作,从而可支持无法容忍遗漏消息的应用程序。If the peek_lock parameter is set to True, the receive becomes a two stage operation, which makes it possible to support applications that cannot tolerate missing messages. 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. 应用程序处理完消息(或安全存储该消息以供将来处理)后,会通过对 Message 对象调用“删除”方法来完成接收过程的第二个阶段。After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling the delete method on the Message object. delete 方法会将消息标记为“已使用”并将其从队列中删除。The delete method will mark the message as being consumed and remove it from the queue.

msg.delete()

如何处理应用程序崩溃和不可读消息How to handle application crashes and unreadable messages

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序因某种原因无法处理消息,则可对 Message 对象调用 unlock 方法。If a receiver application is unable to process the message for some reason, then it can call the unlock method on the Message object. 这会导致 Service Bus 解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。This will cause Service Bus to unlock the message within the queue and make it available to be received again, either by the same consuming application or by another consuming application.

还存在与队列中已锁定的消息相关联的超时,并且如果应用程序未能在锁定超时到期之前处理消息(例如,如果应用程序崩溃),服务总线则将自动解锁该消息,使它可以再次被接收。There is also a timeout associated with a message locked within the queue, and if the application fails to process the message before the lock timeout expires (for example, if the application crashes), then Service Bus will unlock the message automatically and make it available to be received again.

如果应用程序在处理消息之后,但在调用 delete 方法之前崩溃,则在应用程序重新启动时,该消息会重新传送给应用程序。In the event that the application crashes after processing the message but before the delete method is called, then the message will be redelivered to the application when it restarts. 此情况通常称作至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。This is often called At Least Once Processing, that is, each message will be processed at least once but in certain situations the same message may be redelivered. 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。If the scenario cannot tolerate duplicate processing, then application developers should add additional logic to their application to handle duplicate message delivery. 这通常可以通过消息的 MessageId 属性来实现,该属性在多次传送尝试中保持不变。This is often achieved using the MessageId property of the message, which will remain constant across delivery attempts.

后续步骤Next steps

现在,已了解有关服务总线队列的基础知识,请参阅以下文章了解详细信息。Now that you have learned the basics of Service Bus queues, see these articles to learn more.