向/从服务总线队列发送/接收消息 (.NET)Send messages to and receive messages from Azure Service Bus queues (.NET)

在本教程中,你将使用 Azure.Messaging.ServiceBus 包创建 .NET Core C# 控制台应用程序,用于向/从 Azure 服务总线队列发送/接收消息。In this tutorial, you create a .NET Core console application to send messages to and receive messages from a Service Bus queue using the Azure.Messaging.ServiceBus package.

重要

本快速入门使用新的 Azure.Messaging.ServiceBus 包。This quickstart uses the new Azure.Messaging.ServiceBus package. 有关使用旧的 Microsoft.Azure.ServiceBus 包的快速入门,请参阅使用 Microsoft.Azure.ServiceBus 包发送和接收事件For a quickstart that uses the old Microsoft.Azure.ServiceBus package, see Send and receive events using Microsoft.Azure.ServiceBus package.

先决条件Prerequisites

向队列发送消息Send messages to a queue

在此快速入门中,你将创建 C# .Net Core 控制台应用程序,以将消息发送到队列。In this quick start, you'll create a C# .NET Core console application to send messages to the queue.

创建控制台应用程序Create a console application

启动 Visual Studio 并创建新的用于 C# 的 控制台应用 (.NET Core) 项目。Launch Visual Studio and create a new Console App (.NET Core) project for C#.

添加服务总线 NuGet 包Add the Service Bus NuGet package

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。Right-click the newly created project and select Manage NuGet Packages.
  2. 选择“浏览”。Select Browse. 搜索并选择 Azure.Messaging.ServiceBusSearch for and select Azure.Messaging.ServiceBus.
  3. 选择“安装”以完成安装,然后关闭“NuGet 包管理器”。Select Install to complete the installation, then close the NuGet Package Manager.

添加将消息发送到队列的代码Add code to send messages to the queue

  1. Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前:In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    
    using Azure.Messaging.ServiceBus;
    
  2. Program 类中声明以下变量:In the Program class, declare the following variables:

        static string connectionString = "<NAMESPACE CONNECTION STRING>";
        static string queueName = "<QUEUE NAME>";
    

    connectionString 变量的形式输入命名空间的连接字符串。Enter your connection string for the namespace as the connectionString variable. 输入队列名称。Enter your queue name.

  3. 紧跟在 Main() 方法的后面,添加以下 SendMessagesAsync() 方法来发送消息:Directly after the Main() method, add the following SendMessagesAsync() method that does the work of sending a message:

        static async Task SendMessageAsync()
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue 
                ServiceBusSender sender = client.CreateSender(queueName);
    
                // create a message that we can send
                ServiceBusMessage message = new ServiceBusMessage("Hello world!");
    
                // send the message
                await sender.SendMessageAsync(message);
                Console.WriteLine($"Sent a single message to the queue: {queueName}");
            }
        }
    
  4. 添加一个名为 CreateMessages 的方法,以创建指向 Program 类的消息队列(.NET 队列)。Add a method named CreateMessages to create a queue (.NET queue) of messages to the Program class. 通常,可以从应用程序的不同部分获得这些消息。Typically, you get these messages from different parts of your application. 在这里,我们将创建一个示例消息队列。Here, we create a queue of sample messages.

        static Queue<ServiceBusMessage> CreateMessages()
        {
            // create a queue containing the messages and return it to the caller
            Queue<ServiceBusMessage> messages = new Queue<ServiceBusMessage>();
            messages.Enqueue(new ServiceBusMessage("First message in the batch"));
            messages.Enqueue(new ServiceBusMessage("Second message in the batch"));
            messages.Enqueue(new ServiceBusMessage("Third message in the batch"));
            return messages;
        }
    
  5. 将名为 SendMessageBatchAsync 的方法添加到 Program 类,并添加以下代码。Add a method named SendMessageBatchAsync to the Program class, and add the following code. 此方法使用消息队列,并准备一个或多个要发送到服务总线队列的批处理。This method takes a queue of messages, and prepares one or more batches to send to the Service Bus queue.

        static async Task SendMessageBatchAsync()
        {
            // create a Service Bus client 
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a sender for the queue 
                ServiceBusSender sender = client.CreateSender(queueName);
    
                // get the messages to be sent to the Service Bus queue
                Queue<ServiceBusMessage> messages = CreateMessages();
    
                // total number of messages to be sent to the Service Bus queue
                int messageCount = messages.Count;
    
                // while all messages are not sent to the Service Bus queue
                while (messages.Count > 0)
                {
                    // start a new batch 
                    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
    
                    // add the first message to the batch
                    if (messageBatch.TryAddMessage(messages.Peek()))
                    {
                        // dequeue the message from the .NET queue once the message is added to the batch
                        messages.Dequeue();
                    }
                    else
                    {
                        // if the first message can't fit, then it is too large for the batch
                        throw new Exception($"Message {messageCount - messages.Count} is too large and cannot be sent.");
                    }
    
                    // add as many messages as possible to the current batch
                    while (messages.Count > 0 && messageBatch.TryAddMessage(messages.Peek()))
                    {
                        // dequeue the message from the .NET queue as it has been added to the batch
                        messages.Dequeue();
                    }
    
                    // now, send the batch
                    await sender.SendMessagesAsync(messageBatch);
    
                    // if there are any remaining messages in the .NET queue, the while loop repeats 
                }
    
                Console.WriteLine($"Sent a batch of {messageCount} messages to the topic: {queueName}");
            }
        }
    
  6. Main() 方法替换为以下 async Main 方法。Replace the Main() method with the following async Main method. 它同时调用发送方法,将单个消息和一批消息发送到队列。It calls both the send methods to send a single message and a batch of messages to the queue.

        static async Task Main()
        {
            // send a message to the queue
            await SendMessageAsync();
    
            // send a batch of messages to the queue
            await SendMessageBatchAsync();
        }
    
  7. 运行该应用程序。Run the application. 你应该会看到以下消息。You should see the following messages.

    Sent a single message to the queue: myqueue
    Sent a batch of messages to the queue: myqueue
    
  8. 在 Azure 门户中按照以下步骤操作:In the Azure portal, follow these steps:

    1. 导航到服务总线命名空间。Navigate to your Service Bus namespace.
    2. 在“概述”页上,在底部中间的窗格中选择队列。On the Overview page, select the queue in the bottom-middle pane.
    3. 请注意“必备”部分中的值。Notice the values in the Essentials section.

    收到的消息,包含计数和大小

    请注意以下值:Notice the following values:

    • 队列的“活动消息计数”值现在为 4 。The Active message count value for the queue is now 4. 每次运行此发件人应用而不检索消息时,该值会增加 4。Each time you run this sender app without retrieving the messages, this value increases by 4.
    • 每次该应用将消息添加到队列,队列的当前大小就会递增,增量为“基本信息”中的“当前”值 。The current size of the queue increments the CURRENT value in Essentials each time the app adds messages to the queue.
    • 在“消息”图表中的底部“指标”部分中,可以看到队列有四条传入的消息 。In the Messages chart in the bottom Metrics section, you can see that there are four incoming messages for the queue.

从队列接收消息Receive messages from a queue

在本部分中,你将添加从队列检索消息的代码。In this section, you'll add code to retrieve messages from the queue.

  1. 将以下方法添加到处理消息和任何错误的 Program 类中。Add the following methods to the Program class that handle messages and any errors.

        // handle received messages
        static async Task MessageHandler(ProcessMessageEventArgs args)
        {
            string body = args.Message.Body.ToString();
            Console.WriteLine($"Received: {body}");
    
            // complete the message. messages is deleted from the queue. 
            await args.CompleteMessageAsync(args.Message);
        }
    
        // handle any errors when receiving messages
        static Task ErrorHandler(ProcessErrorEventArgs args)
        {
            Console.WriteLine(args.Exception.ToString());
            return Task.CompletedTask;
        }
    
  2. 将名为 ReceiveMessagesAsync 的方法添加到 Program 类中,并添加以下代码来接收消息。Add a method named ReceiveMessagesAsync to the Program class, and add the following code to receive messages.

        static async Task ReceiveMessagesAsync()
        {
            await using (ServiceBusClient client = new ServiceBusClient(connectionString))
            {
                // create a processor that we can use to process the messages
                ServiceBusProcessor processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
    
                // add handler to process messages
                processor.ProcessMessageAsync += MessageHandler;
    
                // add handler to process any errors
                processor.ProcessErrorAsync += ErrorHandler;
    
                // start processing 
                await processor.StartProcessingAsync();
    
                Console.WriteLine("Wait for a minute and then press any key to end the processing");
                Console.ReadKey();
    
                // stop processing 
                Console.WriteLine("\nStopping the receiver...");
                await processor.StopProcessingAsync();
                Console.WriteLine("Stopped receiving messages");
            }
        }
    
  3. ReceiveMessagesAsync 方法中添加对 Main 方法的调用。Add a call to ReceiveMessagesAsync method from the Main method. 如果只想对接收消息进行测试,请注释禁止 SendMessagesAsync 方法。Comment out the SendMessagesAsync method if you want to test only receiving of messages. 如果不这样做,则会看到发送到队列的其他四条消息。If you don't, you see another four messages sent to the queue.

        static async Task Main()
        {
            // send a message to the queue
            await SendMessageAsync();
    
            // send a batch of messages to the queue
            await SendMessageBatchAsync();
    
            // receive message from the queue
            await ReceiveMessagesAsync();
        }
    

运行应用Run the app

运行该应用程序。Run the application. 等待一分钟,然后按任意键停止接收消息。Wait for a minute and then press any key to stop receiving messages. 你应该会看到以下输出(密钥的空格)。You should see the following output (spacebar for the key).

Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Wait for a minute and then press any key to end the processing
Received: Hello world!
Received: First message in the batch
Received: Second message in the batch
Received: Third message in the batch
Received: Hello world!
Received: First message in the batch
Received: Second message in the batch
Received: Third message in the batch

Stopping the receiver...
Stopped receiving messages

再次检查门户。Check the portal again.

  • “活动消息计数”和“当前”值现在为 0The Active message count and CURRENT values are now 0.

  • 在“消息”图表中的底部“指标”部分中,可以看到队列有八条传入消息和八条传出消息 。In the Messages chart in the bottom Metrics section, you can see that there are eight incoming messages and eight outgoing messages for the queue.

    接收后的活动消息和大小

后续步骤Next steps

请参阅以下文档和示例:See the following documentation and samples: