如何通过 Java 使用服务总线队列

本文介绍了如何使用服务总线队列。 这些示例用 Java 编写并使用 用于 Java 的 Azure SDK。 涉及的任务包括创建队列发送和接收消息以及删除队列

什么是 Service Bus 队列?

服务总线队列支持中转消息传送通信模型。 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。 消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。 创建方不必等待使用方的答复即可继续处理并发送更多消息。 队列为一个或多个竞争使用方提供先入先出 (FIFO)消息传递方式。 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。

利用队列,你可以更轻松地缩放应用程序,并增强体系结构的弹性。

创建服务命名空间

若要开始在 Azure 中使用服务总线队列,必须先创建一个服务命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建服务命名空间:

  1. 登录到 Azure 经典门户

  2. 在门户的左侧导航窗格中,单击“服务总线”。

  3. 在门户的下方窗格中,单击“创建”。

  4. 在“添加新命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  5. 在确保命名空间名称可用后,选择应承载你的命名空间的国家或地区(确保使用在其中部署计算资源的同一国家/地区)。

    Important

    选取要选择用于部署应用程序的相同区域。 这将为你提供最佳性能。

  6. 将对话框中的其他字段保留其默认值(“消息传送”和“标准层”),然后单击“确定”复选标记。 系统现已创建命名空间并已将其启用。 您可能需要等待几分钟,因为系统将为您的帐户配置资源。

创建的命名空间将花费一段时间来激活,然后显示在门户中。 请等到命名空间状态变为“活动”后再继续操作 。

获取命名空间的默认管理凭据

若要在新命名空间上执行管理操作(如创建队列),则必须获取该命名空间的管理凭据。 可以从 Azure 经典门户中获取这些凭据。

从门户中获取管理凭据

  1. 在左侧导航窗格中,单击“服务总线”节点以显示可用命名空间的列表:

  2. 从显示的列表中选择刚刚创建的命名空间:

  3. 单击“连接信息”。

  4. 在“访问连接信息”窗格中,找到包含 SAS 密钥和密钥名称的连接字符串。

  5. 记下该密钥或将其复制到剪贴板。

若要开始在 Azure 中使用服务总线队列,必须先创建一个命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的作用域容器。

创建命名空间:

  1. 登录到 Azure 门户
  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择用于放置该命名空间的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择应在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 。 系统现已创建命名空间并已将其启用。 可能需要等待几分钟,因为系统会为你的帐户配置资源。

获取管理凭据

  1. 在命名空间列表中,单击新建的命名空间名称。

  2. 在命名空间边栏选项卡中,单击“共享访问策略”。

  3. 在“共享访问策略”边栏选项卡中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”边栏选项卡中,单击“连接字符串 - 主键”旁边的复制按钮,将连接字符串复制到剪贴板供稍后使用。 将此值粘贴到记事本或其他某个临时位置。

    connection-string

  5. 重复上述步骤,将主键的值复制和粘贴到临时位置,以供稍后使用。

配置应用程序以使用服务总线

在生成本示例之前,请确保已安装 用于 Java 的 Azure SDK。 如果使用的是 Eclipse,则可以安装包含用于 Java 的 Azure SDK 的用于 Eclipse 的 Azure 工具包。 然后,用户可以将 Microsoft Azure Libraries for Java 添加到项目:

将以下 import 语句添加到 Java 文件顶部:

// Include the following imports to use Service Bus APIs
import com.microsoft.windowsazure.services.servicebus.*;
import com.microsoft.windowsazure.services.servicebus.models.*;
import com.microsoft.windowsazure.core.*;
import javax.xml.datatype.*;

创建队列

服务总线队列的管理操作可通过 ServiceBusContract 类执行。 ServiceBusContract 对象是使用封装了 SAS 令牌及用于管理其权限的适当配置构造的,而 ServiceBusContract 类是与 Azure 进行通信的单一点。

ServiceBusService 类提供了创建、枚举和删除队列的方法。 以下示例演示了如何通过名为“HowToSample”的命名空间,使用 ServiceBusService 对象创建名为“TestQueue”的队列:

    Configuration config =
        ServiceBusConfiguration.configureWithSASAuthentication(
                "HowToSample",
                "RootManageSharedAccessKey",
                "SAS_key_value",
                ".servicebus.chinacloudapi.cn"
                );

ServiceBusContract service = ServiceBusService.create(config);
QueueInfo queueInfo = new QueueInfo("TestQueue");
try
{
    CreateQueueResult result = service.createQueue(queueInfo);
}
catch (ServiceException e)
{
    System.out.print("ServiceException encountered: ");
    System.out.println(e.getMessage());
    System.exit(-1);
}

可对 QueueInfo 执行某些方法,以调整队列的属性(例如,将默认的生存时间 (TTL) 值设置为应用于发送到队列的消息)。 以下示例演示了如何创建最大大小为 5GB 且名为 TestQueue 的队列:

long maxSizeInMegabytes = 5120;
QueueInfo queueInfo = new QueueInfo("TestQueue");
queueInfo.setMaxSizeInMegabytes(maxSizeInMegabytes);
CreateQueueResult result = service.createQueue(queueInfo);

注意:可对 ServiceBusContract 对象使用 listQueues 方法来检查具有指定名称的队列在某个服务命名空间中是否已存在。

向队列发送消息

要将消息发送到服务总线队列,应用程序将获得 ServiceBusContract 对象。 以下代码演示了如何将消息发送到先前在 HowToSample 命名空间中创建的 TestQueue 队列。

try
{
    BrokeredMessage message = new BrokeredMessage("MyMessage");
    service.sendQueueMessage("TestQueue", message);
}
catch (ServiceException e) 
{
    System.out.print("ServiceException encountered: ");
    System.out.println(e.getMessage());
    System.exit(-1);
}

发送到服务总线队列以及从服务总线队列接收的消息是 BrokeredMessage 类的实例。 BrokeredMessage 对象包含一组标准属性(如 LabelTimeToLive)、一个用来保存自定义应用程序特定属性的字典以及大量随机应用程序数据。 应用程序可通过将任何可序列化对象传入到 BrokeredMessage 的构造函数中来设置消息的正文,然后将使用适当的序列化程序来序列化对象。 或者,可提供 java.IO.InputStream 对象。

以下示例演示了如何将五条测试消息发送到在前面的代码片段中获取的 TestQueue MessageSender

for (int i=0; i<5; i++)
{
     // Create message, passing a string message for the body.
     BrokeredMessage message = new BrokeredMessage("Test message " + i);
     // Set an additional app-specific property.
     message.setProperty("MyProperty", i); 
     // Send message to the queue
     service.sendQueueMessage("TestQueue", message);
}

服务总线队列在标准层中支持的最大消息大小为 256 KB。 标头最大为 64 KB,其中包括标准和自定义应用程序属性。 一个队列可包含的消息数不受限制,但消息的总大小受限。 此队列大小是在创建时定义的,上限为 5 GB。

从队列接收消息

从队列接收消息的主要方法是使用 ServiceBusContract 对象。 收到的消息可在两种不同模式下工作:ReceiveAndDeletePeekLock

当使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,服务总线接收到队列中某条消息的读取请求时,会将该消息标记为“已使用”并将其返回给应用程序。 ReceiveAndDelete 模式(默认模式)是最简单的模式,最适合应用程序可容忍出现故障时不处理消息的情景。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。

PeekLock 模式下,接收变成了一个两阶段操作,从而有可能支持无法允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序完成消息处理(或可靠地存储消息以供将来处理)后,会通过对收到的消息调用 Delete 完成接收过程的第二个阶段。 服务总线发现 Delete 调用时,会将消息标记为“已使用”并将其从队列中删除。

以下示例演示如何使用 PeekLock 模式(非默认模式)接收和处理消息。 下面的示例执行无限循环并在消息到达我们的“TestQueue”后进行处理:

    try
{
    ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT;
    opts.setReceiveMode(ReceiveMode.PEEK_LOCK);

    while(true)  { 
         ReceiveQueueMessageResult resultQM = 
                 service.receiveQueueMessage("TestQueue", opts);
        BrokeredMessage message = resultQM.getValue();
        if (message != null && message.getMessageId() != null)
        {
            System.out.println("MessageID: " + message.getMessageId());    
            // Display the queue message.
            System.out.print("From queue: ");
            byte[] b = new byte[200];
            String s = null;
            int numRead = message.getBody().read(b);
            while (-1 != numRead)
            {
                s = new String(b);
                s = s.trim();
                System.out.print(s);
                numRead = message.getBody().read(b);
            }
            System.out.println();
            System.out.println("Custom Property: " + 
                message.getProperty("MyProperty"));
            // Remove message from queue.
            System.out.println("Deleting this message.");
            //service.deleteMessage(message);
        }  
        else  
        {        
            System.out.println("Finishing up - no more messages.");        
            break; 
            // Added to handle no more messages.
            // Could instead wait for more messages to be added.
        }
    }
}
catch (ServiceException e) {
    System.out.print("ServiceException encountered: ");
    System.out.println(e.getMessage());
    System.exit(-1);
}
catch (Exception e) {
    System.out.print("Generic exception encountered: ");
    System.out.println(e.getMessage());
    System.exit(-1);
}     

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序出于某种原因无法处理消息,则其可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。 这会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),则服务总线将自动解锁该消息并使它可再次被接收。

如果在处理消息之后,发出 deleteMessage 请求之前,应用程序发生崩溃,则在应用程序重启时会将该消息重新传送给它。 此情况通常称作至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 通常可使用消息的 getMessageId 方法实现此操作,这在多个传送尝试中保持不变。

后续步骤

现在,已了解服务总线队列的基础知识,请参阅队列、主题和订阅 以获取更多信息。

有关详细信息,请参阅 Java 开发人员中心