如何通过 Node.js 使用服务总线队列How to use Service Bus queues with Node.js

本教程介绍如何创建 Node.js 应用程序,以便向服务总线队列发送消息以及从中接收消息。In this tutorial, you learn how to create Node.js applications to send messages to and receive messages from a Service Bus queue. 示例用 JavaScript 编写并使用 Node.js Azure 模块。The samples are written in JavaScript and use the Node.js Azure module.

先决条件Prerequisites

  1. Azure 订阅。An Azure subscription. 若要完成本教程,需要一个 Azure 帐户。To complete this tutorial, you need an Azure account. 可以激活 [MSDN 订阅者权益]https://www.azure.cn/zh-cn/support/legal/offer-rate-plans/)或注册试用帐户You can activate your MSDN subscriber benefits or sign up for a trial account.
  2. 如果没有可使用的队列,请遵循使用 Azure 门户创建服务总线队列一文来创建队列。If you don't have a queue to work with, follow steps in the Use Azure portal to create a Service Bus queue article to create a queue.
    1. 阅读服务总线队列的快速概述Read the quick overview of Service Bus queues.

    2. 创建一个服务总线命名空间Create a Service Bus namespace.

    3. 获取连接字符串Get the connection string.

      Note

      在本教程中,需使用 Node.js 在服务总线命名空间中创建一个队列You will create a queue in the Service Bus namespace by using Node.js in this tutorial.

创建 Node.js 应用程序Create a Node.js application

创建一个空的 Node.js 应用程序。Create a blank Node.js application. 有关如何创建 Node.js 应用程序的说明,请参阅创建 Node.js 应用程序并将其部署到 Azure 网站Node.js 云服务(使用 Windows PowerShell)。For instructions on how to create a Node.js application, see Create and deploy a Node.js application to an Azure Website, or Node.js Cloud Service using Windows PowerShell.

配置应用程序以使用服务总线Configure your application to use Service Bus

若要使用 Azure 服务总线,请下载并使用 Node.js Azure 包。To use Azure Service Bus, download and use the Node.js Azure package. 此程序包包括一组用来与服务总线 REST 服务通信的库。This package includes a set of libraries that communicate with the Service Bus REST services.

使用 Node 包管理器 (NPM) 可获取该程序包Use Node Package Manager (NPM) to obtain the package

  1. 使用 Windows PowerShell for Node.js 命令窗口导航到在其中创建了示例应用程序的 c:\node\sbqueues\WebRole1 文件夹。Use the Windows PowerShell for Node.js command window to navigate to the c:\node\sbqueues\WebRole1 folder in which you created your sample application.

  2. 在命令窗口中键入 npm install azure,这应该产生类似如下的输出:Type npm install azure in the command window, which should result in output similar to the following:

    azure@0.7.5 node_modules\azure
        ├── dateformat@1.0.2-1.2.3
        ├── xmlbuilder@0.4.2
        ├── node-uuid@1.2.0
        ├── mime@1.2.9
        ├── underscore@1.4.4
        ├── validator@1.1.1
        ├── tunnel@0.0.2
        ├── wns@0.5.3
        ├── xml2js@0.2.7 (sax@0.5.2)
        └── request@2.21.0 (json-stringify-safe@4.0.0, forever-agent@0.5.0, aws-sign@0.3.0, tunnel-agent@0.3.0, oauth-sign@0.3.0, qs@0.6.5, cookie-jar@0.3.0, node-uuid@1.4.0, http-signature@0.9.11, form-data@0.0.8, hawk@0.13.1)
    
  3. 可以手动运行 ls 命令来验证是否创建了 node_modules 文件夹。You can manually run the ls command to verify that a node_modules folder was created. 在该文件夹中,找到 azure 包,其中包含访问服务总线队列所需的库。Inside that folder find the azure package, which contains the libraries you need to access Service Bus queues.

导入模块Import the module

使用记事本或其他文本编辑器将以下内容添加到应用程序的 server.js 文件的顶部:Using Notepad or another text editor, add the following to the top of the server.js file of the application:

var azure = require('azure');

设置 Azure 服务总线连接Set up an Azure Service Bus connection

Azure 模块读取环境变量 AZURE_SERVICEBUS_CONNECTION_STRING,获取连接到服务总线所需的信息。The Azure module reads the environment variable AZURE_SERVICEBUS_CONNECTION_STRING to obtain information required to connect to Service Bus. 如果未设置此环境变量,则在调用 createServiceBusService 时必须指定帐户信息。If this environment variable is not set, you must specify the account information when calling createServiceBusService.

有关在 Azure 门户中为 Azure 网站设置环境变量的示例,请参阅使用存储的 Node.js Web 应用程序For an example of setting the environment variables in the Azure portal for an Azure Website, see Node.js Web Application with Storage.

创建队列Create a queue

可以通过 ServiceBusService 对象处理服务总线队列。The ServiceBusService object enables you to work with Service Bus queues. 以下代码创建 ServiceBusService 对象。The following code creates a ServiceBusService object. 将它添加到靠近 server.js 文件顶部、用于导入 Azure 模块的语句之后的位置:Add it near the top of the server.js file, after the statement to import the Azure module:

var serviceBusService = azure.createServiceBusService();

通过对 ServiceBusService 对象调用 createQueueIfNotExists,会返回指定的队列(如果存在),否则会使用指定的名称创建一个新队列。By calling createQueueIfNotExists on the ServiceBusService object, the specified queue is returned (if it exists), or a new queue with the specified name is created. 以下代码使用 createQueueIfNotExists 创建或连接到名为 myqueue 的队列:The following code uses createQueueIfNotExists to create or connect to the queue named myqueue:

serviceBusService.createQueueIfNotExists('myqueue', function(error){
    if(!error){
        // Queue exists
    }
});

createServiceBusService 方法还支持其他选项,通过这些选项可以重写默认队列设置,例如消息生存时间或最大队列大小。The createServiceBusService method also supports additional options, which enable you to override default queue settings such as message time to live or maximum queue size. 以下示例将最大队列大小设置为 5 GB,将生存时间 (TTL) 值设置为 1 分钟:The following example sets the maximum queue size to 5 GB, and a time to live (TTL) value of 1 minute:

var queueOptions = {
      MaxSizeInMegabytes: '5120',
      DefaultMessageTimeToLive: 'PT1M'
    };

serviceBusService.createQueueIfNotExists('myqueue', queueOptions, function(error){
    if(!error){
        // Queue exists
    }
});

筛选器Filters

可选的筛选操作可应用于使用 ServiceBusService 执行的操作。Optional filtering operations can be applied to operations performed using ServiceBusService. 筛选操作可包括日志记录、自动重试等。筛选器是实现具有签名的方法的对象:Filtering operations can include logging, automatically retrying, etc. Filters are objects that implement a method with the signature:

function handle (requestOptions, next)

在对请求选项执行预处理后,该方法必须调用 next 并传递具有以下签名的回调:After doing its pre-processing on the request options, the method must call next, passing a callback with the following signature:

function (returnObject, finalCallback, next)

在此回调中并且在处理 returnObject(来自对服务器请求的响应)后,回调必须调用 next(如果存在),继续处理其他筛选器,或者调用 finalCallback,结束服务调用。In this callback, and after processing the returnObject (the response from the request to the server), the callback must either invoke next if it exists to continue processing other filters, or invoke finalCallback, which ends the service invocation.

用于 Node.js 的 Azure SDK 中包含实现重试逻辑的两个筛选器:ExponentialRetryPolicyFilterLinearRetryPolicyFilterTwo filters that implement retry logic are included with the Azure SDK for Node.js, ExponentialRetryPolicyFilter and LinearRetryPolicyFilter. 以下代码创建使用 ExponentialRetryPolicyFilterServiceBusService 对象:The following code creates a ServiceBusService object that uses the ExponentialRetryPolicyFilter:

var retryOperations = new azure.ExponentialRetryPolicyFilter();
var serviceBusService = azure.createServiceBusService().withFilter(retryOperations);

向队列发送消息Send messages to a queue

要向服务总线队列发送消息,应用程序需对 ServiceBusService 对象调用 sendQueueMessage 方法。To send a message to a Service Bus queue, your application calls the sendQueueMessage method on the ServiceBusService object. 发往服务总线队列的消息以及从服务总线队列接收的消息是 BrokeredMessage 对象,它们具有一组标准属性(如 LabelTimeToLive)、一个用来保存自定义应用程序特定属性的字典和一段任意应用程序数据正文。Messages sent to (and received from) Service Bus queues are BrokeredMessage objects, and have a set of standard properties (such as Label and TimeToLive), a dictionary that is used to hold custom application-specific properties, and a body of arbitrary application data. 应用程序可以通过将字符串作为消息传递来设置消息正文。An application can set the body of the message by passing a string as the message. 任何必需的标准属性将用默认值来填充。Any required standard properties are populated with default values.

以下示例演示如何使用 sendQueueMessage 向名为 myqueue 的队列发送一条测试消息:The following example demonstrates how to send a test message to the queue named myqueue using sendQueueMessage:

var message = {
    body: 'Test message',
    customProperties: {
        testproperty: 'TestValue'
    }};
serviceBusService.sendQueueMessage('myqueue', message, function(error){
    if(!error){
        // message sent
    }
});

服务总线队列在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus queues support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 一个队列可包含的消息数不受限制,但消息的总大小受限。There is no limit on the number of messages held in a queue but there is a cap on the total size of the messages held by a queue. 此队列大小是在创建时定义的,上限为 5 GB。This queue size is defined at creation time, with an upper limit of 5 GB. 有关配额的详细信息,请参阅服务总线配额For more information about quotas, see Service Bus quotas.

从队列接收消息Receive messages from a queue

对 ServiceBusService 对象使用 receiveQueueMessage 方法可从队列接收消息。Messages are received from a queue using the receiveQueueMessage method on the ServiceBusService object. 默认情况下,消息被读取后即从队列删除;但是可以读取(速览)并锁定消息而不将其从队列删除,只要将可选参数 isPeekLock 设置为“true”即可。By default, messages are deleted from the queue as they are read; however, you can read (peek) and lock the message without deleting it from the queue by setting the optional parameter isPeekLock to true.

在接收过程中读取并删除消息的默认行为是最简单的模式,并且最适合在发生故障时应用程序可以容忍不处理消息的情况。The default behavior of reading and deleting the message as part of the receive operation is the simplest model, and works best for scenarios in which an application can tolerate not processing a message in the event of a failure. 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。To understand this, consider a scenario in which the consumer issues the receive request and then crashes before processing it. 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。Because Service Bus will have marked the message as being consumed, then when the application restarts and begins consuming messages again, it will have missed the message that was consumed prior to the crash.

如果将 isPeekLock 参数设置为“true”,则接收会变成一个两阶段操作,从而可支持无法容忍遗漏消息的应用程序。If the isPeekLock parameter is set to true, the receive becomes a two stage operation, which makes it possible to support applications that cannot tolerate missing messages. 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. 应用程序处理完该消息(或将它可靠地存储起来留待将来处理)后,通过调用 deleteMessage 方法并提供要删除的消息作为参数,完成接收过程的第二阶段。After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling deleteMessage method and providing the message to be deleted as a parameter. deleteMessage 方法会将消息标记为已使用,并将其从队列中删除。The deleteMessage method marks the message as being consumed and removes it from the queue.

以下示例演示如何使用 receiveQueueMessage 接收和处理消息。The following example demonstrates how to receive and process messages using receiveQueueMessage. 该示例先接收并删除一条消息,然后使用设置为“true”的 isPeekLock 接收一条消息,最后使用 deleteMessage 删除该消息:The example first receives and deletes a message, and then receives a message using isPeekLock set to true, then deletes the message using deleteMessage:

serviceBusService.receiveQueueMessage('myqueue', function(error, receivedMessage){
    if(!error){
        // Message received and deleted
    }
});
serviceBusService.receiveQueueMessage('myqueue', { isPeekLock: true }, function(error, lockedMessage){
    if(!error){
        // Message received and locked
        serviceBusService.deleteMessage(lockedMessage, function (deleteError){
            if(!deleteError){
                // Message deleted
            }
        });
    }
});

如何处理应用程序崩溃和不可读消息How to handle application crashes and unreadable messages

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序因某种原因无法处理消息,则它可以对 ServiceBusService 对象调用 unlockMessage 方法。If a receiver application is unable to process the message for some reason, then it can call the unlockMessage method on the ServiceBusService object. 这会导致 Service Bus 解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。This will cause Service Bus to unlock the message within the queue and make it available to be received again, either by the same consuming application or by another consuming application.

还存在与队列中已锁定的消息相关联的超时,并且如果应用程序未能在锁定超时到期之前处理消息(例如,如果应用程序崩溃),服务总线则将自动解锁该消息,使它可以再次被接收。There is also a timeout associated with a message locked within the queue, and if the application fails to process the message before the lock timeout expires (for example, if the application crashes), then Service Bus will unlock the message automatically and make it available to be received again.

如果应用程序在处理消息之后,但在调用 deleteMessage 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。In the event that the application crashes after processing the message but before the deleteMessage method is called, then the message will be redelivered to the application when it restarts. 此情况通常称作至少处理一次,即每条消息至少被处理一次,但在某些情况下,同一消息可能会被重新传送。This is often called At Least Once Processing, that is, each message will be processed at least once but in certain situations the same message may be redelivered. 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。If the scenario cannot tolerate duplicate processing, then application developers should add additional logic to their application to handle duplicate message delivery. 这通常可以通过消息的 MessageId 属性来实现,该属性在多次传送尝试中保持不变。This is often achieved using the MessageId property of the message, which will remain constant across delivery attempts.

后续步骤Next steps

若要了解有关队列的详细信息,请参阅以下资源。To learn more about queues, see the following resources.