如何通过 Python 使用队列存储

Tip

尝试 Azure 存储资源管理器

Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。

概述

本指南演示如何使用 Azure 队列存储服务执行常见方案。 这些示例用 Python 编写并使用 Azure Storage SDK for Python。 介绍的方案包括插入扫视获取删除队列消息以及创建和删除队列。 有关队列的详细信息,请参阅[后续步骤]部分。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。

队列存储的常见用途包括:

  • 创建积压工作以进行异步处理
  • 将消息从 Azure Web 角色传递到 Azure 辅助角色

队列服务概念

队列服务包含以下组件:

队列 1

  • URL 格式:可使用以下 URL 格式对队列进行寻址:
    http://<storage account>.queue.core.chinacloudapi.cn/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.chinacloudapi.cn/images-to-download

  • 存储帐户: 对 Azure 存储服务的所有访问都要通过存储帐户来完成。 有关存储帐户容量的详细信息,请参阅 Azure 存储可伸缩性和性能目标

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的详细信息,请参阅 命名队列和元数据
  • 消息: 一条消息(不管采用何种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的存储资源提供程序客户端库创建 Azure 存储帐户。

如果暂时不想创建存储帐户,也可以使用 Azure 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azure 存储模拟器进行开发和测试

下载和安装适用于 Python 的 Azure 存储 SDK

适用于 Python 的 Azure 存储 SDK 需要 Python 2.7、3.3、3.4、3.5 或 3.6,并且包含 4 个不同包:azure-storage-blobazure-storage-fileazure-storage-tableazure-storage-queue。 在本教程中,我们要用到 azure-storage-queue 包。

通过 PyPi 安装

要通过 Python 包索引 (PyPI) 安装,请键入:

pip install azure-storage-queue

Note

如果要从用于 Python 的 Azure 存储 SDK 版本 0.36 或更早版本升级,首先需要使用 pip uninstall azure-storage 进行卸载,因为我们不再通过单个包的形式发布用于 Python 的存储 SDK 了。

有关备用安装方法,请访问 Github 上用于 Python 的 Azure 存储 SDK

如何:创建队列

可以通过 QueueService 对象来处理队列。 以下代码创建 QueueService 对象。 在希望在其中以编程方式访问 Azure 存储的任何 Python 文件中,将以下代码添加到文件的顶部附近:

from azure.storage.queue import QueueService

以下代码使用存储帐户名称和帐户密钥创建 QueueService 对象。 使用帐户名称和密钥替换“myaccount”和“mykey”。

queue_service = QueueService(account_name='myaccount', account_key='mykey', endpoint_suffix='core.chinacloudapi.cn')

queue_service.create_queue('taskqueue')

如何:在队列中插入消息

若要在队列中插入消息,可使用 put_message 方法创建一条新消息并将其添加到队列中。

queue_service.put_message('taskqueue', u'Hello World')

如何:扫视下一条消息

可以通过调用 peek_messages 方法,查看队列前面的消息,而不必从队列中将其删除。 默认情况下,peek_messages 扫视单条消息。

messages = queue_service.peek_messages('taskqueue')
for message in messages:
    print(message.content)

如何:取消消息的排队

代码分两步从队列中删除消息。 在调用 get_messages 时,默认情况下会获得队列中的下一条消息。 对于从此队列读取消息的任何其他代码,从 get_messages 返回的消息将变得不可见。 默认情况下,此消息持续 30 秒不可见。 若要从队列中删除消息,还必须调用 delete_message。 此删除消息的两步过程可确保当代码因硬件或软件故障而无法处理消息时,其他代码实例可以获取同一消息并重试。 处理消息后代码会立即调用 delete_message。

messages = queue_service.get_messages('taskqueue')
for message in messages:
    print(message.content)
    queue_service.delete_message('taskqueue', message.id, message.pop_receipt)

可通过两种方式自定义队列中消息的检索。 首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。 以下代码示例使用 get_messages 方法在一次调用中获取 16 条消息。 然后,它会使用 for 循环处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。

messages = queue_service.get_messages('taskqueue', num_messages=16, visibility_timeout=5*60)
for message in messages:
    print(message.content)
    queue_service.delete_message('taskqueue', message.id, message.pop_receipt)      

如何:更改已排队消息的内容

可以更改队列中现有消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用 update_message 方法来更新消息。 可见性超时设为 0,这意味着消息会立刻出现且内容将更新。

messages = queue_service.get_messages('taskqueue')
for message in messages:
    queue_service.update_message('taskqueue', message.id, message.pop_receipt, 0, u'Hello World Again')

如何:获取队列长度

可以获取队列中消息的估计数。 get_queue_metadata 方法要求队列服务返回有关队列的元数据和 approximate_message_count。 结果仅是近似值,因为在队列服务响应请求之后,可能添加或删除了消息。

metadata = queue_service.get_queue_metadata('taskqueue')
count = metadata.approximate_message_count

如何:删除队列

若要删除队列及其中包含的所有消息,请调用 delete_queue 方法。

queue_service.delete_queue('taskqueue')

后续步骤

在了解了队列存储的基础知识后,可单击下面的链接了解详细信息。