如何通过 Node.js 使用服务总线主题和订阅How to Use Service Bus topics and subscriptions with Node.js

本指南介绍如何从 Node.js 应用程序使用服务总线主题和订阅。This guide describes how to use Service Bus topics and subscriptions from Node.js applications. 涉及的方案包括:The scenarios covered include:

  • 创建主题和订阅Creating topics and subscriptions
  • 创建订阅筛选器Creating subscription filters
  • 将消息发送到主题Sending messages to a topic
  • 从订阅接收消息Receiving messages from a subscription
  • 删除主题和订阅Deleting topics and subscriptions

有关主题和订阅的详细信息,请参阅后续步骤一节。For more information about topics and subscriptions, see Next steps section.

先决条件Prerequisites

  1. Azure 订阅。An Azure subscription. 若要完成本教程,需要一个 Azure 帐户。To complete this tutorial, you need an Azure account. 你可以激活 Visual Studio 或 MSDN 订阅者权益或者注册免费试用帐户You can activate your Visual Studio or MSDN subscriber benefits or sign up for a trial account.

  2. 按照快速入门:使用 Azure 门户创建一个服务总线主题和对此主题的订阅来创建服务总线命名空间并获取连接字符串Follow steps in the Quickstart: Use the Azure portal to create a Service Bus topic and subscriptions to the topic to create a Service Bus namespace and get the connection string.

    Note

    在本快速入门中,你将使用 Node.js 创建一个主题和对此主题的订阅You will create a topic and a subscription to the topic by using Node.js in this quickstart.

创建 Node.js 应用程序Create a Node.js application

创建一个空的 Node.js 应用程序。Create a blank Node.js application. 有关创建 Node.js 应用程序的说明,请参阅创建 Node.js 应用程序并将其部署到 Azure 网站、使用 Windows PowerShell 创建 Node.js 云服务或使用 WebMatrix 创建网站。For instructions on creating a Node.js application, see Create and deploy a Node.js application to an Azure Web Site, Node.js Cloud Service using Windows PowerShell, or Web Site with WebMatrix.

配置应用程序以使用服务总线Configure your application to use Service Bus

若要使用服务总线,请下载 Node.js Azure 包。To use Service Bus, download the Node.js Azure package. 此程序包包括一组用来与服务总线 REST 服务通信的库。This package includes a set of libraries that communicate with the Service Bus REST services.

使用 Node 包管理器 (NPM) 可获取该程序包Use Node Package Manager (NPM) to obtain the package

  1. 打开命令行接口,例如 PowerShell (Windows)、Terminal (Mac) 或 Bash (Unix)。Open a command-line interface such as PowerShell (Windows), Terminal (Mac), or Bash (Unix).

  2. 导航到创建示例应用程序的文件夹。Navigate to the folder where you created your sample application.

  3. 在命令窗口中键入 npm install azure ,这应会生成以下输出:Type npm install azure in the command window, which should result in the following output:

        azure@0.7.5 node_modules\azure
    ├── dateformat@1.0.2-1.2.3
    ├── xmlbuilder@0.4.2
    ├── node-uuid@1.2.0
    ├── mime@1.2.9
    ├── underscore@1.4.4
    ├── validator@1.1.1
    ├── tunnel@0.0.2
    ├── wns@0.5.3
    ├── xml2js@0.2.7 (sax@0.5.2)
    └── request@2.21.0 (json-stringify-safe@4.0.0, forever-agent@0.5.0, aws-sign@0.3.0, tunnel-agent@0.3.0, oauth-sign@0.3.0, qs@0.6.5, cookie-jar@0.3.0, node-uuid@1.4.0, http-signature@0.9.11, form-data@0.0.8, hawk@0.13.1)
    
  4. 可以手动运行 ls 命令来验证是否创建了 node_modules 文件夹。You can manually run the ls command to verify that a node_modules folder was created. 在该文件夹中,找到 azure 包,其中包含访问服务总线主题所需的库。Inside that folder, find the azure package, which contains the libraries you need to access Service Bus topics.

导入模块Import the module

使用记事本或其他文本编辑器将以下内容添加到应用程序的 server.js 文件的顶部:Using Notepad or another text editor, add the following to the top of the server.js file of the application:

var azure = require('azure');

设置服务总线连接Set up a Service Bus connection

Azure 模块将读取前面在执行步骤“获取凭据”时获取的连接字符串的环境变量 AZURE_SERVICEBUS_CONNECTION_STRINGThe Azure module reads the environment variable AZURE_SERVICEBUS_CONNECTION_STRING for the connection string that you obtained from the earlier step, "Obtain the credentials." 如果未设置此环境变量,则在调用 createServiceBusService 时必须指定帐户信息。If this environment variable is not set, you must specify the account information when calling createServiceBusService.

有关设置 Azure 云服务环境变量的示例,请参阅使用存储的 Node.js 云服务For an example of setting the environment variables for an Azure Cloud Service, see Node.js Cloud Service with Storage.

创建主题Create a topic

可以通过 ServiceBusService 对象处理主题。The ServiceBusService object enables you to work with topics. 以下代码创建 ServiceBusService 对象。The following code creates a ServiceBusService object. 将它添加到靠近 server.js 文件顶部、用于导入 azure 模块的语句之后的位置:Add it near the top of the server.js file, after the statement to import the azure module:

var serviceBusService = azure.createServiceBusService();

如果调用 ServiceBusService 对象上的 createTopicIfNotExists,会返回指定的主题(如果存在),否则会使用指定的名称创建一个新主题。If you call createTopicIfNotExists on the ServiceBusService object, the specified topic is returned (if it exists), or a new topic with the specified name is created. 以下代码使用 createTopicIfNotExists 创建或连接到名为 MyTopic 的主题:The following code uses createTopicIfNotExists to create or connect to the topic named MyTopic:

serviceBusService.createTopicIfNotExists('MyTopic',function(error){
    if(!error){
        // Topic was created or exists
        console.log('topic created or exists.');
    }
});

createTopicIfNotExists 方法还支持其他选项,通过这些选项可以重写默认主题设置,例如消息生存时间或最大主题大小。The createTopicIfNotExists method also supports additional options, which enable you to override default topic settings such as message time to live or maximum topic size.

以下示例将最大主题大小设置为 5GB,将生存时间设置为 1 分钟:The following example sets the maximum topic size to 5 GB with a time to live of one minute:

var topicOptions = {
        MaxSizeInMegabytes: '5120',
        DefaultMessageTimeToLive: 'PT1M'
    };

serviceBusService.createTopicIfNotExists('MyTopic', topicOptions, function(error){
    if(!error){
        // topic was created or exists
    }
});

筛选器Filters

可选的筛选操作可应用于使用 ServiceBusService 执行的操作。Optional filtering operations can be applied to operations performed using ServiceBusService. 筛选操作可包括日志记录、自动重试等。筛选器是实现具有签名的方法的对象:Filtering operations can include logging, automatically retrying, etc. Filters are objects that implement a method with the signature:

function handle (requestOptions, next)

在对请求选项执行预处理后,该方法会调用 next,传递具有以下签名的回叫:After performing preprocessing on the request options, the method calls next, and passes a callback with the following signature:

function (returnObject, finalCallback, next)

在此回叫中并且在处理 returnObject(来自对服务器请求的响应)后,回叫必须调用 next(如果存在),继续处理其他筛选器或调用 finalCallback 以结束服务调用。In this callback, and after processing the returnObject (the response from the request to the server), the callback must either invoke next (if it exists) to continue processing other filters, or invoke finalCallback to end the service invocation.

Azure SDK for Node.js 中附带了两个实现了重试逻辑的筛选器,分别是 ExponentialRetryPolicyFilterLinearRetryPolicyFilterTwo filters that implement retry logic are included with the Azure SDK for Node.js, ExponentialRetryPolicyFilter and LinearRetryPolicyFilter. 以下代码创建一个 ServiceBusService 对象,该对象使用 ExponentialRetryPolicyFilter:The following code creates a ServiceBusService object that uses the ExponentialRetryPolicyFilter:

var retryOperations = new azure.ExponentialRetryPolicyFilter();
var serviceBusService = azure.createServiceBusService().withFilter(retryOperations);

创建订阅Create subscriptions

主题订阅也是使用 ServiceBusService 对象创建的。Topic subscriptions are also created with the ServiceBusService object. 订阅已命名,并可具有可选筛选器,用于限制传送到订阅的虚拟队列的消息集。Subscriptions are named, and can have an optional filter that restricts the set of messages delivered to the subscription's virtual queue.

Note

除非删除它或与之相关的主题,否则订阅是永久性的。Subscriptions are persistent until either they, or the topic they are associated with, are deleted. 如果应用程序包含创建订阅的逻辑,则它应首先使用 getSubscription 方法检查该订阅是否存在。If your application contains logic to create a subscription, it should first check if the subscription exists by using the getSubscription method.

创建具有默认 (MatchAll) 筛选器的订阅Create a subscription with the default (MatchAll) filter

MatchAll 筛选器是创建订阅时使用的默认筛选器。The MatchAll filter is the default filter used when a subscription is created. 使用 MatchAll 筛选器时,发布到主题的所有消息都会置于订阅的虚拟队列中。When you use the MatchAll filter, all messages published to the topic are placed in the subscription's virtual queue. 以下示例创建名为“AllMessages”的订阅,并使用默认的 MatchAll 筛选器。The following example creates a subscription named AllMessages and uses the default MatchAll filter.

serviceBusService.createSubscription('MyTopic','AllMessages',function(error){
    if(!error){
        // subscription created
    }
});

创建具有筛选器的订阅Create subscriptions with filters

还可以创建筛选器,以确定发送到主题的哪些消息应该在特定主题订阅中显示。You can also create filters that allow you to scope which messages sent to a topic should show up within a specific topic subscription.

订阅支持的最灵活的一种筛选器是 SqlFilter,它实现了一部分 SQL92 功能。The most flexible type of filter supported by subscriptions is the SqlFilter, which implements a subset of SQL92. SQL 筛选器将对发布到主题的消息的属性进行操作。SQL filters operate on the properties of the messages that are published to the topic. 有关可用于 SQL 筛选器的表达式的更多详细信息,请参阅 SqlFilter.SqlExpression 语法。For more details about the expressions that can be used with a SQL filter, review the SqlFilter.SqlExpression syntax.

可以使用 ServiceBusService 对象的 createRule 方法向订阅中添加筛选器。Filters can be added to a subscription by using the createRule method of the ServiceBusService object. 此方法允许向现有订阅中添加新筛选器。This method allows you to add new filters to an existing subscription.

Note

由于默认筛选器会自动应用到所有新订阅,因此,必须首先删除默认筛选器,否则 MatchAll 会替代你可能指定的任何其他筛选器。Because the default filter is applied automatically to all new subscriptions, you must first remove the default filter or the MatchAll will override any other filters you may specify. 可以使用 ServiceBusService 对象的 deleteRule 方法删除默认规则。You can remove the default rule by using the deleteRule method of the ServiceBusService object.

以下示例创建一个名为 HighMessages 的订阅,该订阅包含一个 SqlFilter,它仅选择自定义 messagenumber 属性大于 3 的消息:The following example creates a subscription named HighMessages with a SqlFilter that only selects messages that have a custom messagenumber property greater than 3:

serviceBusService.createSubscription('MyTopic', 'HighMessages', function (error){
    if(!error){
        // subscription created
        rule.create();
    }
});
var rule={
    deleteDefault: function(){
        serviceBusService.deleteRule('MyTopic',
            'HighMessages', 
            azure.Constants.ServiceBusConstants.DEFAULT_RULE_NAME, 
            rule.handleError);
    },
    create: function(){
        var ruleOptions = {
            sqlExpressionFilter: 'messagenumber > 3'
        };
        rule.deleteDefault();
        serviceBusService.createRule('MyTopic', 
            'HighMessages', 
            'HighMessageFilter', 
            ruleOptions, 
            rule.handleError);
    },
    handleError: function(error){
        if(error){
            console.log(error)
        }
    }
}

同样,以下示例创建一个名为 LowMessages 的订阅,该订阅包含一个 SqlFilter,它仅选择 messagenumber 属性小于或等于 3 的消息:Similarly, the following example creates a subscription named LowMessages with a SqlFilter that only selects messages that have a messagenumber property less than or equal to 3:

serviceBusService.createSubscription('MyTopic', 'LowMessages', function (error){
    if(!error){
        // subscription created
        rule.create();
    }
});
var rule={
    deleteDefault: function(){
        serviceBusService.deleteRule('MyTopic',
            'LowMessages', 
            azure.Constants.ServiceBusConstants.DEFAULT_RULE_NAME, 
            rule.handleError);
    },
    create: function(){
        var ruleOptions = {
            sqlExpressionFilter: 'messagenumber <= 3'
        };
        rule.deleteDefault();
        serviceBusService.createRule('MyTopic', 
            'LowMessages', 
            'LowMessageFilter', 
            ruleOptions, 
            rule.handleError);
    },
    handleError: function(error){
        if(error){
            console.log(error)
        }
    }
}

现在,当消息发送到 MyTopic 时,它会传送给订阅了 AllMessages 主题订阅的接收者,并且选择性地传送给订阅了 HighMessagesLowMessages 主题订阅的接收者(具体取决于消息内容)。When a message is now sent to MyTopic, it is delivered to receivers subscribed to the AllMessages topic subscription, and selectively delivered to receivers subscribed to the HighMessages and LowMessages topic subscriptions (depending upon the message content).

如何将消息发送到主题How to send messages to a topic

要将消息发送到服务总线主题,应用程序必须使用 ServiceBusService 对象的 sendTopicMessage 方法。To send a message to a Service Bus topic, your application must use the sendTopicMessage method of the ServiceBusService object. 发送到服务总线主题的消息是 BrokeredMessage 对象。Messages sent to Service Bus topics are BrokeredMessage objects. BrokeredMessage 对象具有一组标准属性(如 LabelTimeToLive)、一个用于保存特定于应用程序的自定义属性的字典,以及一段字符串数据正文。BrokeredMessage objects have a set of standard properties (such as Label and TimeToLive), a dictionary that is used to hold custom application-specific properties, and a body of string data. 应用程序可以通过将字符串值传递给 sendTopicMessage 设置消息正文,并且任何必需的标准属性将用默认值填充。An application can set the body of the message by passing a string value to the sendTopicMessage and any required standard properties are populated by default values.

以下示例演示如何向 MyTopic发送五条测试消息。The following example demonstrates how to send five test messages to MyTopic. 每条消息的 messagenumber 属性值因循环迭代而异(此属性确定哪些订阅接收它):The messagenumber property value of each message varies on the iteration of the loop (this property determines which subscriptions receive it):

var message = {
    body: '',
    customProperties: {
        messagenumber: 0
    }
}

for (i = 0;i < 5;i++) {
    message.customProperties.messagenumber=i;
    message.body='This is Message #'+i;
    serviceBusService.sendTopicMessage(topic, message, function(error) {
      if (error) {
        console.log(error);
      }
    });
}

服务总线主题在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus topics support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 一个主题中包含的消息数量不受限制,但消息的总大小受限制。There is no limit on the number of messages held in a topic, but there is a limit on the total size of the messages held by a topic. 此主题大小是在创建时定义的,上限为 5 GB。This topic size is defined at creation time, with an upper limit of 5 GB.

从订阅接收消息Receive messages from a subscription

对 ServiceBusService 对象使用 receiveSubscriptionMessage 方法可从订阅接收消息。Messages are received from a subscription using the receiveSubscriptionMessage method on the ServiceBusService object. 默认情况下,会在读取消息后将其从订阅删除。By default, messages are deleted from the subscription as they are read. 但是,可以将可选参数 isPeekLock 设置为“true”以读取(速览)并锁定消息,而不将其从订阅中删除。However, you can set the optional parameter isPeekLock to true to read (peek) and lock the message without deleting it from the subscription.

在接收过程中读取并删除消息的默认行为是最简单的模式,并且最适合在发生故障时应用程序可以容忍不处理消息的情况。The default behavior of reading and deleting the message as part of the receive operation is the simplest model, and works best for scenarios in which an application can tolerate not processing a message in the event of a failure. 为了理解此行为,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。To understand this behavior, consider a scenario in which the consumer issues the receive request and then crashes before processing it. 由于服务总线已将消息标记为“已使用”,因此当应用程序重新启动并重新开始使用消息时,它会漏掉在发生崩溃前使用的消息。Because Service Bus has marked the message as being consumed, then when the application restarts and begins consuming messages again, it has missed the message that was consumed prior to the crash.

如果将 isPeekLock 参数设置为“true”,则接收会变成一个两阶段操作,从而可支持无法容忍遗漏消息的应用程序。If the isPeekLock parameter is set to true, the receive becomes a two-stage operation, which makes it possible to support applications that cannot tolerate missed messages. 服务总线收到请求时,它会找到要使用的下一个消息,将其锁定以防其他使用者接收它,并将该消息返回给应用程序。When Service Bus receives a request, it finds the next message to consume, locks it to prevent other consumers from receiving it, and returns it to the application. 应用程序处理该消息(或将它可靠地存储起来留待将来处理)后,通过调用 deleteMessage 方法来完成接收过程的第二阶段,并将要删除的消息作为参数传递。After the application processes the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling deleteMessage method, and passes the message to delete as a parameter. deleteMessage 方法会将消息标记为已使用,并将其从订阅中删除。The deleteMessage method marks the message as consumed and removes it from the subscription.

以下示例演示如何使用 receiveSubscriptionMessage 接收和处理消息。The following example demonstrates how messages can be received and processed using receiveSubscriptionMessage. 该示例先从“LowMessages”订阅接收并删除一条消息,然后将 isPeekLock 设置为“true”,从“HighMessages”订阅接收一条消息。The example first receives and deletes a message from the 'LowMessages' subscription, and then receives a message from the 'HighMessages' subscription using isPeekLock set to true. 最后使用 deleteMessage 删除该消息:It then deletes the message using deleteMessage:

serviceBusService.receiveSubscriptionMessage('MyTopic', 'LowMessages', function(error, receivedMessage){
    if(!error){
        // Message received and deleted
        console.log(receivedMessage);
    }
});
serviceBusService.receiveSubscriptionMessage('MyTopic', 'HighMessages', { isPeekLock: true }, function(error, lockedMessage){
    if(!error){
        // Message received and locked
        console.log(lockedMessage);
        serviceBusService.deleteMessage(lockedMessage, function (deleteError){
            if(!deleteError){
                // Message deleted
                console.log('message has been deleted.');
            }
        })
    }
});

如何处理应用程序崩溃和不可读消息How to handle application crashes and unreadable messages

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序因某种原因无法处理消息,则它可以对 ServiceBusService 对象调用 unlockMessage 方法。If a receiver application is unable to process the message for some reason, then it can call the unlockMessage method on the ServiceBusService object. 此方法会导致服务总线解锁订阅中的消息并使其能够重新被接收。This method causes Service Bus to unlock the message within the subscription and make it available to be received again. 在此实例中,消息被相同的使用方应用程序或另一个使用方应用程序重新接收。In this instance, either by the same consuming application or by another consuming application.

订阅中也有一个超时与锁定的消息关联。There is also a timeout associated with a message locked within the subscription. 如果应用程序无法在锁定超时期满前处理消息(例如,如果应用程序发生故障),服务总线会自动解锁消息,让它再次可供接收。If the application fails to process the message before the lock timeout expires (for example, if the application crashes), then Service Bus unlocks the message automatically and makes it available to be received again.

如果应用程序在处理消息之后,但在调用 deleteMessage 方法之前崩溃,则在应用程序重启时会将该消息重新传送给它。In the event the application crashes after processing the message but before the deleteMessage method is called, the message is redelivered to the application when it restarts. 此行为通常称为“至少处理一次”。This behavior is often called At Least Once Processing. 也就是说,每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。That is, each message is processed at least once, but in certain situations the same message may be redelivered. 如果方案不允许重复处理,则应该向应用程序添加逻辑来处理重复消息传送。If the scenario cannot tolerate duplicate processing, then you should add logic to your application to handle duplicate message delivery. 可以使用消息的 MessageId 属性,该属性在各次传送尝试中保持不变。You can use the MessageId property of the message, which remains constant across delivery attempts.

删除主题和订阅Delete topics and subscriptions

主题和订阅具有持久性,必须通过 Azure 门户或以编程方式显式删除。Topics and subscriptions are persistent, and must be explicitly deleted either through the Azure portal or programmatically. 以下示例演示了如何删除名为 MyTopic的主题:The following example demonstrates how to delete the topic named MyTopic:

serviceBusService.deleteTopic('MyTopic', function (error) {
    if (error) {
        console.log(error);
    }
});

删除某个主题也会删除向该主题注册的所有订阅。Deleting a topic also deletes any subscriptions that are registered with the topic. 也可以单独删除订阅。Subscriptions can also be deleted independently. 以下示例说明如何从 MyTopic 主题删除名为 HighMessages 的订阅:The following example shows how to delete a subscription named HighMessages from the MyTopic topic:

serviceBusService.deleteSubscription('MyTopic', 'HighMessages', function (error) {
    if(error) {
        console.log(error);
    }
});

后续步骤Next Steps

现在,已了解有关 Service Bus 主题的基础知识,单击下面的链接可了解更多信息。Now that you've learned the basics of Service Bus topics, follow these links to learn more.