如何通过 Python 使用 Azure 队列存储How to use Azure Queue storage from Python

概述Overview

本文演示使用 Azure 队列存储服务的常见方案。This article demonstrates common scenarios using the Azure Queue storage service. 涵盖的方案包括插入、速览、获取和删除队列消息。The scenarios that are covered include inserting, peeking, getting, and deleting queue messages. 还介绍了用于创建和删除队列的代码。Code for creating and deleting queues is also covered.

本文中的示例是用 Python 编写的,使用用于 Python 的 Azure 队列存储客户端库The examples in this article are written in Python and use the Azure Queue storage client library for Python. 有关队列的详细信息,请参阅后续步骤部分。For more information on queues, see the Next steps section.

什么是队列存储?What is Queue storage?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。Azure Queue storage is a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS. 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。A single queue message can be up to 64 KB in size, and a queue can contain millions of messages, up to the total capacity limit of a storage account. 队列存储通常用于创建要异步处理的积压工作 (backlog)。Queue storage is often used to create a backlog of work to process asynchronously.

队列服务概念Queue service concepts

Azure 队列服务包含以下组件:The Azure Queue service contains the following components:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。Storage Account: All access to Azure Storage is done through a storage account. 有关存储帐户的详细信息,请参阅存储帐户概述For more information about storage accounts, see Storage account overview.

  • 队列: 一个队列包含一组消息。Queue: A queue contains a set of messages. 所有消息必须位于相应的队列中。All messages must be in a queue. 请注意,队列名称必须全部小写。Note that the queue name must be all lowercase. 有关命名队列的信息,请参阅 命名队列和元数据For information on naming queues, see Naming Queues and Metadata.

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。Message: A message, in any format, of up to 64 KB. 消息可以保留在队列中的最长时间为 7 天。The maximum time that a message can remain in the queue is 7 days. 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。For version 2017-07-29 or later, the maximum time-to-live can be any positive number, or -1 indicating that the message doesn't expire. 如果省略此参数,则默认的生存时间为 7 天。If this parameter is omitted, the default time-to-live is seven days.

  • URL 格式: 使用以下 URL 格式对队列进行寻址: http://<storage account>.queue.core.chinacloudapi.cn/<queue>URL format: Queues are addressable using the following URL format: http://<storage account>.queue.core.chinacloudapi.cn/<queue>

    可使用以下 URL 访问示意图中的某个队列:The following URL addresses a queue in the diagram:

    http://myaccount.queue.core.chinacloudapi.cn/incoming-orders

创建 Azure 存储帐户Create an Azure storage account

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户The easiest way to create your first Azure storage account is by using the Azure portal. 若要了解更多信息,请参阅 创建存储帐户To learn more, see Create a storage account.

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。You can also create an Azure storage account by using Azure PowerShell, Azure CLI, or the Azure Storage Resource Provider for .NET.

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。If you prefer not to create a storage account in Azure at this time, you can also use the Azurite storage emulator to run and test your code in a local environment. 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发For more information, see Use the Azurite emulator for local Azure Storage development.

下载并安装用于 Python 的 Azure 存储 SDKDownload and install Azure Storage SDK for Python

用于 Python 的 Azure 存储 SDK 需要 Python 2.7、3.3 或更高版本。The Azure Storage SDK for Python requires Python version 2.7, 3.3, or later.

通过 PyPI 安装Install via PyPI

要通过 Python 包索引 (PyPI) 安装,请键入:To install via the Python Package Index (PyPI), type:

pip install azure-storage-queue

备注

如果要从适用于 Python 的 Azure 存储 SDK 版本 0.36 或更早版本升级,请在安装最新软件包之前使用 pip uninstall azure-storage 卸载旧版 SDK。If you are upgrading from the Azure Storage SDK for Python version 0.36 or earlier, uninstall the older SDK using pip uninstall azure-storage before installing the latest package.

有关其他安装方法,请参阅[适用于 Python 的 Azure SDK]。For alternative installation methods, see Azure SDK for Python.

从 Azure 门户复制凭据Copy your credentials from the Azure portal

当示例应用程序向 Azure 存储发出请求时,必须对其进行授权。When the sample application makes a request to Azure Storage, it must be authorized. 若要对请求进行授权,请将存储帐户凭据以连接字符串形式添加到应用程序中。To authorize a request, add your storage account credentials to the application as a connection string. 按照以下步骤查看存储帐户凭据:View your storage account credentials by following these steps:

  1. 登录 Azure 门户Sign in to the Azure portal.

  2. 找到自己的存储帐户。Locate your storage account.

  3. 在存储帐户概述的“设置”部分,选择“访问密钥”。 In the Settings section of the storage account overview, select Access keys. 在这里,可以查看你的帐户访问密钥以及每个密钥的完整连接字符串。Here, you can view your account access keys and the complete connection string for each key.

  4. 找到“密钥 1”下面的“连接字符串”值,选择“复制”按钮复制该连接字符串。 Find the Connection string value under key1, and select the Copy button to copy the connection string. 下一步需将此连接字符串值添加到某个环境变量。You will add the connection string value to an environment variable in the next step.

    显示如何从 Azure 门户复制连接字符串的屏幕截图

配置存储连接字符串Configure your storage connection string

复制连接字符串以后,请将其写入运行应用程序的本地计算机的新环境变量中。After you have copied your connection string, write it to a new environment variable on the local machine running the application. 若要设置环境变量,请打开控制台窗口,并遵照适用于操作系统的说明。To set the environment variable, open a console window, and follow the instructions for your operating system. <yourconnectionstring> 替换为实际的连接字符串。Replace <yourconnectionstring> with your actual connection string.

WindowsWindows

setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"

在 Windows 中添加环境变量后,必须启动命令窗口的新实例。After you add the environment variable in Windows, you must start a new instance of the command window.

LinuxLinux

export AZURE_STORAGE_CONNECTION_STRING="<yourconnectionstring>"

macOSmacOS

export AZURE_STORAGE_CONNECTION_STRING="<yourconnectionstring>"

重新启动程序Restart programs

添加环境变量后,重启需要读取环境变量的任何正在运行的程序。After you add the environment variable, restart any running programs that will need to read the environment variable. 例如,重启开发环境或编辑器,然后再继续。For example, restart your development environment or editor before continuing.

配置应用程序以访问队列存储Configure your application to access queue storage

可通过 QueueClient 对象来处理队列。The QueueClient object lets you work with a queue. 在你希望以编程方式访问服务总线的任何 Python 文件中,将以下代码添加到顶部附近:Add the following code near the top of any Python file in which you wish to programmatically access an Azure queue:

from azure.storage.queue import (
        QueueClient,
        BinaryBase64EncodePolicy,
        BinaryBase64DecodePolicy
)

import os, uuid

os 包支持检索环境变量。The os package provides support to retrieve an environment variable. uuid 包支持为队列名称生成唯一标识符。The uuid package provides support for generating a unique identifier for a queue name.

创建队列Create a queue

连接字符串是从前面设置的 AZURE_STORAGE_CONNECTION_STRING 环境变量检索的。The connection string is retrieved from the AZURE_STORAGE_CONNECTION_STRING environment variable set earlier.

以下代码使用存储连接字符串创建 QueueClient 对象。The following code creates a QueueClient object using the storage connection string.

# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")

# Create a unique name for the queue
queue_name = "queue-" + str(uuid.uuid4())

# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + queue_name)
queue_client = QueueClient.from_connection_string(connect_str, queue_name)

# Create the queue
queue_client.create_queue()

在队列中插入消息Insert a message into a queue

若要在队列中插入消息,请使用 send_message 方法。To insert a message into a queue, use the send_message method.

message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)

Azure 队列消息以文本形式存储。Azure queue messages are stored as text. 如果要存储二进制数据,请在将消息放入队列之前设置 Base64 编码和解码函数。If you want to store binary data, setup Base64 encoding and decoding functions before putting a message in the queue.

在队列客户端对象上配置 Base64 编码和解码函数。Configure Base64 encoding and decoding functions on the queue client object.

# Setup Base64 encoding and decoding functions
queue_client.message_encode_policy = BinaryBase64EncodePolicy()
queue_client.message_decode_policy = BinaryBase64DecodePolicy()

扫视消息Peek at messages

可以通过调用 peek_messages 方法来速览消息,而不必将其从队列中删除。You can peek at messages without removing them from the queue by calling the peek_messages method. 默认情况下,peek_messages 扫视单条消息。By default, peek_messages peeks at a single message.

# Peek at the first message
messages = queue_client.peek_messages()

for peeked_message in messages:
    print("Peeked message: " + peeked_message.content)

更改已排队消息的内容Change the contents of a queued message

可以更改队列中现有消息的内容。You can change the contents of a message in-place in the queue. 如果消息表示某个任务,则可以使用此功能来更新该任务的状态。If the message represents a task, you can use this feature to update the status of the task.

以下代码使用 update_message 方法来更新消息。The code below uses the update_message method to update a message. 可见性超时设为 0,这意味着消息会立刻出现且内容将更新。The visibility timeout is set to 0, meaning the message appears immediately and the content is updated.

messages = queue_client.receive_messages()
list_result = next(messages)

message = queue_client.update_message(
        list_result.id, list_result.pop_receipt,
        visibility_timeout=0, content=u'Hello World Again')

print("Updated message to: " + message.content)

获取队列长度Get the queue length

可以获取队列中消息的估计数。You can get an estimate of the number of messages in a queue.

get_queue_properties 方法要求队列服务返回有关队列的属性,包括 approximate_message_countThe get_queue_properties method asks the queue service to return properties about the queue, including the approximate_message_count.

properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))

结果仅是近似值,因为在队列服务响应请求之后,可能添加或删除了消息。The result is only approximate because messages can be added or removed after the queue service responds to your request.

取消消息的排队Dequeue messages

通过两个步骤从队列中删除消息。Remove a message from a queue in two steps. 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。If your code fails to process a message, this two-step process ensures that you can get the same message and try again. 在消息成功处理后调用 delete_messageCall delete_message after the message has been successfully processed.

在调用 receive_messages 时,默认情况下会获得队列中的下一条消息。When you call receive_messages, you get the next message in the queue by default. receive_messages 返回的消息对于从此队列读取消息的任何其他代码都是不可见的。A message returned from receive_messages becomes invisible to any other code reading messages from this queue. 默认情况下,此消息持续 30 秒不可见。By default, this message stays invisible for 30 seconds. 若要完成从队列中删除消息,还必须调用 delete_messageTo finish removing the message from the queue, you must also call delete_message.

messages = queue_client.receive_messages()

for message in messages:
    print("Dequeueing message: " + message.content)
    queue_client.delete_message(message.id, message.pop_receipt)

可通过两种方式自定义队列中消息的检索。There are two ways you can customize message retrieval from a queue. 首先,可获取一批消息(最多 32 条)。First, you can get a batch of messages (up to 32). 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。Second, you can set a longer or shorter invisibility timeout, allowing your code more or less time to fully process each message.

以下代码示例使用 receive_messages 方法成批获取消息。The following code example uses the receive_messages method to get messages in batches. 然后,它使用嵌套的 for 循环来处理每批中的每条消息。Then it processes each message within each batch by using a nested for loop. 它还将每条消息的不可见超时时间设置为 5 分钟。It also sets the invisibility timeout to five minutes for each message.

messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)

for msg_batch in messages.by_page():
   for msg in msg_batch:
      print("Batch dequeue message: " + msg.content)
      queue_client.delete_message(msg)

删除队列Delete a queue

若要删除队列及其中包含的所有消息,请调用 delete_queue 方法。To delete a queue and all the messages contained in it, call the delete_queue method.

print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()

提示

尝试 Azure 存储资源管理器Try the Azure Storage Explorer

Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。Azure Storage Explorer is a free, standalone app from Microsoft that enables you to work visually with Azure Storage data on Windows, macOS, and Linux.

后续步骤Next steps

在了解了队列存储的基础知识后,可单击下面的链接了解详细信息。Now that you've learned the basics of queue storage, follow these links to learn more.