快速入门:通过 Java 使用 Azure 服务总线队列发送和接收消息

本教程介绍如何创建 Java 应用程序来向 Azure 服务总线队列发送消息以及从中接收消息。

备注

可以在 GitHub 上的 azure-service-bus 存储库中找到 Java 示例。

先决条件

  1. Azure 订阅。 若要完成本教程,需要一个 Azure 帐户。 你可以激活 MSDN 订阅者权益或注册试用订阅
  2. 如果没有可使用的队列,请遵循使用 Azure 门户创建服务总线队列一文来创建队列。
    1. 阅读服务总线 队列 的快速 概述
    2. 创建一个服务总线 命名空间
    3. 获取 连接字符串
    4. 创建一个服务总线 队列
  3. 安装 Azure SDK for Java

配置应用程序以使用服务总线

在生成本示例之前,请确保已安装 Azure SDK for Java

如果使用了 Eclipse,则可以安装包含 Azure SDK for Java 的 Azure Toolkit for Eclipse 。 然后,可将“适用于 Java 的 Microsoft Azure 库”添加到项目 。 有关详细信息,请参阅安装 Azure Toolkit for IntelliJ

将适用于 Java 的 Azure 库添加到 Eclipse 项目

将以下 import 语句添加到 Java 文件顶部:

// Include the following imports to use Service Bus APIs
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;

import static java.nio.charset.StandardCharsets.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

import org.apache.commons.cli.*;

向队列发送消息

若要将消息发送到服务总线队列,应用程序将实例化 QueueClient 对象并以异步方式发送消息。 以下代码显示如何为通过门户创建的队列发送消息。

public void run() throws Exception {
    // Create a QueueClient instance and then asynchronously send messages.
    // Close the sender once the send operation is complete.
    QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
    this.sendMessagesAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());

    sendClient.close();
}

    CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                                "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                                "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                                "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                                "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                                "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                                "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                                "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                                "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                                "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType());

        List<CompletableFuture> tasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            final String messageId = Integer.toString(i);
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
                    }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }

发送到服务总线队列以及从服务总线队列收到的消息是 Message 类的实例。 Message 对象包含一组标准属性(如 Label 和 TimeToLive)、一个用来保存自定义应用程序特定属性的字典以及大量任意应用程序数据。 应用程序可通过将任何可序列化对象传入到 Message 的构造函数中来设置消息的正文,并将使用适当的序列化程序来序列化对象。 或者,可以提供 java.IO.InputStream 对象。

服务总线队列在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。 一个队列中包含的消息数量不受限制,但消息的总大小受限制。 此队列大小是在创建时定义的,上限为 5 GB。

从队列接收消息

从队列接收消息的主要方法是使用 ServiceBusContract 对象。 收到的消息可在两种不同模式下工作:ReceiveAndDeletePeekLock

当使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,服务总线接收到队列中某条消息的读取请求时,会将该消息标记为“已使用”并将其返回给应用程序。 ReceiveAndDelete 模式(默认模式)是最简单的模式,最适合应用程序可容忍出现故障时不处理消息的情景。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线已将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它就漏掉了在发生故障前使用的消息。

PeekLock 模式下,接收变成了一个两阶段操作,从而有可能支持无法允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序完成消息处理(或可靠地存储消息以供将来处理)后,它将通过对收到的消息调用 complete() 完成接收过程的第二个阶段。 看到 complete() 调用时,服务总线会将消息标记为“已使用”,并将消息从队列中删除。

以下示例演示如何使用 PeekLock 模式(非默认模式)接收和处理消息。 下面的示例将回调模型与已注册消息处理程序配合使用,并在消息到达我们的 TestQueue 时对其进行处理。 此模式在回调正常返回时自动调用 complete() ,如果回调引发了异常,则会调用 abandon()

    public void run() throws Exception {
        // Create a QueueClient instance for receiving using the connection string builder
        // We set the receive mode to "PeekLock", meaning the message is delivered
        // under a lock and must be acknowledged ("completed") to be removed from the queue
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
        this.registerReceiver(receiveClient);

        // shut down receiver to close the receive loop
        receiveClient.close();
    }
    void registerReceiver(QueueClient queueClient) throws Exception {
        // register the RegisterMessageHandler callback
        queueClient.registerMessageHandler(new IMessageHandler() {
        // callback invoked when the message handler loop has obtained a message
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
            // receives message is passed to callback
                if (message.getLabel() != null &&
                    message.getContentType() != null &&
                    message.getLabel().contentEquals("Scientist") &&
                    message.getContentType().contentEquals("application/json")) {

                        byte[] body = message.getBody();
                        Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                        System.out.printf(
                            "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                            "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                            message.getMessageId(),
                            message.getSequenceNumber(),
                            message.getEnqueuedTimeUtc(),
                            message.getExpiresAtUtc(),
                            message.getContentType(),
                            scientist != null ? scientist.get("firstName") : "",
                            scientist != null ? scientist.get("name") : "");
                    }
                    return CompletableFuture.completedFuture(null);
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
        },
        // 1 concurrent call, messages are auto-completed, auto-renew duration
        new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
    }

如何处理应用程序崩溃和不可读消息

服务总线提供了相关功能,帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序出于某种原因无法处理消息,它可以在客户端对象上调用 abandon() 方法并使用通过 getLockToken() 获取的已收到消息的锁定令牌。 这会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),则服务总线将自动解锁该消息并使它可再次被接收。

请注意,如果应用程序在处理消息之后,但在发出 complete() 请求之前发生崩溃,则在应用程序重新启动时会将该消息重新传送给它。 此情况通常称作 至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 通常可使用消息的 getMessageId 方法实现此操作,这在多个传送尝试中保持不变。

备注

可以使用服务总线资源管理器管理服务总线资源。 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。

后续步骤

现在,已了解服务总线队列的基础知识,请参阅队列、主题和订阅 以获取更多信息。

有关详细信息,请参阅 Java 开发人员中心