如何通过 Java 使用队列存储How to use Queue storage from Java

概述Overview

本指南演示了如何为使用 Azure 队列存储服务的常见方案编写代码。This guide will show you how to code for common scenarios using the Azure Queue storage service. 这些示例用 Java 编写并使用用于 Java 的 Azure 存储 SDKThe samples are written in Java and use the Azure Storage SDK for Java. 方案包括插入速览获取删除队列消息。Scenarios include inserting, peeking, getting, and deleting queue messages. 还介绍了用于创建删除队列的代码。Code for creating and deleting queues is also covered. 有关队列的详细信息,请参阅后续步骤部分。For more information on queues, see the Next steps section.

什么是队列存储?What is Queue storage?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。Azure Queue storage is a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS. 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。A single queue message can be up to 64 KB in size, and a queue can contain millions of messages, up to the total capacity limit of a storage account. 队列存储通常用于创建要异步处理的积压工作 (backlog)。Queue storage is often used to create a backlog of work to process asynchronously.

队列服务概念Queue service concepts

Azure 队列服务包含以下组件:The Azure Queue service contains the following components:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。Storage Account: All access to Azure Storage is done through a storage account. 有关存储帐户的详细信息,请参阅存储帐户概述For more information about storage accounts, see Storage account overview.

  • 队列: 一个队列包含一组消息。Queue: A queue contains a set of messages. 所有消息必须位于相应的队列中。All messages must be in a queue. 请注意,队列名称必须全部小写。Note that the queue name must be all lowercase. 有关命名队列的信息,请参阅 命名队列和元数据For information on naming queues, see Naming Queues and Metadata.

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。Message: A message, in any format, of up to 64 KB. 消息可以保留在队列中的最长时间为 7 天。The maximum time that a message can remain in the queue is 7 days. 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。For version 2017-07-29 or later, the maximum time-to-live can be any positive number, or -1 indicating that the message doesn't expire. 如果省略此参数,则默认的生存时间为 7 天。If this parameter is omitted, the default time-to-live is seven days.

  • URL 格式: 使用以下 URL 格式对队列进行寻址: http://<storage account>.queue.core.chinacloudapi.cn/<queue>URL format: Queues are addressable using the following URL format: http://<storage account>.queue.core.chinacloudapi.cn/<queue>

    可使用以下 URL 访问示意图中的某个队列:The following URL addresses a queue in the diagram:

    http://myaccount.queue.core.chinacloudapi.cn/incoming-orders

创建 Azure 存储帐户Create an Azure storage account

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户The easiest way to create your first Azure storage account is by using the Azure portal. 若要了解更多信息,请参阅 创建存储帐户To learn more, see Create a storage account.

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。You can also create an Azure storage account by using Azure PowerShell, Azure CLI, or the Azure Storage Resource Provider for .NET.

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。If you prefer not to create a storage account in Azure at this time, you can also use the Azurite storage emulator to run and test your code in a local environment. 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发For more information, see Use the Azurite emulator for local Azure Storage development.

创建 Java 应用程序Create a Java application

首先,验证你的开发系统是否满足用于 Java v12 的 Azure 队列存储客户端库中列出的先决条件。First, verify your development system meets the prerequisites listed in Azure Queue storage client library for Java v12.

若要创建名为 queues-how-to-v12 的 Java 应用程序,请执行以下操作:To create a Java application named queues-how-to-v12:

  1. 在控制台窗口(例如 cmd、PowerShell 或 Bash)中,使用 Maven 创建名为 queues-how-to-v12 的新控制台应用。In a console window (such as cmd, PowerShell, or Bash), use Maven to create a new console app with the name queues-how-to-v12. 键入以下“mvn”命令,创建“Hello World!”Type the following mvn command to create a "Hello world!" Java 项目。Java project.

     mvn archetype:generate \
         --define interactiveMode=n \
         --define groupId=com.queues.howto \
         --define artifactId=queues-howto-v12 \
         --define archetypeArtifactId=maven-archetype-quickstart \
         --define archetypeVersion=1.4
    
     mvn archetype:generate `
         --define interactiveMode=n `
         --define groupId=com.queues.howto `
         --define artifactId=queues-howto-v12 `
         --define archetypeArtifactId=maven-archetype-quickstart `
         --define archetypeVersion=1.4
    
  2. 生成项目的输出应如下所示:The output from generating the project should look something like this:

    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------< org.apache.maven:standalone-pom >-------------------
    [INFO] Building Maven Stub Project (No POM) 1
    [INFO] --------------------------------[ pom ]---------------------------------
    [INFO]
    [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>>
    [INFO]
    [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<<
    [INFO]
    [INFO]
    [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom ---
    [INFO] Generating project in Batch mode
    [INFO] ----------------------------------------------------------------------------
    [INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4
    [INFO] ----------------------------------------------------------------------------
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: packageInPathFormat, Value: com/queues/howto
    [INFO] Parameter: version, Value: 1.0-SNAPSHOT
    [INFO] Parameter: package, Value: com.queues.howto
    [INFO] Parameter: groupId, Value: com.queues.howto
    [INFO] Parameter: artifactId, Value: queues-howto-v12
    [INFO] Project created from Archetype in dir: C:\queues\queues-howto-v12
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  6.775 s
    [INFO] Finished at: 2020-08-17T15:27:31-07:00
    [INFO] ------------------------------------------------------------------------
        ```
    
    
  3. Switch to the newly created queues-howto-v12 directory.

    cd queues-howto-v12
    

安装包Install the package

在文本编辑器中打开 pom.xml 文件。Open the pom.xml file in your text editor. 将以下依赖项元素添加到依赖项组。Add the following dependency element to the group of dependencies.

<dependency>
  <groupId>com.azure</groupId>
  <artifactId>azure-storage-queue</artifactId>
  <version>12.6.0</version>
</dependency>

配置应用程序以访问队列存储Configure your application to access queue storage

将下列 import 语句添加到需要在其中使用 Azure 存储 API 来访问队列的 Java 文件的顶部:Add the following import statements to the top of the Java file where you want to use Azure storage APIs to access queues:

// Include the following imports to use queue APIs
import com.azure.core.util.*;
import com.azure.storage.queue.*;
import com.azure.storage.queue.models.*;

设置 Azure 存储连接字符串Set up an Azure storage connection string

Azure 存储客户端使用存储连接字符串来访问数据管理服务。An Azure storage client uses a storage connection string for accessing data management services. 获取 Azure 门户中列出的你的存储帐户的名称和主访问密钥.Get the name and the Primary access key for your storage account listed in the Azure portal. 将它们用作连接字符串中的 AccountName 和 AccountKey 值。Use them as the AccountName and AccountKey values in the connection string. 此示例演示如何声明一个静态字段以保存连接字符串:This example shows how you can declare a static field to hold the connection string:

// Define the connection-string with your values
final String connectStr = 
    "DefaultEndpointsProtocol=https;" +
    "AccountName=your_storage_account;" +
    "AccountKey=your_storage_account_key;" +
    "EndpointSuffix=core.chinacloudapi.cn";

以下示例假设你有一个包含存储连接字符串的 String 对象。The following samples assume that you have a String object containing the storage connection string.

如何:创建队列How to: Create a queue

QueueClient 对象包含用于与队列进行交互的操作。A QueueClient object contains the operations for interacting with a queue. 以下代码创建一个 QueueClient 对象。The following code creates a QueueClient object. 使用该 QueueClient 对象创建要使用的队列。Use the QueueClient object to create the queue you want to use.

public static String createQueue(String connectStr)
{
    try
    {
        // Create a unique name for the queue
        String queueName = "queue-" + java.util.UUID.randomUUID();

        System.out.println("Creating queue: " + queueName);

        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queue = new QueueClientBuilder()
                                .connectionString(connectStr)
                                .queueName(queueName)
                                .buildClient();

        // Create the queue
        queue.create();
        return queue.getQueueName();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println("Error code: " + e.getErrorCode() + "Message: " + e.getMessage());
        return null;
    }
}

如何:向队列添加消息How to: Add a message to a queue

若要在现有队列中插入消息,请调用 sendMessage 方法。To insert a message into an existing queue, call the sendMessage method. 消息可以是字符串(UTF-8 格式)或字节数组。A message can be either a string (in UTF-8 format) or a byte array. 下面是将字符串消息发送到队列的代码。Here is code that sends a string message into the queue.

public static void addQueueMessage
    (String connectStr, String queueName, String messageText)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Adding message to the queue: " + messageText);

        // Add a message to the queue
        queueClient.sendMessage(messageText);
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:扫视下一条消息How to: Peek at the next message

通过调用 peekMessage,可以扫视队列前面的消息,而不会从队列中删除它。You can peek at the message in the front of a queue without removing it from the queue by calling peekMessage.

public static void peekQueueMessage
    (String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Peek at the first message
        PeekedMessageItem peekedMessageItem = queueClient.peekMessage();
        System.out.println("Peeked message: " + peekedMessageItem.getMessageText());
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:更改已排队消息的内容How to: Change the contents of a queued message

可以更改队列中现有消息的内容。You can change the contents of a message in-place in the queue. 如果消息表示一个工作任务,则可以使用此功能来更新状态。If the message represents a work task, you could use this feature to update the status. 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 30 秒。The following code updates a queue message with new contents and sets the visibility timeout to extend another 30 seconds. 延长可见性超时会再给客户端 30 秒时间来继续处理该消息。Extending the visibility timeout gives the client another 30 seconds to continue working on the message. 你还可以保留重试计数。You could keep a retry count, as well. 如果消息重试了 n 次以上,则你会将其删除。If the message is retried more than n times, you would delete it. 此方案可避免每次处理某条消息时都触发应用程序错误。This scenario protects against a message that triggers an application error each time it's processed.

下面的代码示例将在消息队列中进行搜索,查找与搜索字符串匹配的第一个消息内容,对消息内容进行修改,然后退出。The following code sample searches through the queue of messages, locates the first message content that matches a search string, modifies the message content, and exits.

public static void updateQueueMessage
    (String connectStr, String queueName,
    String searchString, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 32
        final int MAX_MESSAGES = 32;

        // Iterate through the queue messages
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES))
        {
            // Check for a specific string
            if (message.getMessageText().equals(searchString))
            {
                // Update the message to be visible in 30 seconds
                queueClient.updateMessage(message.getMessageId(),
                                          message.getPopReceipt(),
                                          updatedContents,
                                          Duration.ofSeconds(30));
                System.out.println(
                    String.format("Found message: \'%s\' and updated it to \'%s\'",
                                    searchString,
                                    updatedContents)
                                  );
                break;
            }
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

以下代码示例只更新队列中的第一个可见消息。The following code sample updates just the first visible message in the queue.

public static void updateFirstQueueMessage
    (String connectStr, String queueName, String updatedContents)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            // Update the message to be visible in 30 seconds
            UpdateMessageResult result = queueClient.updateMessage(message.getMessageId(),
                                                                   message.getPopReceipt(),
                                                                   updatedContents,
                                                                   Duration.ofSeconds(30));
            System.out.println("Updated the first message with the receipt: " +
                    result.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:获取队列长度How to: Get the queue length

可以获取队列中消息的估计数。You can get an estimate of the number of messages in a queue.

getProperties 方法要求队列服务提供一些当前值。The getProperties method asks the Queue service for several current values. 其中一个值是队列中消息的计数。One of the values is a count of how many messages are in a queue. 此计数仅为近似值,因为可能会在请求后添加或删除消息。The count is only approximate because messages can be added or removed after your request. getApproximateMessageCount 方法返回通过调用 getProperties 检索到的最后一个值,而不调用队列服务。The getApproximateMessageCount method returns the last value retrieved by the call to getProperties, without calling the Queue service.

public static void getQueueLength(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        QueueProperties properties = queueClient.getProperties();
        long messageCount = properties.getApproximateMessagesCount();

        System.out.println(String.format("Queue length: %d", messageCount));
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:取消对下一条消息的排队How to: Dequeue the next message

代码通过两个步骤来取消对队列中某条消息的排队。Your code dequeues a message from a queue in two steps. 调用 receiveMessage 时,可获得队列中的下一条消息。When you call receiveMessage, you get the next message in a queue. 从 receiveMessage 返回的消息对从此队列读取消息的任何其他代码不可见。A message returned from receiveMessage becomes invisible to any other code reading messages from this queue. 默认情况下,此消息持续 30 秒不可见。By default, this message stays invisible for 30 seconds. 要从队列中删除消息,还必须调用 deleteMessageTo finish removing the message from the queue, you must also call deleteMessage. 如果你的代码未能处理消息,此两步过程可确保你可以获取同一消息并重试。If your code fails to process a message, this two-step process ensures that you can get the same message and try again. 代码在处理消息后会立即调用 deleteMessageYour code calls deleteMessage right after the message has been processed.

public static void dequeueMessage(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // Get the first queue message
        QueueMessageItem message = queueClient.receiveMessage();

        // Check for a specific string
        if (null != message)
        {
            System.out.println("Dequeing message: " + message.getMessageText());

            // Delete the message
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
        else
        {
            System.out.println("No visible messages in queue");
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

用于取消对消息进行排队的其他选项Additional options for dequeuing messages

可以通过两种方式自定义队列中的消息检索。There are two ways to customize message retrieval from a queue. 首先,获取一批消息(最多 32 条)。First, get a batch of messages (up to 32). 其次,设置更长或更短的不可见超时时间,从而允许代码使用更多或更少的时间来完全处理每个消息。Second, set a longer or shorter invisibility timeout, allowing your code more or less time to fully process each message.

下面的代码示例使用 receiveMessages 方法,目的是在一次调用中获取 20 条消息。The following code example uses the receiveMessages method to get 20 messages in one call. 然后,它会使用 for 循环处理每条消息。Then it processes each message using a for loop. 它还将每条消息的不可见超时设置为 5 分钟(300 秒)。It also sets the invisibility timeout to five minutes (300 seconds) for each message. 超时同时针对所有消息启动。The timeout starts for all messages at the same time. 自调用 receiveMessages 起经过五分钟后,未删除的任何消息都将再次变得可见。When five minutes have passed since the call to receiveMessages, any messages not deleted will become visible again.

public static void dequeueMessages(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        // The maximum number of messages to retrieve is 20
        final int MAX_MESSAGES = 20;

        // Retrieve 20 messages from the queue with a
        // visibility timeout of 300 seconds (5 minutes)
        for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES,
                Duration.ofSeconds(300), Duration.ofSeconds(1), new Context("key1", "value1")))
        {
            // Do processing for all messages in less than 5 minutes,
            // deleting each message after processing.
            System.out.println("Dequeing message: " + message.getMessageText());
            queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:列出队列How to: List the queues

要获取当前队列的列表,请调用 QueueServiceClient.listQueues() 方法,它将返回 QueueItem 对象的集合。To obtain a list of the current queues, call the QueueServiceClient.listQueues() method, which will return a collection of QueueItem objects.

public static void listQueues(String connectStr)
{
    try
    {
        // Instantiate a QueueServiceClient which will be
        // used to list the queues
        QueueServiceClient queueServiceClient = new QueueServiceClientBuilder()
                                    .connectionString(connectStr)
                                    .buildClient();

        // Loop through the collection of queues.
        for (QueueItem queue : queueServiceClient.listQueues())
        {
            // Output each queue name.
            System.out.println(queue.getName());
        }
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

如何:删除队列How to: Delete a queue

若要删除队列及其包含的所有消息,请对 QueueClient 对象调用 delete 方法。To delete a queue and all the messages contained in it, call the delete method on the QueueClient object.

public static void deleteMessageQueue(String connectStr, String queueName)
{
    try
    {
        // Instantiate a QueueClient which will be
        // used to create and manipulate the queue
        QueueClient queueClient = new QueueClientBuilder()
                                    .connectionString(connectStr)
                                    .queueName(queueName)
                                    .buildClient();

        System.out.println("Deleting queue: " + queueClient.getQueueName());

        // Delete the queue
        queueClient.delete();
    }
    catch (QueueStorageException e)
    {
        // Output the exception message and stack trace
        System.out.println(e.getMessage());
        e.printStackTrace();
    }
}

提示

查看 Azure 存储代码示例存储库Check out the Azure Storage code samples repository

如需易用且能够下载和运行的端到端 Azure 存储代码示例,请查看我们的 Azure 存储示例列表。For easy-to-use end-to-end Azure Storage code samples that you can download and run, please check out our list of Azure Storage Samples.

后续步骤Next steps

现在,了解了有关队列存储的基础知识,可单击下面的链接来了解更复杂的存储任务。Now that you've learned the basics of queue storage, follow these links to learn about more complex storage tasks.