如何通过 Java 使用服务总线队列How to use Service Bus queues with Java

在本教程中,你将了解如何创建 Java 应用程序来向服务总线队列发送消息以及从中接收消息。In this tutorial, you learn how to create Java applications to send messages to and receive messages from a Service Bus queue.

Note

可以在 GitHub 上的 azure-service-bus 存储库中找到 Java 示例。You can find Java samples on GitHub in the azure-service-bus repository.

先决条件Prerequisites

  1. Azure 订阅。An Azure subscription. 若要完成本教程,需要一个 Azure 帐户。To complete this tutorial, you need an Azure account. 你可以激活 MSDN 订阅者权益注册试用帐户You can activate your MSDN subscriber benefits or sign up for a trial account.
  2. 如果没有可使用的队列,请遵循使用 Azure 门户创建服务总线队列一文来创建队列。If you don't have a queue to work with, follow steps in the Use Azure portal to create a Service Bus queue article to create a queue.
    1. 阅读服务总线队列的快速概述Read the quick overview of Service Bus queues.
    2. 创建一个服务总线命名空间Create a Service Bus namespace.
    3. 获取连接字符串Get the connection string.
    4. 创建一个服务总线队列Create a Service Bus queue.
  3. 安装 Azure SDK for JavaInstall Azure SDK for Java.

配置应用程序以使用服务总线Configure your application to use Service Bus

在生成本示例之前,请确保已安装 Azure SDK for JavaMake sure you have installed the Azure SDK for Java before building this sample. 如果使用 Eclipse,则可以安装包含 Azure SDK for Java 的用于 Eclipse 的 Azure 工具包If you are using Eclipse, you can install the Azure Toolkit for Eclipse that includes the Azure SDK for Java. 然后,用户可以将21Vianet Azure Libraries for Java 添加到项目:You can then add the 21Vianet Azure Libraries for Java to your project:

将以下 import 语句添加到 Java 文件顶部:Add the following import statements to the top of the Java file:

// Include the following imports to use Service Bus APIs
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;

import static java.nio.charset.StandardCharsets.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

import org.apache.commons.cli.*;

向队列发送消息Send messages to a queue

要将消息发送到服务总线队列,应用程序将实例化 QueueClient 对象并异步发送消息。To send messages to a Service Bus Queue, your application instantiates a QueueClient object and sends messages asynchronously. 以下代码显示如何为通过门户创建的队列发送消息。The following code shows how to send a message for a Queue that was created through the portal.

public void run() throws Exception {
    // Create a QueueClient instance and then asynchronously send messages.
    // Close the sender once the send operation is complete.
    QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
    this.sendMessageAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());

    sendClient.close();
}

    CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                                "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                                "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                                "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                                "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                                "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                                "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                                "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                                "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                                "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType());

        List<CompletableFuture> tasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            final String messageId = Integer.toString(i);
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
                    }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }

发送到服务总线队列以及从服务总线队列收到的消息是 Message 类的实例。Messages sent to, and received from Service Bus queues are instances of the Message class. Message 对象包含一组标准属性(如 Label 和 TimeToLive)、一个用来保存自定义应用程序特定属性的字典以及大量任意应用程序数据。Message objects have a set of standard properties (such as Label and TimeToLive), a dictionary that is used to hold custom application-specific properties, and a body of arbitrary application data. 应用程序可通过将任何可序列化对象传入到 Message 的构造函数中来设置消息的正文,然后将使用适当的序列化程序来序列化对象。An application can set the body of the message by passing any serializable object into the constructor of the Message, and the appropriate serializer will then be used to serialize the object. 或者,可提供 java.IO.InputStream 对象。Alternatively, you can provide a java.IO.InputStream object.

服务总线队列在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus queues support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 一个队列可包含的消息数不受限制,但消息的总大小受限。There is no limit on the number of messages held in a queue but there is a cap on the total size of the messages held by a queue. 此队列大小是在创建时定义的,上限为 5 GB。This queue size is defined at creation time, with an upper limit of 5 GB.

从队列接收消息Receive messages from a queue

从队列接收消息的主要方法是使用 ServiceBusContract 对象。The primary way to receive messages from a queue is to use a ServiceBusContract object. 收到的消息可在两种不同模式下工作:ReceiveAndDeletePeekLockReceived messages can work in two different modes: ReceiveAndDelete and PeekLock.

当使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,服务总线接收到队列中某条消息的读取请求时,会将该消息标记为“已使用”并将其返回给应用程序。When using the ReceiveAndDelete mode, receive is a single-shot operation - that is, when Service Bus receives a read request for a message in a queue, it marks the message as being consumed and returns it to the application. ReceiveAndDelete 模式(默认模式)是最简单的模式,最适合应用程序可容忍出现故障时不处理消息的情景。ReceiveAndDelete mode (which is the default mode) is the simplest model and works best for scenarios in which an application can tolerate not processing a message in the event of a failure. 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。To understand this, consider a scenario in which the consumer issues the receive request and then crashes before processing it. 由于服务总线已将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它就漏掉了在发生故障前使用的消息。Because Service Bus has marked the message as being consumed, then when the application restarts and begins consuming messages again, it has missed the message that was consumed prior to the crash.

在 PeekLock 模式下,接收变成了一个两阶段操作,从而有可能支持无法允许遗漏消息的应用程序。In PeekLock mode, receive becomes a two stage operation, which makes it possible to support applications that cannot tolerate missing messages. 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,然后将该消息返回到应用程序。When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. 应用程序完成消息处理(或可靠地存储消息以供将来处理)后,将通过对收到的消息调用 Delete 完成接收过程的第二个阶段。After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling Delete on the received message. 发现“Delete”调用时,服务总线会将消息标记为“已使用”,并将消息从队列中删除。When Service Bus sees the Delete call, it marks the message as being consumed and remove it from the queue.

以下示例演示如何使用 PeekLock 模式(非默认模式)接收和处理消息。The following example demonstrates how messages can be received and processed using PeekLock mode (not the default mode). 下面的示例将执行无限循环并在消息达到我们的 TestQueue 后进行处理:The example below does an infinite loop and processes messages as they arrive into our TestQueue:

    public void run() throws Exception {
        // Create a QueueClient instance for receiving using the connection string builder
        // We set the receive mode to "PeekLock", meaning the message is delivered
        // under a lock and must be acknowledged ("completed") to be removed from the queue
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
        this.registerReceiver(receiveClient);

        // shut down receiver to close the receive loop
        receiveClient.close();
    }
    void registerReceiver(QueueClient queueClient) throws Exception {
        // register the RegisterMessageHandler callback
        queueClient.registerMessageHandler(new IMessageHandler() {
        // callback invoked when the message handler loop has obtained a message
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
            // receives message is passed to callback
                if (message.getLabel() != null &&
                    message.getContentType() != null &&
                    message.getLabel().contentEquals("Scientist") &&
                    message.getContentType().contentEquals("application/json")) {

                        byte[] body = message.getBody();
                        Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                        System.out.printf(
                            "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                            "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                            message.getMessageId(),
                            message.getSequenceNumber(),
                            message.getEnqueuedTimeUtc(),
                            message.getExpiresAtUtc(),
                            message.getContentType(),
                            scientist != null ? scientist.get("firstName") : "",
                            scientist != null ? scientist.get("name") : "");
                    }
                    return CompletableFuture.completedFuture(null);
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
        },
        // 1 concurrent call, messages are auto-completed, auto-renew duration
        new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
    }

如何处理应用程序崩溃和不可读消息How to handle application crashes and unreadable messages

服务总线提供了相关功能,帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序出于某种原因无法处理消息,则其可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。If a receiver application is unable to process the message for some reason, then it can call the unlockMessage method on the received message (instead of the deleteMessage method). 这会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。This causes Service Bus to unlock the message within the queue and make it available to be received again, either by the same consuming application or by another consuming application.

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),服务总线会自动解锁该消息并使它可再次被接收。There is also a timeout associated with a message locked within the queue, and if the application fails to process the message before the lock timeout expires (for example, if the application crashes), then Service Bus unlocks the message automatically and makes it available to be received again.

如果在处理消息之后,发出 deleteMessage 请求之前,应用程序发生崩溃,该消息会在应用程序重新启动时重新传送给它。In the event that the application crashes after processing the message but before the deleteMessage request is issued, then the message is redelivered to the application when it restarts. 此情况通常称作至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。This is often called At Least Once Processing; that is, each message is processed at least once but in certain situations the same message may be redelivered. 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。If the scenario cannot tolerate duplicate processing, then application developers should add additional logic to their application to handle duplicate message delivery. 通常可使用消息的 getMessageId 方法实现此操作,这在多个传送尝试中保持不变。This is often achieved using the getMessageId method of the message, which remains constant across delivery attempts.

后续步骤Next Steps

现在,已了解服务总线队列的基础知识,请参阅队列、主题和订阅 以获取更多信息。Now that you've learned the basics of Service Bus queues, see Queues, topics, and subscriptions for more information.

有关详细信息,请参阅 Java 开发人员中心For more information, see the Java Developer Center.