向 Azure 服务总线主题发送消息,并从该主题的订阅接收消息 (JavaScript)

本教程介绍如何使用 JavaScript 程序中的 @azure/service-bus 包将消息发送到服务总线主题,并从该主题的服务总线订阅接收消息。

先决条件

备注

使用节点包管理器 (NPM) 安装包

若要安装服务总线的 npm 包,请打开路径中包含 npm 的命令提示符,将目录更改为要包含示例的文件夹,然后运行此命令

npm install @azure/service-bus

将消息发送到主题

下面的示例代码演示如何将一批消息发送到服务总线主题。 请参阅代码注释以了解详细信息。

  1. 打开你喜好的编辑器,例如 Visual Studio Code

  2. 创建一个名为 sendtotopic.js 的文件,并将下面的代码粘贴到其中。 此代码向主题发送一条消息。

    const { ServiceBusClient } = require("@azure/service-bus");
    
    const connectionString = "<SERVICE BUS NAMESPACE CONNECTION STRING>"
    const topicName = "<TOPIC NAME>";
    
    const messages = [
        { body: "Albert Einstein" },
        { body: "Werner Heisenberg" },
        { body: "Marie Curie" },
        { body: "Steven Hawking" },
        { body: "Isaac Newton" },
        { body: "Niels Bohr" },
        { body: "Michael Faraday" },
        { body: "Galileo Galilei" },
        { body: "Johannes Kepler" },
        { body: "Nikolaus Kopernikus" }
     ];
    
     async function main() {
        // create a Service Bus client using the connection string to the Service Bus namespace
        const sbClient = new ServiceBusClient(connectionString);
    
        // createSender() can also be used to create a sender for a queue.
        const sender = sbClient.createSender(topicName);
    
        try {
            // Tries to send all messages in a single batch.
            // Will fail if the messages cannot fit in a batch.
            // await sender.sendMessages(messages);
    
            // create a batch object
            let batch = await sender.createMessageBatch(); 
            for (let i = 0; i < messages.length; i++) {
                // for each message in the arry         
    
                // try to add the message to the batch
                if (!batch.tryAddMessage(messages[i])) {            
                    // if it fails to add the message to the current batch
                    // send the current batch as it is full
                    await sender.sendMessages(batch);
    
                    // then, create a new batch 
                    batch = await sender.createMessageBatch();
    
                    // now, add the message failed to be added to the previous batch to this batch
                    if (!batch.tryAddMessage(messages[i])) {
                        // if it still can't be added to the batch, the message is probably too big to fit in a batch
                        throw new Error("Message too big to fit in a batch");
                    }
                }
            }
    
            // Send the last created batch of messages to the topic
            await sender.sendMessages(batch);
    
            console.log(`Sent a batch of messages to the topic: ${topicName}`);
    
            // Close the sender
            await sender.close();
        } finally {
            await sbClient.close();
        }
    }
    
    // call the main function
    main().catch((err) => {
        console.log("Error occurred: ", err);
        process.exit(1);
     });    
    
  3. <SERVICE BUS NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串。

  4. <TOPIC NAME> 替换为主题名称。

  5. 然后,在命令提示符中运行该命令以执行此文件。

    node sendtotopic.js 
    
  6. 你会看到以下输出。

    Sent a batch of messages to the topic: mytopic
    

从订阅接收消息

  1. 打开你喜好的编辑器,例如 Visual Studio Code

  2. 创建名为 receivefromsubscription.js 的文件,然后将以下代码粘贴到其中。 请参阅代码注释以了解详细信息。

    const { delay, ServiceBusClient, ServiceBusMessage } = require("@azure/service-bus");
    
    const connectionString = "<SERVICE BUS NAMESPACE CONNECTION STRING>"
    const topicName = "<TOPIC NAME>";
    const subscriptionName = "<SUBSCRIPTION NAME>";
    
     async function main() {
        // create a Service Bus client using the connection string to the Service Bus namespace
        const sbClient = new ServiceBusClient(connectionString);
    
        // createReceiver() can also be used to create a receiver for a queue.
        const receiver = sbClient.createReceiver(topicName, subscriptionName);
    
        // function to handle messages
        const myMessageHandler = async (messageReceived) => {
            console.log(`Received message: ${messageReceived.body}`);
        };
    
        // function to handle any errors
        const myErrorHandler = async (error) => {
            console.log(error);
        };
    
        // subscribe and specify the message and error handlers
        receiver.subscribe({
            processMessage: myMessageHandler,
            processError: myErrorHandler
        });
    
        // Waiting long enough before closing the sender to send messages
        await delay(5000);
    
        await receiver.close(); 
        await sbClient.close();
    }
    
    // call the main function
    main().catch((err) => {
        console.log("Error occurred: ", err);
        process.exit(1);
     });    
    
  3. <SERVICE BUS NAMESPACE CONNECTION STRING> 替换为命名空间的连接字符串。

  4. <TOPIC NAME> 替换为主题名称。

  5. <SUBSCRIPTION NAME> 替换为主题的订阅名称。

  6. 然后,在命令提示符中运行该命令以执行此文件。

    node receivefromsubscription.js
    
  7. 你会看到以下输出。

    Received message: Albert Einstein
    Received message: Werner Heisenberg
    Received message: Marie Curie
    Received message: Steven Hawking
    Received message: Isaac Newton
    Received message: Niels Bohr
    Received message: Michael Faraday
    Received message: Galileo Galilei
    Received message: Johannes Kepler
    Received message: Nikolaus Kopernikus
    

在 Azure 门户中,导航到服务总线命名空间,然后选择底部窗格中的主题,以查看主题的“服务总线主题”页面。 在此页上,应会在“消息”图表中看到三条传入消息和三条传出消息。

传入和传出消息

如果下次仅运行发送应用,那么在“服务总线主题”页上,除 3 条传出消息外,你还会看到 6 条传入消息(3 条新消息)。

已更新的主题页

在此页上,如果选择一个订阅,则将转到“服务总线订阅”页。 可以在此页上查看活动消息计数、死信消息计数等。 在此示例中,还有三条活动消息未被接收器接收。

活动消息计数

后续步骤

请参阅以下文档和示例: