向 Azure 服务总线主题发送消息,并从该主题的订阅接收消息 (.NET)Send messages to an Azure Service Bus topic and receive messages from subscriptions to the topic (.NET)

本教程介绍如何创建 .NET Core 控制台应用,该应用将消息发送到服务总线主题,并从该主题的订阅接收消息。This tutorial shows you how to create a .NET Core console app that sends messages to a Service Bus topic and receives messages from a subscription of the topic.

重要

本快速入门使用新的 Azure.Messaging.ServiceBus 包。This quickstart uses the new Azure.Messaging.ServiceBus package. 有关使用旧的 Microsoft.Azure.ServiceBus 包的快速入门,请参阅使用 Microsoft.Azure.ServiceBus 包发送和接收消息For a quickstart that uses the old Microsoft.Azure.ServiceBus package, see Send and receive messages using the Microsoft.Azure.ServiceBus package.

先决条件Prerequisites

将消息发送到主题Send messages to a topic

在本部分中,你将在 Visual Studio 中创建 .NET Core 控制台应用程序,并添加代码以将消息发送到创建的主题。In this section, you'll create a .NET Core console application in Visual Studio, add code to send messages to the topic you created.

创建控制台应用程序Create a console application

启动 Visual Studio 并创建新的用于 C# 的 控制台应用 (.NET Core) 项目。Launch Visual Studio and create a new Console App (.NET Core) project for C#.

添加服务总线 NuGet 包Add the Service Bus NuGet package

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。Right-click the newly created project and select Manage NuGet Packages.
  2. 选择“浏览”。Select Browse. 搜索并选择 Azure.Messaging.ServiceBusSearch for and select Azure.Messaging.ServiceBus.
  3. 选择“安装”以完成安装,然后关闭“NuGet 包管理器”。Select Install to complete the installation, then close the NuGet Package Manager.

添加将消息发送到主题的代码Add code to send messages to the topic

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前:In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
  2. Program 类中声明以下变量:In the Program class, declare the following variables:

        static string connectionString = "<NAMESPACE CONNECTION STRING>";
        static string topicName = "<TOPIC NAME>";
        static string subscriptionName = "<SUBSCRIPTION NAME>";
    

    请替换以下值:Replace the following values:

    • <NAMESPACE CONNECTION STRING> 替换为服务总线命名空间的连接字符串<NAMESPACE CONNECTION STRING> with the connection string to your Service Bus namespace
    • <TOPIC NAME> 替换为主题名称<TOPIC NAME> with the name of the topic
    • <SUBSCRIPTION NAME> 替换为订阅的名称<SUBSCRIPTION NAME> with the name of the subscription
  3. 添加名为 SendMessageToTopicAsync 的方法,该方法向主题发送一条消息。Add a method named SendMessageToTopicAsync that sends one message to the topic.

        static async Task SendMessageToTopicAsync()
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the topic
                ServiceBusSender sender = client.CreateSender(topicName);
                await sender.SendMessageAsync(new ServiceBusMessage("Hello, World!"));
                Console.WriteLine($"Sent a single message to the topic: {topicName}");
            }
        }
    
  4. 添加一个名为 CreateMessages 的方法,以创建指向 Program 类的消息队列(.NET 队列)。Add a method named CreateMessages to create a queue (.NET queue) of messages to the Program class. 通常,可以从应用程序的不同部分获得这些消息。Typically, you get these messages from different parts of your application. 在这里,我们将创建一个示例消息队列。Here, we create a queue of sample messages.

        static Queue<ServiceBusMessage> CreateMessages()
        {
            // create a queue containing the messages and return it to the caller
            Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();
            messages.Enqueue(new ServiceBusMessage("First message"));
            messages.Enqueue(new ServiceBusMessage("Second message"));
            messages.Enqueue(new ServiceBusMessage("Third message"));
            return messages;
        }
    
  5. 将名为 SendMessageBatchAsync 的方法添加到 Program 类,并添加以下代码。Add a method named SendMessageBatchAsync to the Program class, and add the following code. 此方法使用消息队列,并准备一个或多个要发送到服务总线主题的批处理。This method takes a queue of messages, and prepares one or more batches to send to the Service Bus topic.

        static async Task SendMessageBatchToTopicAsync()
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
    
                // create a sender for the topic 
                ServiceBusSender sender = client.CreateSender(topicName);
    
                // get the messages to be sent to the Service Bus topic
                Queue<ServiceBusMessage> messages = CreateMessages();
    
                // total number of messages to be sent to the Service Bus topic
                int messageCount = messages.Count;
    
                // while all messages are not sent to the Service Bus topic
                while (messages.Count > 0)
                {
                    // start a new batch 
                    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
    
                    // add the first message to the batch
                    if (messageBatch.TryAddMessage(messages.Peek()))
                    {
                        // dequeue the message from the .NET queue once the message is added to the batch
                        messages.Dequeue();
                    }
                    else
                    {
                        // if the first message can't fit, then it is too large for the batch
                        throw new Exception($"Message {messageCount - messages.Count} is too large and cannot be sent.");
                    }
    
                    // add as many messages as possible to the current batch
                    while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
                    {
                        // dequeue the message from the .NET queue as it has been added to the batch
                        messages.Dequeue();
                    }
    
                    // now, send the batch
                    await sender.SendMessagesAsync(messageBatch);
    
                    // if there are any remaining messages in the .NET queue, the while loop repeats 
                }
    
                Console.WriteLine($"Sent a batch of {messageCount} messages to the topic: {topicName}");
            }
        }
    
  6. Main() 方法替换为以下 async Main 方法。Replace the Main() method with the following async Main method. 它同时调用发送方法,将单个消息和一批消息发送到主题。It calls both the send methods to send a single message and a batch of messages to the topic.

        static async Task Main()
        {
            // send a message to the topic
            await SendMessageToTopicAsync();
    
            // send a batch of messages to the topic
            await SendMessageBatchToTopicAsync();
        }
    
  7. 运行该应用程序。Run the application. 应该会看到以下输出:You should see the following output:

    Sent a single message to the topic: mytopic
    Sent a batch of 3 messages to the topic: mytopic
    
  8. 在 Azure 门户中按照以下步骤操作:In the Azure portal, follow these steps:

    1. 导航到服务总线命名空间。Navigate to your Service Bus namespace.

    2. 在“概述”页底部居中位置的窗格中,切换到“主题”选项卡,然后选择“服务总线”主题 。On the Overview page, in the bottom-middle pane, switch to the Topics tab, and select the Service Bus topic. 下面的示例中采用的是 mytopicIn the following example, it's mytopic.

      选择主题

    3. 在“服务总线主题”页的“消息”图表中的底部“指标”部分中,可以看到主题有四条传入的消息 。On the Service Bus Topic page, In the Messages chart in the bottom Metrics section, you can see that there are four incoming messages for the topic. 如果未看到该值,请等待几分钟,然后刷新页面以查看更新后的图表。If you don't see the value, wait for a few minutes and refresh the page to see the updated chart.

      发送到主题的消息

    4. 在底部窗格中选择订阅。Select the subscription in the bottom pane. 下面的示例中采用的是 S1。In the following example, it's S1. 在“服务总线订阅”页面上,你会看到“活动消息计数”为 4 。On the Service Bus Subscription page, you see the Active message count as 4. 订阅已接收你发送到主题的四条消息,但接收方尚未选择它们。The subscription has received the four messages that you sent to the topic, but no receiver has picked them yet.

      订阅中接收的消息

从订阅接收消息Receive messages from a subscription

  1. 将以下方法添加到处理消息和任何错误的 Program 类中。Add the following methods to the Program class that handle messages and any errors.

        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body} from subscription: {subscriptionName}");
    
            // complete the message. messages is deleted from the queue. 
            await args.CompleteMessageAsync(args.Message);
        }
    
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    
  2. 将以下方法 ReceiveMessagesFromSubscriptionAsync 添加到 Program 类。Add the following method ReceiveMessagesFromSubscriptionAsync to the Program class.

        static async Task ReceiveMessagesFromSubscriptionAsync()
        {
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
    
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;
    
                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;
    
                // start processing 
                await processor.StartProcessingAsync();
    
                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();
    
                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    
  3. 将对 ReceiveMessagesFromSubscriptionAsync 方法的调用添加到 Main 方法。Add a call to the ReceiveMessagesFromSubscriptionAsync method to the Main method. 如果只想对接收消息进行测试,请注释禁止 SendMessagesToTopicAsync 方法。Comment out the SendMessagesToTopicAsync method if you want to test only receiving of messages. 如果不这样做,则会看到发送到主题的其他四条消息。If you don't, you see another four messages sent to the topic.

        static async Task Main()
        {
            // send a message to the topic
            await SendMessageToTopicAsync();
    
            // send a batch of messages to the topic
            await SendMessageBatchToTopicAsync();
    
            // receive messages from the subscription
            await ReceiveMessagesFromSubscriptionAsync();
        }
    

运行应用Run the app

运行该应用程序。Run the application. 等待一分钟,然后按任意键停止接收消息。Wait for a minute and then press any key to stop receiving messages. 你应该会看到以下输出(密钥的空格)。You should see the following output (spacebar for the key).

Sent a single message to the topic: mytopic
Sent a batch of 3 messages to the topic: mytopic
Wait for a minute and then press any key to end the processing
Received: Hello, World! from subscription: mysub
Received: First message from subscription: mysub
Received: Second message from subscription: mysub
Received: Third message from subscription: mysub
Received: Hello, World! from subscription: mysub
Received: First message from subscription: mysub
Received: Second message from subscription: mysub
Received: Third message from subscription: mysub

Stopping the receiver...
Stopped receiving messages

再次检查门户。Check the portal again.

  • 在“服务总线主题”页面上的“消息”图表中,你会看到八条传入消息和八条传出消息 。On the Service Bus Topic page, in the Messages chart, you see eight incoming messages and eight outgoing messages. 如果未看到这些值,请等待几分钟,然后刷新页面以查看更新后的图表。If you don't see these numbers, wait for a few minutes, and refresh the page to see the updated chart.

    发送和接收的消息

  • 在“服务总线订阅”页面上,你会看到“活动消息计数”为零 。On the Service Bus Subscription page, you see the Active message count as zero. 这是因为接收方已接收并已完成来自此订阅的消息。It's because a receiver has received messages from this subscription and completed the messages.

    末尾订阅中的活动消息计数

后续步骤Next steps

请参阅以下文档和示例:See the following documentation and samples: