如何通过 Python 使用 Azure 队列存储

概述

本文演示使用 Azure 队列存储服务的常见方案。 涵盖的方案包括插入、速览、获取和删除队列消息。 还介绍了用于创建和删除队列的代码。

本文中的示例是用 Python 编写的,并且使用了用于 Python 的 Azure 队列存储客户端库。 有关队列的详细信息,请参阅后续步骤部分。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列: 一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式: 使用以下 URL 格式对队列进行寻址: http://<storage account>.queue.core.chinacloudapi.cn/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.chinacloudapi.cn/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

下载并安装用于 Python 的 Azure 存储 SDK

用于 Python 的 Azure 存储 SDK 需要 Python v2.7、v3.3 或更高版本。

通过 PyPI 安装

要通过 Python 包索引 (PyPI) 安装,请键入:

pip install azure-storage-queue

备注

如果要从适用于 Python 的 Azure 存储 SDK v0.36 或更早版本升级,请在安装最新软件包之前使用 pip uninstall azure-storage 卸载旧版 SDK。

有关其他安装方法,请参阅适用于 Python 的 Azure SDK

从 Azure 门户复制凭据

当示例应用程序向 Azure 存储发出请求时,必须对其进行授权。 若要对请求进行授权,请将存储帐户凭据以连接字符串形式添加到应用程序中。 按照以下步骤查看存储帐户凭据:

  1. 登录 Azure 门户

  2. 找到自己的存储帐户。

  3. 在存储帐户概述的“设置”部分,选择“访问密钥”。 在这里,可以查看你的帐户访问密钥以及每个密钥的完整连接字符串。

  4. 找到“密钥 1”下面的“连接字符串”值,选择“复制”按钮复制该连接字符串。 下一步需将此连接字符串值添加到某个环境变量。

    显示如何从 Azure 门户复制连接字符串的屏幕截图

配置存储连接字符串

复制连接字符串以后,请将其写入运行应用程序的本地计算机的新环境变量中。 若要设置环境变量,请打开控制台窗口,并遵照适用于操作系统的说明。 将 <yourconnectionstring> 替换为实际的连接字符串。

Windows

setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"

在 Windows 中添加环境变量后,必须启动命令窗口的新实例。

Linux

export AZURE_STORAGE_CONNECTION_STRING="<yourconnectionstring>"

macOS

export AZURE_STORAGE_CONNECTION_STRING="<yourconnectionstring>"

重新启动程序

添加环境变量后,重启需要读取环境变量的任何正在运行的程序。 例如,重启开发环境或编辑器,然后再继续。

配置应用程序以访问队列存储

可通过 QueueClient 对象来处理队列。 在你希望以编程方式访问服务总线的任何 Python 文件中,将以下代码添加到顶部附近:

from azure.storage.queue import (
        QueueClient,
        BinaryBase64EncodePolicy,
        BinaryBase64DecodePolicy
)

import os, uuid

os 包支持检索环境变量。 uuid 包支持为队列名称生成唯一标识符。

创建队列

连接字符串是从前面设置的 AZURE_STORAGE_CONNECTION_STRING 环境变量检索的。

以下代码使用存储连接字符串创建 QueueClient 对象。

# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")

# Create a unique name for the queue
q_name = "queue-" + str(uuid.uuid4())

# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + q_name)
queue_client = QueueClient.from_connection_string(connect_str, q_name)

# Create the queue
queue_client.create_queue()

Azure 队列消息以文本形式存储。 如果要存储二进制数据,请在将消息放入队列之前设置 Base64 编码和解码函数。

在创建客户端对象时配置 Base64 编码和解码函数。

# Setup Base64 encoding and decoding functions
queue_client.message_encode_policy = BinaryBase64EncodePolicy()
queue_client.message_decode_policy = BinaryBase64DecodePolicy()

在队列中插入消息

若要在队列中插入消息,请使用 send_message 方法。

message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)

扫视消息

可以通过调用 peek_messages 方法来速览消息,而不会将其从队列中删除。 默认情况下,此方法会速览单个消息。

# Peek at the first message
messages = queue_client.peek_messages()

for peeked_message in messages:
    print("Peeked message: " + peeked_message.content)

更改已排队消息的内容

可以更改队列中现有消息的内容。 如果消息表示某个任务,则可以使用此功能来更新该任务的状态。

下面的代码使用 update_message 方法来更新消息。 可见性超时设为 0,这意味着消息会立刻出现且内容将更新。

messages = queue_client.receive_messages()
list_result = next(messages)

message = queue_client.update_message(
        list_result.id, list_result.pop_receipt,
        visibility_timeout=0, content=u'Hello World Again')

print("Updated message to: " + message.content)

获取队列长度

可以获取队列中消息的估计数。

get_queue_properties 方法返回包括 approximate_message_count 在内的队列属性。

properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))

结果仅是近似值,因为在服务响应请求之后,可能添加或删除了消息。

取消消息的排队

通过两个步骤从队列中删除消息。 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。 在消息成功处理后调用 delete_message

在调用 receive_messages 时,默认情况下会获得队列中的下一条消息。 从 receive_messages 返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 delete_message

messages = queue_client.receive_messages()

for message in messages:
    print("Dequeueing message: " + message.content)
    queue_client.delete_message(message.id, message.pop_receipt)

可通过两种方式自定义队列中消息的检索。 首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。

以下代码示例使用 receive_messages 方法成批获取消息。 然后,它使用嵌套的 for 循环来处理每批中的每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。

messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)

for msg_batch in messages.by_page():
   for msg in msg_batch:
      print("Batch dequeue message: " + msg.content)
      queue_client.delete_message(msg)

删除队列

若要删除队列及其包含的所有消息,请调用 delete_queue 方法。

print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()

提示

尝试 Azure 存储资源管理器

Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。

后续步骤

在了解了队列存储的基础知识后,可单击下面的链接了解更多信息。