如何使用 Service Bus 队列

本文介绍如何通过 Node.js 使用服务总线队列。 示例用 JavaScript 编写并使用 Node.js Azure 模块。 涉及的任务包括创建队列发送和接收消息以及删除队列。 有关队列的详细信息,请参阅后续步骤部分。

什么是 Service Bus 队列?

服务总线队列支持中转消息传送通信模型。 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。 消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。 创建方不必等待使用方的答复即可继续处理并发送更多消息。 队列为一个或多个竞争使用方提供先入先出 (FIFO)消息传递方式。 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。

利用队列,你可以更轻松地缩放应用程序,并增强体系结构的弹性。

创建服务命名空间

若要开始在 Azure 中使用服务总线队列,必须先创建一个服务命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建服务命名空间:

  1. 登录到 Azure 经典门户

  2. 在门户的左侧导航窗格中,单击“服务总线”。

  3. 在门户的下方窗格中,单击“创建”。

  4. 在“添加新命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  5. 在确保命名空间名称可用后,选择应承载你的命名空间的国家或地区(确保使用在其中部署计算资源的同一国家/地区)。

    Important

    选取要选择用于部署应用程序的相同区域。 这将为你提供最佳性能。

  6. 将对话框中的其他字段保留其默认值(“消息传送”和“标准层”),然后单击“确定”复选标记。 系统现已创建命名空间并已将其启用。 您可能需要等待几分钟,因为系统将为您的帐户配置资源。

创建的命名空间将花费一段时间来激活,然后显示在门户中。 请等到命名空间状态变为“活动”后再继续操作 。

获取命名空间的默认管理凭据

若要在新命名空间上执行管理操作(如创建队列),则必须获取该命名空间的管理凭据。 可以从 Azure 经典门户中获取这些凭据。

从门户中获取管理凭据

  1. 在左侧导航窗格中,单击“服务总线”节点以显示可用命名空间的列表:

  2. 从显示的列表中选择刚刚创建的命名空间:

  3. 单击“连接信息”。

  4. 在“访问连接信息”窗格中,找到包含 SAS 密钥和密钥名称的连接字符串。

  5. 记下该密钥或将其复制到剪贴板。

  1. 登录到 Azure 门户

  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择该命名空间将存在于的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择将在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 。 系统现已创建命名空间并已将其启用。 您可能需要等待几分钟,因为系统将为您的帐户配置资源。

获取管理凭据

  1. 在命名空间列表中,单击新建的命名空间名称。

  2. 在命名空间边栏选项卡中,单击“共享访问策略”。

  3. 在“共享访问策略”边栏选项卡中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”边栏选项卡中,单击“连接字符串 - 主键”旁边的复制按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    连接字符串

创建 Node.js 应用程序

创建一个空的 Node.js 应用程序。 有关如何创建 Node.js 应用程序的说明,请参阅创建 Node.js 应用程序并将其部署到 Azure 网站Node.js 云服务(使用 Windows PowerShell)。

配置应用程序以使用 Service Bus

若要使用 Azure 服务总线,请下载并使用 Node.js Azure 包。 此程序包包括一组用来与服务总线 REST 服务通信的库。

使用 Node 包管理器 (NPM) 可获取该程序包

  1. 使用 Windows PowerShell for Node.js 命令窗口导航到你在其中创建了示例应用程序的 c:\node\sbqueues\WebRole1 文件夹。
  2. 在命令窗口中键入 npm install azure,这应该产生类似如下的输出:

    azure@0.7.5 node_modules\azure
        ├── dateformat@1.0.2-1.2.3
        ├── xmlbuilder@0.4.2
        ├── node-uuid@1.2.0
        ├── mime@1.2.9
        ├── underscore@1.4.4
        ├── validator@1.1.1
        ├── tunnel@0.0.2
        ├── wns@0.5.3
        ├── xml2js@0.2.7 (sax@0.5.2)
        └── request@2.21.0 (json-stringify-safe@4.0.0, forever-agent@0.5.0, aws-sign@0.3.0, tunnel-agent@0.3.0, oauth-sign@0.3.0, qs@0.6.5, cookie-jar@0.3.0, node-uuid@1.4.0, http-signature@0.9.11, form-data@0.0.8, hawk@0.13.1)
    
  3. 可以手动运行 ls 命令来验证是否创建了 node_modules 文件夹。 在该文件夹中,找到 azure 包,其中包含访问服务总线队列所需的库。

导入模块

使用记事本或其他文本编辑器将以下内容添加到应用程序的 server.js 文件的顶部:

var azure = require('azure');

设置 Azure 服务总线连接

Azure 模块将读取环境变量 AZURE_SERVICEBUS_NAMESPACE and AZURE_SERVICEBUS_ACCESS_KEY 以获取连接到服务总线所需的信息。 如果未设置这些环境变量,则在调用 createServiceBusService 时必须指定帐户信息。

有关在 Azure 云服务的配置文件中设置环境变量的示例,请参阅使用存储的 Node.js 云服务

有关在 Azure 经典管理门户中为 Azure 网站设置环境变量的示例,请参阅使用存储的 Node.js Web 应用程序

创建队列

可以通过 ServiceBusService 对象处理服务总线队列。 以下代码创建 ServiceBusService 对象。 将它添加到靠近 server.js 文件顶部、用于导入 Azure 模块的语句之后的位置:

var serviceBusService = azure.createServiceBusService();

通过对 ServiceBusService 对象调用 createQueueIfNotExists,将返回指定的队列(如果存在),否则将使用指定的名称创建一个新队列。 以下代码使用 createQueueIfNotExists 创建或连接到名为 myqueue 的队列:

serviceBusService.createQueueIfNotExists('myqueue', function(error){
    if(!error){
        // Queue exists
    }
});

createServiceBusService 也支持其他选项,这些选项允许重写默认队列设置,如消息生存时间或最大队列大小。 以下示例将最大队列大小设置为 5 GB,将生存时间 (TTL) 值设置为 1 分钟:

var queueOptions = {
      MaxSizeInMegabytes: '5120',
      DefaultMessageTimeToLive: 'PT1M'
    };

serviceBusService.createQueueIfNotExists('myqueue', queueOptions, function(error){
    if(!error){
        // Queue exists
    }
});

筛选器

可选的筛选操作可应用于使用 ServiceBusService 执行的操作。 筛选操作可包括日志记录、自动重试等。筛选器是实现具有签名的方法的对象:

function handle (requestOptions, next)

在对请求选项执行预处理后,该方法必须调用 next 并传递具有以下签名的回调:

function (returnObject, finalCallback, next)

在此回调中并且在处理 returnObject(来自对服务器请求的响应)后,回调必须调用 next(如果它存在)以便继续处理其他筛选器或只调用 finalCallback,以便结束服务调用。

Azure SDK for Node.js 中附带了两个实现了重试逻辑的筛选器,分别是 ExponentialRetryPolicyFilterLinearRetryPolicyFilter。 以下代码创建一个 ServiceBusService 对象,该对象使用 ExponentialRetryPolicyFilter

var retryOperations = new azure.ExponentialRetryPolicyFilter();
var serviceBusService = azure.createServiceBusService().withFilter(retryOperations);

向队列发送消息

若要向服务总线队列发送消息,你的应用程序需对 ServiceBusService 对象调用 sendQueueMessage 方法。 发往服务总线队列的消息以及从服务总线队列接收的消息是 BrokeredMessage 对象,它们具有一组标准属性(如 LabelTimeToLive)、一个用来保存自定义应用程序特定属性的字典和一段任意应用程序数据正文。 应用程序可以通过将字符串作为消息传递来设置消息正文。 任何必需的标准属性将用默认值来填充。

以下示例演示如何使用 sendQueueMessage 向名为 myqueue 的队列发送一条测试消息:

var message = {
    body: 'Test message',
    customProperties: {
        testproperty: 'TestValue'
    }};
serviceBusService.sendQueueMessage('myqueue', message, function(error){
    if(!error){
        // message sent
    }
});

服务总线队列在标准层中支持的最大消息大小为 256 KB。 标头最大为 64 KB,其中包括标准和自定义应用程序属性。 一个队列可包含的消息数不受限制,但消息的总大小受限。 此队列大小是在创建时定义的,上限为 5 GB。 有关配额的详细信息,请参阅服务总线配额

从队列接收消息

ServiceBusService 对象使用 receiveQueueMessage方法可从队列接收消息。 默认情况下,消息被读取后即从队列删除;但是你可以读取(速览)并锁定消息而不将其从队列删除,只要将可选参数 isPeekLock 设置为 true 即可。

在接收过程中读取并删除消息的默认行为是最简单的模式,并且最适合在发生故障时应用程序可以容忍不处理消息的情况。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于 Service Bus 会将消息标记为“将使用”,因此当应用程序重启并重新开始使用消息时,它会丢失在发生崩溃前使用的消息。

如果将 isPeekLock 参数设置为 true,则接收将变成一个两阶段操作,这样就可以支持无法允许遗漏消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,然后将该消息返回到应用程序。 应用程序处理完该消息(或将它可靠地存储起来留待将来处理)后,通过调用 deleteMessage 方法并提供要删除的消息作为参数来完成接收过程的第二阶段。 deleteMessage 方法将此消息标记为“已使用”并将其从队列中删除。

以下示例演示如何使用 receiveQueueMessage 接收和处理消息。 该示例先接收并删除一条消息,然后使用设置为 trueisPeekLock 接收一条消息,最后使用 deleteMessage 删除该消息:

serviceBusService.receiveQueueMessage('myqueue', function(error, receivedMessage){
    if(!error){
        // Message received and deleted
    }
});
serviceBusService.receiveQueueMessage('myqueue', { isPeekLock: true }, function(error, lockedMessage){
    if(!error){
        // Message received and locked
        serviceBusService.deleteMessage(lockedMessage, function (deleteError){
            if(!deleteError){
                // Message deleted
            }
        });
    }
});

如何处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序因某种原因无法处理消息,则它可以对 ServiceBusService 对象调用 unlockMessage 方法。 这将导致 Service Bus 解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定消息关联的超时,并且如果应用程序无法在锁定超时到期之前处理消息(例如,如果应用程序崩溃),Service Bus 将自动解锁该消息并使它可再次被接收。

如果应用程序在处理消息之后,调用 deleteMessage 方法之前崩溃,则在应用程序重新启动时会将该消息重新传送给它。 此情况通常称作 至少处理一次,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 这通常可以通过消息的 MessageId 属性来实现,该属性在多次传送尝试中保持不变。

后续步骤

若要了解有关队列的详细信息,请参阅以下资源。