Get started with Service Bus topics

This tutorial covers the following steps:

  1. Write a .NET Core console application to send a set of messages to the topic.
  2. Write a .NET Core console application to receive those messages from the subscription.

Prerequisites

  1. An Azure subscription. To complete this tutorial, you need an Azure account. You can activate your Visual Studio or MSDN subscriber benefits or sign up for a trial account.
  2. Follow the steps in the quickstart "Use the Azure portal to create a Service Bus topic and subscriptions to the topic" to do the following tasks:
    1. Create a Service Bus namespace.
    2. Get the connection string.
    3. Create a topic in the namespace.
    4. Create one subscription to the topic in the namespace.
  3. Visual Studio 2017 Update 3 (version 15.3, 26730.01) or later.
  4. .NET Core SDK, version 2.0 or later.

Send messages to the topic

To send messages to the topic, write a C# console application using Visual Studio.

Create a console application

Launch Visual Studio and create a new Console App (.NET Core) project.

Add the Service Bus NuGet package

  1. Right-click the newly created project and select Manage NuGet Packages.

  2. Click the Browse tab, search for Microsoft.Azure.ServiceBus, and then select the Microsoft.Azure.ServiceBus item. Click Install to complete the installation, then close this dialog box.

    [Screenshot: Select a NuGet package]

Write code to send messages to the topic

  1. In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. Within the Program class, declare the following variables. Set the ServiceBusConnectionString variable to the connection string that you obtained when creating the namespace, and set TopicName to the name that you used when creating the topic:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string TopicName = "<your_topic_name>";
    static ITopicClient topicClient;
    
  3. Replace the default contents of Main() with the following line of code:

    MainAsync().GetAwaiter().GetResult();
    
  4. Directly after Main(), add the following asynchronous MainAsync() method, which calls the SendMessagesAsync() method:

    static async Task MainAsync()
    {
        const int numberOfMessages = 10;
        topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after sending all the messages.");
        Console.WriteLine("======================================================");
    
        // Send messages.
        await SendMessagesAsync(numberOfMessages);
    
        Console.ReadKey();
    
        await topicClient.CloseAsync();
    }
    
  5. Directly after the MainAsync() method, add the following SendMessagesAsync() method, which sends the number of messages specified by numberOfMessagesToSend (currently set to 10):

    static async Task SendMessagesAsync(int numberOfMessagesToSend)
    {
        try
        {
            for (var i = 0; i < numberOfMessagesToSend; i++)
            {
                // Create a new message to send to the topic.
                string messageBody = $"Message {i}";
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                // Write the body of the message to the console.
                Console.WriteLine($"Sending message: {messageBody}");
    
                // Send the message to the topic.
                await topicClient.SendAsync(message);
            }
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
        }
    }
    
  6. Here is what your sender Program.cs file should look like:

    namespace CoreSenderApp
    {
        using System;
        using System.Text;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Azure.ServiceBus;
    
        class Program
        {
            const string ServiceBusConnectionString = "<your_connection_string>";
            const string TopicName = "<your_topic_name>";
            static ITopicClient topicClient;
    
            static void Main(string[] args)
            {
                MainAsync().GetAwaiter().GetResult();
            }
    
            static async Task MainAsync()
            {
                const int numberOfMessages = 10;
                topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after sending all the messages.");
                Console.WriteLine("======================================================");
    
                // Send messages.
                await SendMessagesAsync(numberOfMessages);
    
                Console.ReadKey();
    
                await topicClient.CloseAsync();
            }
    
            static async Task SendMessagesAsync(int numberOfMessagesToSend)
            {
                try
                {
                    for (var i = 0; i < numberOfMessagesToSend; i++)
                    {
                        // Create a new message to send to the topic
                        string messageBody = $"Message {i}";
                        var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                        // Write the body of the message to the console
                        Console.WriteLine($"Sending message: {messageBody}");
    
                        // Send the message to the topic
                        await topicClient.SendAsync(message);
                    }
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
                }
            }
        }
    }
    
  7. Run the program, and check the Azure portal: click the name of your topic in the namespace Overview window. The topic Essentials screen is displayed. In the subscription listed near the bottom of the window, notice that the Message Count value for the subscription is now 10. Each time you run the sender application without retrieving the messages (as described in the next section), this value increases by 10. Also note that the Current value in the Essentials window, which shows the current size of the topic, grows each time the app adds messages to the topic.

    [Screenshot: Message size]
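
The sender's Main() blocks on MainAsync().GetAwaiter().GetResult() rather than on Task.Wait(). One practical difference is how a failure surfaces: GetResult() rethrows the original exception, while Wait() wraps it in an AggregateException. The sketch below illustrates this using only the base class library; FailAsync() is a hypothetical stand-in for MainAsync() and is not part of the tutorial's code.

```csharp
using System;
using System.Threading.Tasks;

static class BlockingDemo
{
    // Hypothetical stand-in for MainAsync(): an async method that always fails.
    static async Task FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("send failed");
    }

    // Blocks with GetAwaiter().GetResult() and reports the exception type seen by the caller.
    public static string CatchWithGetResult()
    {
        try { FailAsync().GetAwaiter().GetResult(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    // Blocks with Wait() and reports the exception type seen by the caller.
    public static string CatchWithWait()
    {
        try { FailAsync().Wait(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    static void Main()
    {
        Console.WriteLine($"GetAwaiter().GetResult(): {CatchWithGetResult()}"); // InvalidOperationException
        Console.WriteLine($"Wait(): {CatchWithWait()}");                         // AggregateException
    }
}
```

With GetResult(), a top-level catch block or crash log shows the underlying exception directly, which tends to make a console sample easier to troubleshoot.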

Receive messages from the subscription

To receive the messages you sent, create another .NET Core console application and install the Microsoft.Azure.ServiceBus NuGet package, similar to the previous sender application.

Write code to receive messages from the subscription

  1. In Program.cs, add the following using statements at the top of the namespace definition, before the class declaration:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. Within the Program class, declare the following variables. Set the ServiceBusConnectionString variable to the connection string that you obtained when creating the namespace, set TopicName to the name that you used when creating the topic, and set SubscriptionName to the name that you used when creating the subscription to the topic:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string TopicName = "<your_topic_name>";
    const string SubscriptionName = "<your_subscription_name>";
    static ISubscriptionClient subscriptionClient;
    
  3. Replace the default contents of Main() with the following line of code:

    MainAsync().GetAwaiter().GetResult();
    
  4. Directly after Main(), add the following asynchronous MainAsync() method, which calls the RegisterOnMessageHandlerAndReceiveMessages() method:

    static async Task MainAsync()
    {
        subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
        Console.WriteLine("======================================================");
    
        // Register subscription message handler and receive messages in a loop
        RegisterOnMessageHandlerAndReceiveMessages();
    
        Console.ReadKey();
    
        await subscriptionClient.CloseAsync();
    }
    
  5. Directly after the MainAsync() method, add the following method, which registers the message handler and receives the messages sent by the sender application:

    static void RegisterOnMessageHandlerAndReceiveMessages()
    {
        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
    
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };
    
        // Register the function that processes messages.
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }
    
  6. Directly after the previous method, add the following ProcessMessagesAsync() method to process the received messages:

    static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Process the message.
        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
        // Complete the message so that it is not received again.
        // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    
        // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
        // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
        // to avoid unnecessary exceptions.
    }
    
  7. Finally, add the following method to handle any exceptions that might occur:

    // Use this handler to examine the exceptions received on the message pump.
    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine("Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }    
    
  8. Here is what your receiver Program.cs file should look like:

    namespace CoreReceiverApp
    {
        using System;
        using System.Text;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Azure.ServiceBus;
    
        class Program
        {
            const string ServiceBusConnectionString = "<your_connection_string>";
            const string TopicName = "<your_topic_name>";
            const string SubscriptionName = "<your_subscription_name>";
            static ISubscriptionClient subscriptionClient;
    
            static void Main(string[] args)
            {
                MainAsync().GetAwaiter().GetResult();
            }
    
            static async Task MainAsync()
            {
                subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
                Console.WriteLine("======================================================");
    
                // Register subscription message handler and receive messages in a loop.
                RegisterOnMessageHandlerAndReceiveMessages();
    
                Console.ReadKey();
    
                await subscriptionClient.CloseAsync();
            }
    
            static void RegisterOnMessageHandlerAndReceiveMessages()
            {
                // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
                var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                {
                    // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                    // Set it according to how many messages the application wants to process in parallel.
                    MaxConcurrentCalls = 1,
    
                    // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                    // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                    AutoComplete = false
                };
    
                // Register the function that processes messages.
                subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
            }
    
            static async Task ProcessMessagesAsync(Message message, CancellationToken token)
            {
                // Process the message.
                Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
                // Complete the message so that it is not received again.
                // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
                await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    
                // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
                // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
                // to avoid unnecessary exceptions.
            }
    
            static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
            {
                Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
                var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
                Console.WriteLine("Exception context for troubleshooting:");
                Console.WriteLine($"- Endpoint: {context.Endpoint}");
                Console.WriteLine($"- Entity Path: {context.EntityPath}");
                Console.WriteLine($"- Executing Action: {context.Action}");
                return Task.CompletedTask;
            }
        }
    }
    
  9. Run the program, and check the portal again. Notice that the Message Count and Current values are now 0.

    [Screenshot: Topic length]
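
The sender builds each body with Encoding.UTF8.GetBytes() and the receiver reads it back with Encoding.UTF8.GetString(). Because Message.Body is a byte array, both sides have to agree on the encoding. The following sketch shows that round trip using only System.Text; the BodyCodec class and the sample strings are illustrative assumptions, not part of the tutorial's code.

```csharp
using System;
using System.Text;

static class BodyCodec
{
    // Mirrors the sender: turn the text into the bytes carried in the message body.
    public static byte[] Encode(string text) => Encoding.UTF8.GetBytes(text);

    // Mirrors the receiver: turn the body bytes back into text.
    public static string Decode(byte[] body) => Encoding.UTF8.GetString(body);

    static void Main()
    {
        // ASCII-only text survives the round trip unchanged.
        byte[] body = Encode("Message 0");
        Console.WriteLine(Decode(body));

        // Non-ASCII text survives only when both sides use the same encoding.
        string accented = "Message \u00e9";
        byte[] accentedBody = Encode(accented);
        Console.WriteLine(Decode(accentedBody) == accented);                 // True
        Console.WriteLine(Encoding.ASCII.GetString(accentedBody) == accented); // False
    }
}
```

If the two applications ever exchange non-ASCII text, keeping the encoding choice in one shared helper like this is one way to avoid a mismatch.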

Congratulations! You have now used the .NET Standard library to send 10 messages to a Service Bus topic and receive them from its subscription.

Next steps

Check out the Service Bus GitHub repository for samples that demonstrate some of the more advanced features of Service Bus messaging.