Get started with Service Bus queues

In this tutorial, you create .NET Core console applications to send messages to and receive messages from a Service Bus queue.

Prerequisites

Send messages to the queue

To send messages to the queue, write a C# console application using Visual Studio.

Create a console application

Launch Visual Studio and create a new Console App (.NET Core) project for C#. This example names the app CoreSenderApp.

Add the Service Bus NuGet package

  1. Right-click the newly created project and select Manage NuGet Packages.

  2. Select Browse. Search for and select Microsoft.Azure.ServiceBus.

  3. Select Install to complete the installation, then close the NuGet Package Manager.

    (Screenshot: selecting the NuGet package)

Write code to send messages to the queue

  1. In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. In the Program class, declare the following variables:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string QueueName = "<your_queue_name>";
    static IQueueClient queueClient;
    

    Set ServiceBusConnectionString to the connection string for your namespace, and set QueueName to the name of your queue.

  3. Replace the Main() method with the following async Main method. It calls the SendMessagesAsync() method, which you add in the next step, to send messages to the queue.

    public static async Task Main(string[] args)
    {    
        const int numberOfMessages = 10;
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after sending all the messages.");
        Console.WriteLine("======================================================");
    
        // Send messages.
        await SendMessagesAsync(numberOfMessages);
    
        Console.ReadKey();
    
        await queueClient.CloseAsync();
    }
    
  4. Directly after the Main() method, add the following SendMessagesAsync() method, which sends the number of messages specified by numberOfMessagesToSend (currently set to 10):

    static async Task SendMessagesAsync(int numberOfMessagesToSend)
    {
        try
        {
            for (var i = 0; i < numberOfMessagesToSend; i++)
            {
                // Create a new message to send to the queue.
                string messageBody = $"Message {i}";
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                // Write the body of the message to the console.
                Console.WriteLine($"Sending message: {messageBody}");
    
                // Send the message to the queue.
                await queueClient.SendAsync(message);
            }
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
        }
    }
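
SendMessagesAsync() turns each string body into bytes with Encoding.UTF8.GetBytes before handing it to the Message constructor, and the receiver later reverses this with Encoding.UTF8.GetString. The sketch below isolates that round trip using only the .NET base class library. BodyCodec is a hypothetical helper name introduced for illustration; it is not part of Microsoft.Azure.ServiceBus.

```csharp
using System;
using System.Text;

// Hypothetical helper (not part of Microsoft.Azure.ServiceBus).
// It wraps the same two calls the sender and receiver apply to a message body.
public static class BodyCodec
{
    // Sender side: string -> UTF-8 bytes, as passed to the Message constructor.
    public static byte[] Encode(string body)
    {
        if (body == null) throw new ArgumentNullException(nameof(body));
        return Encoding.UTF8.GetBytes(body);
    }

    // Receiver side: UTF-8 bytes -> string, as applied to message.Body.
    public static string Decode(byte[] payload)
    {
        if (payload == null) throw new ArgumentNullException(nameof(payload));
        return Encoding.UTF8.GetString(payload);
    }
}
```

For example, BodyCodec.Decode(BodyCodec.Encode("Message 0")) returns "Message 0". Both sides have to agree on the encoding; if the sender used a different encoding than the receiver, the decoded text could differ from what was sent.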
    

Here is what your Program.cs file should look like.

namespace CoreSenderApp
{
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;

    class Program
    {
        // Connection String for the namespace can be obtained from the Azure portal under the 
        // 'Shared Access policies' section.
        const string ServiceBusConnectionString = "<your_connection_string>";
        const string QueueName = "<your_queue_name>";
        static IQueueClient queueClient;

        public static async Task Main(string[] args)
        {    
            const int numberOfMessages = 10;
            queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
    
            Console.WriteLine("======================================================");
            Console.WriteLine("Press ENTER key to exit after sending all the messages.");
            Console.WriteLine("======================================================");
    
            // Send messages.
            await SendMessagesAsync(numberOfMessages);
    
            Console.ReadKey();
    
            await queueClient.CloseAsync();
        }

        static async Task SendMessagesAsync(int numberOfMessagesToSend)
        {
            try
            {
                for (var i = 0; i < numberOfMessagesToSend; i++)
                {
                    // Create a new message to send to the queue
                    string messageBody = $"Message {i}";
                    var message = new Message(Encoding.UTF8.GetBytes(messageBody));

                    // Write the body of the message to the console
                    Console.WriteLine($"Sending message: {messageBody}");

                    // Send the message to the queue
                    await queueClient.SendAsync(message);
                }
            }
            catch (Exception exception)
            {
                Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
            }
        }
    }
}

Run the program and check the Azure portal.

Select the name of your queue in the namespace Overview window to display queue Essentials.

(Screenshot: messages received, with count and size)

The Active message count value for the queue is now 10. Each time you run this sender app without retrieving the messages, this value increases by 10.

The current size of the queue, shown as the CURRENT value in Essentials, also grows each time the app adds messages to the queue.

The next section describes how to retrieve these messages.

Receive messages from the queue

To receive the messages you sent, create another Console App (.NET Core) project. Install the Microsoft.Azure.ServiceBus NuGet package, as you did for the sender application.

Write code to receive messages from the queue

  1. In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:

    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. In the Program class, declare the following variables:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string QueueName = "<your_queue_name>";
    static IQueueClient queueClient;
    

    Set ServiceBusConnectionString to the connection string for your namespace, and set QueueName to the name of your queue.

  3. Replace the Main() method with the following code:

    static void Main(string[] args)
    {
        MainAsync().GetAwaiter().GetResult();
    }
    
    static async Task MainAsync()
    {
        queueClient = new QueueClient(ServiceBusConnectionString, QueueName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
        Console.WriteLine("======================================================");
    
        // Register QueueClient's MessageHandler and receive messages in a loop
        RegisterOnMessageHandlerAndReceiveMessages();
    
        Console.ReadKey();
    
        await queueClient.CloseAsync();
    }
    
  4. Directly after the MainAsync() method, add the following method, which registers the message handler and receives the messages sent by the sender application:

    static void RegisterOnMessageHandlerAndReceiveMessages()
    {
        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
    
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };
    
        // Register the function that processes messages.
        queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }
    
  5. Directly after the previous method, add the following ProcessMessagesAsync() method to process the received messages:

    static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Process the message.
        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
        // Complete the message so that it is not received again.
        // This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default).
        await queueClient.CompleteAsync(message.SystemProperties.LockToken);
    
        // Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
        // If queueClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
        // to avoid unnecessary exceptions.
    }
    
  6. Finally, add the following method to handle any exceptions that might occur:

    // Use this handler to examine the exceptions received on the message pump.
    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine("Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }
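
The comments in ProcessMessagesAsync() point out that completing a message only works because the client receives in ReceiveMode.PeekLock. In that mode a received message is locked rather than removed, and it leaves the queue only when the receiver completes it. The following toy in-memory model is a sketch of that idea under simplifying assumptions: ToyPeekLockQueue and all of its members are hypothetical names, it is not the Service Bus API, and it leaves out details of the real service such as lock expiry.

```csharp
using System;
using System.Collections.Generic;

// Toy model of peek-lock settlement. NOT the Service Bus API:
// every name here is hypothetical and lock expiry is not modelled.
public sealed class ToyPeekLockQueue
{
    private sealed class Entry
    {
        public Entry(string body) { Body = body; }
        public string Body { get; }
        public Guid LockToken { get; set; } // Guid.Empty while the message is unlocked
    }

    private readonly List<Entry> entries = new List<Entry>();

    // Number of messages still held by the queue, locked or not.
    public int Count => entries.Count;

    public void Send(string body) => entries.Add(new Entry(body));

    // Locks the first unlocked message and hands back its lock token and body.
    // The message stays in the queue until Complete is called.
    public bool TryReceive(out Guid lockToken, out string body)
    {
        foreach (var entry in entries)
        {
            if (entry.LockToken == Guid.Empty)
            {
                entry.LockToken = Guid.NewGuid();
                lockToken = entry.LockToken;
                body = entry.Body;
                return true;
            }
        }
        lockToken = Guid.Empty;
        body = string.Empty;
        return false;
    }

    // Complete: remove the locked message so it is never delivered again.
    public bool Complete(Guid lockToken) =>
        lockToken != Guid.Empty && entries.RemoveAll(e => e.LockToken == lockToken) > 0;

    // Abandon: release the lock so the message can be received again.
    public bool Abandon(Guid lockToken)
    {
        if (lockToken == Guid.Empty) return false;
        foreach (var entry in entries)
        {
            if (entry.LockToken == lockToken)
            {
                entry.LockToken = Guid.Empty;
                return true;
            }
        }
        return false;
    }
}
```

In this model, a message that has been received but not completed still counts toward Count, and abandoning it makes it available to TryReceive again. This mirrors why the handler above calls CompleteAsync() once it has finished processing a message.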
    

Here is what your Program.cs file should look like:

namespace CoreReceiverApp
{
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;

    class Program
    {
        // Connection String for the namespace can be obtained from the Azure portal under the 
        // 'Shared Access policies' section.
        const string ServiceBusConnectionString = "<your_connection_string>";
        const string QueueName = "<your_queue_name>";
        static IQueueClient queueClient;

        static void Main(string[] args)
        {
            MainAsync().GetAwaiter().GetResult();
        }

        static async Task MainAsync()
        {
            queueClient = new QueueClient(ServiceBusConnectionString, QueueName);

            Console.WriteLine("======================================================");
            Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
            Console.WriteLine("======================================================");

            // Register QueueClient's MessageHandler and receive messages in a loop
            RegisterOnMessageHandlerAndReceiveMessages();
 
            Console.ReadKey();

            await queueClient.CloseAsync();
        }

        static void RegisterOnMessageHandlerAndReceiveMessages()
        {
            // Configure the MessageHandler Options in terms of exception handling, number of concurrent messages to deliver etc.
            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                // Maximum number of Concurrent calls to the callback `ProcessMessagesAsync`, set to 1 for simplicity.
                // Set it according to how many messages the application wants to process in parallel.
                MaxConcurrentCalls = 1,

                // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                AutoComplete = false
            };

            // Register the function that will process messages
            queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
        }

        static async Task ProcessMessagesAsync(Message message, CancellationToken token)
        {
            // Process the message
            Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

            // Complete the message so that it is not received again.
            // This can be done only if the queueClient is created in ReceiveMode.PeekLock mode (which is default).
            await queueClient.CompleteAsync(message.SystemProperties.LockToken);

            // Note: Use the cancellationToken passed as necessary to determine if the queueClient has already been closed.
            // If queueClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
            // to avoid unnecessary exceptions.
        }

        static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
            var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
            Console.WriteLine("Exception context for troubleshooting:");
            Console.WriteLine($"- Endpoint: {context.Endpoint}");
            Console.WriteLine($"- Entity Path: {context.EntityPath}");
            Console.WriteLine($"- Executing Action: {context.Action}");
            return Task.CompletedTask;
        }
    }
}
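
The MaxConcurrentCalls option in the listing above caps how many invocations of the callback can be in flight at once. As a rough illustration of what such a cap means, the sketch below runs a handler over a list of message bodies behind a SemaphoreSlim and reports the highest concurrency it observed. ToyMessagePump and RunAsync are hypothetical names, and this is an assumption-laden toy rather than a description of how the library's message pump is implemented.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Toy illustration of a concurrency cap. NOT the Service Bus message pump:
// all names are hypothetical and only the base class library is used.
public static class ToyMessagePump
{
    // Invokes handler once per body with at most maxConcurrentCalls running
    // at the same time, and returns the peak concurrency that was observed.
    public static async Task<int> RunAsync(
        IEnumerable<string> bodies,
        int maxConcurrentCalls,
        Func<string, Task> handler)
    {
        if (maxConcurrentCalls < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrentCalls));

        var gate = new SemaphoreSlim(maxConcurrentCalls);
        var running = new List<Task>();
        var sync = new object();
        int inFlight = 0;
        int peak = 0;

        foreach (var body in bodies)
        {
            // Wait for a free slot before starting the next handler call.
            await gate.WaitAsync();
            running.Add(Task.Run(async () =>
            {
                lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
                try
                {
                    await handler(body);
                }
                finally
                {
                    lock (sync) { inFlight--; }
                    gate.Release(); // free the slot for the next message
                }
            }));
        }

        await Task.WhenAll(running);
        return peak;
    }
}
```

With maxConcurrentCalls set to 1, as in the tutorial, the handler calls in this toy run strictly one after another; a larger value lets several run in parallel, which only helps if the handler is safe to run concurrently.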

Run the program, and check the portal again. The Active message count and CURRENT values are now 0.

(Screenshot: the queue after messages are received)

Congratulations! You've now created a queue, sent a set of messages to that queue, and received those messages from the same queue.

Note

You can manage Service Bus resources with Service Bus Explorer. It lets you connect to a Service Bus namespace and administer messaging entities, and it provides advanced features such as import/export functionality and the ability to test topics, queues, subscriptions, relay services, notification hubs, and event hubs.

Next steps

Check out our GitHub repository with samples that demonstrate some of the more advanced features of Service Bus messaging.