如何通过 Java 使用队列存储How to use Queue storage from Java

Tip

查看 Azure 存储代码示例存储库Check out the Azure Storage code samples repository

如需易用且能够下载和运行的端到端 Azure 存储代码示例,请查看我们的 Azure 存储示例列表。For easy-to-use end-to-end Azure Storage code samples that you can download and run, please check out our list of Azure Storage Samples.

本指南演示如何使用 Azure 队列存储服务执行常见方案。This guide will show you how to perform common scenarios using the Azure Queue storage service. 这些示例用 Java 编写并使用 Azure Storage SDK for JavaThe samples are written in Java and use the Azure Storage SDK for Java. 介绍的方案包括插入 、扫视 、获取 和删除 队列消息以及创建 和删除 队列。The scenarios covered include inserting, peeking, getting, and deleting queue messages, as well as creating and deleting queues. 有关队列的详细信息,请参阅后续步骤部分。For more information on queues, see the Next steps section.

Note

SDK 提供给在 Android 设备上使用 Azure 存储的开发人员。An SDK is available for developers who are using Azure Storage on Android devices. 有关详细信息,请参阅 Azure Storage SDK for AndroidFor more information, see the Azure Storage SDK for Android.

什么是队列存储?What is Queue storage?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。Azure Queue storage is a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS. 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。A single queue message can be up to 64 KB in size, and a queue can contain millions of messages, up to the total capacity limit of a storage account. 队列存储通常用于创建要异步处理的积压工作 (backlog)。Queue storage is often used to create a backlog of work to process asynchronously.

队列服务概念Queue service concepts

Azure 队列服务包含以下组件:The Azure Queue service contains the following components:

Azure 队列服务组件

  • URL 格式: 使用以下 URL 格式对队列进行寻址: http://<storage account>.queue.core.chinacloudapi.cn/<queue>URL format: Queues are addressable using the following URL format: http://<storage account>.queue.core.chinacloudapi.cn/<queue>

    可使用以下 URL 访问示意图中的某个队列:The following URL addresses a queue in the diagram:

    http://myaccount.queue.core.chinacloudapi.cn/images-to-download

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。Storage Account: All access to Azure Storage is done through a storage account. 有关存储帐户的详细信息,请参阅存储帐户概述For more information about storage accounts, see Storage account overview.

  • 队列: 一个队列包含一组消息。Queue: A queue contains a set of messages. 所有消息必须位于相应的队列中。All messages must be in a queue. 请注意,队列名称必须全部小写。Note that the queue name must be all lowercase. 有关命名队列的详细信息,请参阅 命名队列和元数据For information on naming queues, see Naming Queues and Metadata.

  • 消息: 一条消息(不管采用何种格式)的最大大小为 64 KB。Message: A message, in any format, of up to 64 KB. 消息可以保留在队列中的最长时间为 7 天。The maximum time that a message can remain in the queue is 7 days. 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。For version 2017-07-29 or later, the maximum time-to-live can be any positive number, or -1 indicating that the message doesn't expire. 如果省略此参数,则默认的生存时间为 7 天。If this parameter is omitted, the default time-to-live is seven days.

创建 Azure 存储帐户Create an Azure storage account

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户The easiest way to create your first Azure storage account is by using the Azure portal. 若要了解更多信息,请参阅 创建存储帐户To learn more, see Create a storage account.

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。You can also create an Azure storage account by using Azure PowerShell, Azure CLI, or the Azure Storage Resource Provider for .NET.

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azure 存储模拟器在本地环境中运行和测试代码。If you prefer not to create a storage account in Azure at this time, you can also use the Azure storage emulator to run and test your code in a local environment. 有关详细信息,请参阅 使用 Azure 存储模拟器进行开发和测试For more information, see Use the Azure Storage Emulator for Development and Testing.

创建 Java 应用程序Create a Java application

在本指南中,将使用可以在本地 Java 应用程序中运行或在 Azure 的 Web 应用程序中运行的代码中使用的存储功能。In this guide, you will use storage features that can be run within a Java application locally, or in code running within a web application in Azure.

为此,需要安装 Java 开发工具包 (JDK),并在 Azure 订阅中创建一个 Azure 存储帐户。To do so, you will need to install the Java Development Kit (JDK) and create an Azure storage account in your Azure subscription. 完成此操作后,需要验证开发系统是否满足最低要求和 GitHub 上的 Azure Storage SDK for Java 存储库中列出的依赖项。Once you have done so, you will need to verify that your development system meets the minimum requirements and dependencies that are listed in the Azure Storage SDK for Java repository on GitHub. 如果系统满足这些要求,可以按照说明下载和安装系统中该存储库的 Azure Storage Libraries for Java。If your system meets those requirements, you can follow the instructions for downloading and installing the Azure Storage Libraries for Java on your system from that repository. 完成这些任务后,便能够创建一个 Java 应用程序,以便使用本文中的示例。Once you have completed those tasks, you will be able to create a Java application that uses the examples in this article.

配置应用程序以访问队列存储Configure your application to access queue storage

将下列 import 语句添加到需要在其中使用 Azure 存储 API 来访问队列的 Java 文件的顶部:Add the following import statements to the top of the Java file where you want to use Azure storage APIs to access queues:

// Include the following imports to use queue APIs.
import com.microsoft.azure.storage.*;
import com.microsoft.azure.storage.queue.*;

设置 Azure 存储连接字符串Set up an Azure storage connection string

Azure 存储客户端使用存储连接字符串来存储用于访问数据管理服务的终结点和凭据。An Azure storage client uses a storage connection string to store endpoints and credentials for accessing data management services. 在客户端应用程序中运行时,必须提供以下格式的存储连接字符串,并对 AccountName 和 AccountKey 值使用 Azure 门户中列出的存储帐户的名称和存储帐户的主访问密钥。When running in a client application, you must provide the storage connection string in the following format, using the name of your storage account and the Primary access key for the storage account listed in the Azure portal for the AccountName and AccountKey values. 此示例演示如何声明一个静态字段以保存连接字符串:This example shows how you can declare a static field to hold the connection string:

// Define the connection-string with your values.
public static final String storageConnectionString =
    "DefaultEndpointsProtocol=http;" +
    "AccountName=your_storage_account;" +
    "AccountKey=your_storage_account_key;" +
    "EndpointSuffix=core.chinacloudapi.cn";

在 Azure 的角色中运行的应用程序中,此字符串可存储在服务配置文件 ServiceConfiguration.cscfg中,并可通过调用 RoleEnvironment.getConfigurationSettings 方法进行访问。In an application running within a role in Azure, this string can be stored in the service configuration file, ServiceConfiguration.cscfg, and can be accessed with a call to the RoleEnvironment.getConfigurationSettings method. 下面是从服务配置文件中名为 StorageConnectionStringSetting 元素中获取连接字符串的示例:Here's an example of getting the connection string from a Setting element named StorageConnectionString in the service configuration file:

// Retrieve storage account from connection-string.
String storageConnectionString =
    RoleEnvironment.getConfigurationSettings().get("StorageConnectionString");

下面的示例假定使用了这两个方法之一来获取存储连接字符串。The following samples assume that you have used one of these two methods to get the storage connection string.

如何:创建队列How to: Create a queue

利用 CloudQueueClient 对象,可以获取队列的引用对象。A CloudQueueClient object lets you get reference objects for queues. 以下代码将创建 CloudQueueClient 对象。The following code creates a CloudQueueClient object. (注意:还有其他方式来创建 CloudStorageAccount 对象;有关详细信息,请参阅 Azure 存储客户端 SDK 参考中的 CloudStorageAccount 。(Note: There are additional ways to create CloudStorageAccount objects; for more information, see CloudStorageAccount in the Azure Storage Client SDK Reference.)

使用 CloudQueueClient 对象获取对要使用的队列的引用。Use the CloudQueueClient object to get a reference to the queue you want to use. 如果队列不存在,可以创建它。You can create the queue if it doesn't exist.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
       CloudStorageAccount.parse(storageConnectionString);

   // Create the queue client.
   CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

   // Retrieve a reference to a queue.
   CloudQueue queue = queueClient.getQueueReference("myqueue");

   // Create the queue if it doesn't already exist.
   queue.createIfNotExists();
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:向队列添加消息How to: Add a message to a queue

要将消息插入现有队列,请先创建一个新的 CloudQueueMessageTo insert a message into an existing queue, first create a new CloudQueueMessage. 接下来,调用 addMessage 方法。Next, call the addMessage method. 可从字符串(UTF-8 格式)或字节数组创建 CloudQueueMessage 。A CloudQueueMessage can be created from either a string (in UTF-8 format) or a byte array. 以下代码创建队列(如果该队列不存在)并插入消息“Hello, World”。Here is code that creates a queue (if it doesn't exist) and inserts the message "Hello, World".

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
       CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // Create the queue if it doesn't already exist.
    queue.createIfNotExists();

    // Create a message and add it to the queue.
    CloudQueueMessage message = new CloudQueueMessage("Hello, World");
    queue.addMessage(message);
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:扫视下一条消息How to: Peek at the next message

通过调用 peekMessage,可以扫视队列前面的消息,而不会从队列中删除它。You can peek at the message in the front of a queue without removing it from the queue by calling peekMessage.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
       CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // Peek at the next message.
    CloudQueueMessage peekedMessage = queue.peekMessage();

    // Output the message value.
    if (peekedMessage != null)
    {
      System.out.println(peekedMessage.getMessageContentAsString());
   }
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:更改已排队消息的内容How to: Change the contents of a queued message

可以更改队列中现有消息的内容。You can change the contents of a message in-place in the queue. 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。If the message represents a work task, you could use this feature to update the status of the work task. 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。The following code updates the queue message with new contents, and sets the visibility timeout to extend another 60 seconds. 延长可见性超时将保存与消息关联的工作状态,并额外为客户端提供一分钟的时间来继续处理消息。Extending the visibility timeout saves the state of work associated with the message, and gives the client another minute to continue working on the message. 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。You could use this technique to track multi-step workflows on queue messages, without having to start over from the beginning if a processing step fails due to hardware or software failure. 通常同时保留重试计数,当消息重试次数超过 n 时再删除该消息。Typically, you would keep a retry count as well, and if the message is retried more than n times, you would delete it. 这可避免每次处理某条消息时都触发应用程序错误。This protects against a message that triggers an application error each time it is processed.

下面的代码示例将搜索队列中的消息,查找内容中第一个与“Hello, World”匹配的消息,并对消息内容进行修改并退出。The following code sample searches through the queue of messages, locates the first message that matches "Hello, World" for the content, then modifies the message content and exits.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
        CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // The maximum number of messages that can be retrieved is 32.
    final int MAX_NUMBER_OF_MESSAGES_TO_PEEK = 32;

    // Loop through the messages in the queue.
    for (CloudQueueMessage message : queue.retrieveMessages(MAX_NUMBER_OF_MESSAGES_TO_PEEK,1,null,null))
    {
        // Check for a specific string.
        if (message.getMessageContentAsString().equals("Hello, World"))
        {
            // Modify the content of the first matching message.
            message.setMessageContent("Updated contents.");
            // Set it to be visible in 30 seconds.
            EnumSet<MessageUpdateFields> updateFields =
                EnumSet.of(MessageUpdateFields.CONTENT,
                MessageUpdateFields.VISIBILITY);
            // Update the message.
            queue.updateMessage(message, 30, updateFields, null, null);
            break;
        }
    }
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

或者,以下代码示例只更新了队列中第一个可见消息Alternatively, the following code sample updates just the first visible message on the queue.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
       CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // Retrieve the first visible message in the queue.
    CloudQueueMessage message = queue.retrieveMessage();

    if (message != null)
    {
        // Modify the message content.
        message.setMessageContent("Updated contents.");
        // Set it to be visible in 60 seconds.
        EnumSet<MessageUpdateFields> updateFields =
            EnumSet.of(MessageUpdateFields.CONTENT,
            MessageUpdateFields.VISIBILITY);
        // Update the message.
        queue.updateMessage(message, 60, updateFields, null, null);
    }
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:获取队列长度How to: Get the queue length

可以获取队列中消息的估计数。You can get an estimate of the number of messages in a queue. downloadAttributes 方法会询问队列服务一些当前值,包括队列中消息的计数。The downloadAttributes method asks the Queue service for several current values, including a count of how many messages are in a queue. 此计数仅为近似值,因为只能在队列服务响应请求后添加或删除消息。The count is only approximate because messages can be added or removed after the Queue service responds to your request. getApproximateMessageCount 方法返回通过调用 downloadAttributes 检索到的最后一个值,而不会调用队列服务。The getApproximateMessageCount method returns the last value retrieved by the call to downloadAttributes, without calling the Queue service.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
       CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

   // Download the approximate message count from the server.
    queue.downloadAttributes();

    // Retrieve the newly cached approximate message count.
    long cachedMessageCount = queue.getApproximateMessageCount();

    // Display the queue length.
    System.out.println(String.format("Queue length: %d", cachedMessageCount));
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:取消对下一条消息的排队How to: Dequeue the next message

代码通过两个步骤来取消对队列中某条消息的排队。Your code dequeues a message from a queue in two steps. 在调用 retrieveMessage时,你获得队列中的下一条消息。When you call retrieveMessage, you get the next message in a queue. retrieveMessage 返回的消息变得对从此队列读取消息的任何其他代码不可见。A message returned from retrieveMessage becomes invisible to any other code reading messages from this queue. 默认情况下,此消息持续 30 秒不可见。By default, this message stays invisible for 30 seconds. 要从队列中删除消息,还必须调用 deleteMessageTo finish removing the message from the queue, you must also call deleteMessage. 此删除消息的两步过程可确保,如果代码因硬件或软件故障而无法处理消息,则代码的其他实例可以获取相同消息并重试。This two-step process of removing a message assures that if your code fails to process a message due to hardware or software failure, another instance of your code can get the same message and try again. 代码在处理消息后会立即调用 deleteMessageYour code calls deleteMessage right after the message has been processed.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
        CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // Retrieve the first visible message in the queue.
    CloudQueueMessage retrievedMessage = queue.retrieveMessage();

    if (retrievedMessage != null)
    {
        // Process the message in less than 30 seconds, and then delete the message.
        queue.deleteMessage(retrievedMessage);
    }
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

用于取消对消息进行排队的其他选项Additional options for dequeuing messages

可通过两种方式自定义队列中消息的检索。There are two ways you can customize message retrieval from a queue. 首先,可获取一批消息(最多 32 条)。First, you can get a batch of messages (up to 32). 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。Second, you can set a longer or shorter invisibility timeout, allowing your code more or less time to fully process each message.

下面的代码示例使用 retrieveMessages 方法以在一次调用中获取 20 条消息。The following code example uses the retrieveMessages method to get 20 messages in one call. 然后,它会使用 for 循环处理每条消息。Then it processes each message using a for loop. 它还将每条消息的不可见超时设置为 5 分钟(300 秒)。It also sets the invisibility timeout to five minutes (300 seconds) for each message. 这五分钟超时对于所有消息都是同时开始的,因此在调用 retrieveMessages 五分钟后,尚未删除的任何消息都会再次变得可见。The five minutes start for all messages at the same time, so when five minutes have passed since the call to retrieveMessages, any messages that have not been deleted will become visible again.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
        CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // Retrieve 20 messages from the queue with a visibility timeout of 300 seconds.
    for (CloudQueueMessage message : queue.retrieveMessages(20, 300, null, null)) {
        // Do processing for all messages in less than 5 minutes,
        // deleting each message after processing.
        queue.deleteMessage(message);
    }
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:列出队列How to: List the queues

若要获取当前队列的列表,请调用 CloudQueueClient.listQueues() 方法,它将返回 CloudQueue 对象的集合。To obtain a list of the current queues, call the CloudQueueClient.listQueues() method, which will return a collection of CloudQueue objects.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
        CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient =
        storageAccount.createCloudQueueClient();

    // Loop through the collection of queues.
    for (CloudQueue queue : queueClient.listQueues())
    {
        // Output each queue name.
        System.out.println(queue.getName());
    }
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

如何:删除队列How to: Delete a queue

若要删除队列及其包含的所有消息,请对 CloudQueue 对象调用 deleteIfExists 方法。To delete a queue and all the messages contained in it, call the deleteIfExists method on the CloudQueue object.

try
{
    // Retrieve storage account from connection-string.
    CloudStorageAccount storageAccount =
        CloudStorageAccount.parse(storageConnectionString);

    // Create the queue client.
    CloudQueueClient queueClient = storageAccount.createCloudQueueClient();

    // Retrieve a reference to a queue.
    CloudQueue queue = queueClient.getQueueReference("myqueue");

    // Delete the queue if it exists.
    queue.deleteIfExists();
}
catch (Exception e)
{
    // Output the stack trace.
    e.printStackTrace();
}

后续步骤Next steps

现在,了解了有关队列存储的基础知识,可单击下面的链接来了解更复杂的存储任务。Now that you've learned the basics of queue storage, follow these links to learn about more complex storage tasks.