如何通过 Java 使用服务总线队列

本文介绍了如何使用服务总线队列。 这些示例用 Java 编写并使用 用于 Java 的 Azure SDK。 涉及的任务包括创建队列发送和接收消息以及删除队列

Note

可以在 GitHub 上的 azure-service-bus 存储库中找到 Java 示例。

什么是 Service Bus 队列?

服务总线队列支持中转消息传送通信模型。 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。 消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。 创建方不必等待使用方的答复即可继续处理并发送更多消息。 队列为一个或多个竞争使用方提供先入先出 (FIFO)消息传递方式。 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。

利用队列,你可以更轻松地缩放应用程序,并增强体系结构的弹性。

创建服务总线命名空间

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建命名空间:

  1. 登录到 Azure 门户
  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择用于放置该命名空间的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择应在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 。 系统现已创建命名空间并已将其启用。 可能需要等待几分钟,因为系统会为你的帐户配置资源。

获取管理凭据

创建新的命名空间时,会自动生成一项初始的共享访问签名 (SAS) 规则,将一对主密钥和辅助密钥关联到一起,向每个密钥授予对命名空间的所有资产的完全控制权限。 请参阅服务总线身份验证和授权,了解如何创建更多的规则,对常规的发送者和接收者的权限进行更多限制。 若要复制初始规则,请执行以下步骤:

  1. 单击“所有资源”,然后单击新创建的命名空间名称。
  2. 在命名空间窗口中,单击“共享访问策略”。
  3. 在“共享访问策略”屏幕中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”窗口中,单击“主连接字符串”旁边的“复制”按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    connection-string

  5. 重复上述步骤,将主键的值复制和粘贴到临时位置,以供稍后使用。

创建服务总线队列

请确保已创建服务总线命名空间,如 此处所示。

  1. 登录到 Azure 门户
  2. 在门户左侧的导航窗格中,单击“服务总线”(如果未看到“服务总线”,请单击“所有服务”)。
  3. 单击要在其中创建队列的命名空间。 在此示例中,它是“sbnstest1”。

    创建队列

  4. 在命名空间窗口中单击“队列”,然后在“队列”窗口中单击“+ 队列”。

    选择“队列”

  5. 输入队列名称,其他值则保留默认值。

    选择“新建”

  6. 在窗口底部,单击“创建”。

配置应用程序以使用服务总线

在生成本示例之前,请确保已安装 用于 Java 的 Azure SDK。 如果使用的是 Eclipse,则可以安装包含用于 Java 的 Azure SDK 的用于 Eclipse 的 Azure 工具包。 然后,用户可以将 Azure Libraries for Java 添加到项目:

将以下 import 语句添加到 Java 文件顶部:

// Include the following imports to use Service Bus APIs
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.*;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import com.google.gson.Gson;

import static java.nio.charset.StandardCharsets.*;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;

import org.apache.commons.cli.*;

向队列发送消息

若要将消息发送到服务总线队列,应用程序将实例化 QueueClient 对象并以异步方式发送消息。 以下代码显示如何为通过门户创建的队列发送消息。

public void run() throws Exception {
    // Create a QueueClient instance and then asynchronously send messages.
    // Close the sender once the send operation is complete.
    QueueClient sendClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
    this.sendMessageAsync(sendClient).thenRunAsync(() -> sendClient.closeAsync());

    sendClient.close();
}

    CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                                "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                                "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                                "{'name' = 'Hawking', 'firstName' = 'Steven'}," +
                                "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                                "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                                "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                                "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                                "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                                "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType());

        List<CompletableFuture> tasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            final String messageId = Integer.toString(i);
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("\nMessage sending: Id = %s", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\n\tMessage acknowledged: Id = %s", message.getMessageId());
                    }));
        }
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }

发送到服务总线队列以及从服务总线队列收到的消息是 Message 类的实例。 Message 对象包含一组标准属性(如 Label 和 TimeToLive)、一个用来保存自定义应用程序特定属性的字典以及大量任意应用程序数据。 应用程序可通过将任何可序列化对象传入到 Message 的构造函数中来设置消息的正文,并将使用适当的序列化程序来序列化对象。 或者,可以提供 java.IO.InputStream 对象。 服务总线队列在标准层中支持的最大消息大小为 256 KB。 标头大小为 64 KB,其中包括标准和自定义应用程序属性。 一个队列中包含的消息数量不受限制,但消息的总大小受限制。 此队列大小是在创建时定义的,上限为 5 GB。

从队列接收消息

从队列接收消息的主要方法是使用 ServiceBusContract 对象。 收到的消息可在两种不同模式下工作:ReceiveAndDeletePeekLock

当使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,服务总线接收到队列中某条消息的读取请求时,会将该消息标记为“已使用”并将其返回给应用程序。 ReceiveAndDelete 模式(默认模式)是最简单的模式,最适合应用程序可容忍出现故障时不处理消息的情景。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线已将消息标记为“已使用”,因此当应用程序重新启动并重新开始使用消息时,它会漏掉在发生崩溃前使用的消息。

PeekLock 模式下,接收变成了一个两阶段操作,从而有可能支持无法允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序完成消息处理(或可靠地存储消息以供将来处理)后,会通过对收到的消息调用 Delete 完成接收过程的第二个阶段。 服务总线发现 Delete 调用时,会将消息标记为“已使用”,并将消息从队列中删除。

以下示例演示如何使用 PeekLock 模式(非默认模式)接收和处理消息。 下面的示例将执行无限循环并在消息达到我们的 TestQueue 后进行处理:

    public void run() throws Exception {
        // Create a QueueClient instance for receiving using the connection string builder
        // We set the receive mode to "PeekLock", meaning the message is delivered
        // under a lock and must be acknowledged ("completed") to be removed from the queue
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(ConnectionString, QueueName), ReceiveMode.PEEKLOCK);
        this.registerReceiver(receiveClient);

        // shut down receiver to close the receive loop
        receiveClient.close();
    }
    void registerReceiver(QueueClient queueClient) throws Exception {
        // register the RegisterMessageHandler callback
        queueClient.registerMessageHandler(new IMessageHandler() {
        // callback invoked when the message handler loop has obtained a message
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
            // receives message is passed to callback
                if (message.getLabel() != null &&
                    message.getContentType() != null &&
                    message.getLabel().contentEquals("Scientist") &&
                    message.getContentType().contentEquals("application/json")) {

                        byte[] body = message.getBody();
                        Map scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                        System.out.printf(
                            "\n\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                            "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                            message.getMessageId(),
                            message.getSequenceNumber(),
                            message.getEnqueuedTimeUtc(),
                            message.getExpiresAtUtc(),
                            message.getContentType(),
                            scientist != null ? scientist.get("firstName") : "",
                            scientist != null ? scientist.get("name") : "");
                    }
                    return CompletableFuture.completedFuture(null);
                }

                // callback invoked when the message handler has an exception to report
                public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                    System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                }
        },
        // 1 concurrent call, messages are auto-completed, auto-renew duration
        new MessageHandlerOptions(1, true, Duration.ofMinutes(1)));
    }

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序出于某种原因无法处理消息,则其可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。 这会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),则服务总线将自动解锁该消息并使它可再次被接收。

如果在处理消息之后,发出 deleteMessage 请求之前,应用程序发生崩溃,则在应用程序重启时会将该消息重新传送给它。 此情况通常称作至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 通常可使用消息的 getMessageId 方法实现此操作,这在多个传送尝试中保持不变。

后续步骤

现在,已了解服务总线队列的基础知识,请参阅队列、主题和订阅 以获取更多信息。

有关详细信息,请参阅 Java 开发人员中心