服务总线主题入门

本教程涵盖以下步骤:

  1. 编写 .NET Core 控制台应用程序,向主题发送一组消息。
  2. 编写 .NET Core 控制台应用程序,从订阅接收这些消息。

先决条件

  1. Azure 订阅。 若要完成本教程,需要一个 Azure 帐户。 可以激活你的 Visual Studio 或 MSDN 订阅者权益或者注册试用订阅
  2. 按照快速入门:使用 Azure 门户创建服务总线主题和主题的订阅来执行以下任务:
    1. 创建一个服务总线命名空间
    2. 获取连接字符串
    3. 在此命名空间中创建一个主题
    4. 在此命名空间中创建对此主题的一个订阅
  3. Visual Studio 2017 Update 3(版本 15.3 (26730.01))或更高版本。
  4. NET Core SDK 2.0 或更高版本。

将消息发送到主题

为了将消息发送到主题中,请使用 Visual Studio 编写一个 C# 控制台应用程序。

创建控制台应用程序

启动 Visual Studio 并创建新的控制台应用 (.NET Core) 项目。

添加服务总线 NuGet 包

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。

  2. 单击“浏览”选项卡,搜索 Microsoft.Azure.ServiceBus,然后选择“Microsoft.Azure.ServiceBus” 项。 单击“安装”以完成安装,并关闭此对话框。

    选择 NuGet 包

编写将消息发送到主题的代码

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. Program 类中声明以下变量。 将 ServiceBusConnectionString 变量设置为在创建命名空间时获得的连接字符串,并将 TopicName 设置为在创建主题时使用的名称:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string TopicName = "<your_topic_name>";
    static ITopicClient topicClient;
    
  3. Main() 方法替换为下面的 async Main 方法,该方法使用将在下一步中添加的 SendMessagesAsync 方法异步发送消息。

    public static async Task Main(string[] args)
    {
        const int numberOfMessages = 10;
        topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after sending all the messages.");
        Console.WriteLine("======================================================");
    
        // Send messages.
        await SendMessagesAsync(numberOfMessages);
    
        Console.ReadKey();
    
        await topicClient.CloseAsync();
    }
    
  4. Main 方法后直接添加以下 SendMessagesAsync() 方法,以便执行发送 numberOfMessagesToSend 所指定的消息数(当前设置为 10)的工作:

    static async Task SendMessagesAsync(int numberOfMessagesToSend)
    {
        try
        {
            for (var i = 0; i < numberOfMessagesToSend; i++)
            {
                // Create a new message to send to the topic.
                string messageBody = $"Message {i}";
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                // Write the body of the message to the console.
                Console.WriteLine($"Sending message: {messageBody}");
    
                // Send the message to the topic.
                await topicClient.SendAsync(message);
            }
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
        }
    }
    
  5. 发件人 Program.cs 文件的内容应如下所示。

    namespace CoreSenderApp
    {
        using System;
        using System.Text;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Azure.ServiceBus;
    
        class Program
        {
            const string ServiceBusConnectionString = "<your_connection_string>";
            const string TopicName = "<your_topic_name>";
            static ITopicClient topicClient;
    
            public static async Task Main(string[] args)
            {
                const int numberOfMessages = 10;
                topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after sending all the messages.");
                Console.WriteLine("======================================================");
    
                // Send messages.
                await SendMessagesAsync(numberOfMessages);
    
                Console.ReadKey();
    
                await topicClient.CloseAsync();
            }
    
            static async Task SendMessagesAsync(int numberOfMessagesToSend)
            {
                try
                {
                    for (var i = 0; i < numberOfMessagesToSend; i++)
                    {
                        // Create a new message to send to the topic
                        string messageBody = $"Message {i}";
                        var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                        // Write the body of the message to the console
                        Console.WriteLine($"Sending message: {messageBody}");
    
                        // Send the message to the topic
                        await topicClient.SendAsync(message);
                    }
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
                }
            }
        }
    }
    
  6. 运行该程序并检查 Azure 门户:在命名空间“概览”窗口中单击主题的名称。 此时会显示主题的“基本信息”屏幕。 请注意,在列在窗口底部附近的订阅中,订阅的“消息计数”值现在应该为 10。 每次运行发件人应用程序而没有检索消息(如下一部分所述)时,该值会增加 10。 另请注意,每次该应用将消息添加到主题,主题的当前大小就会递增,增量为“基本信息”窗口中的“当前”值。

    消息大小

从订阅接收消息

若要接收你发送的消息,请创建另一个 .NET Core 控制台应用程序并安装 Microsoft.Azure.ServiceBus NuGet 包,类似于前面的发件人应用程序。

编写从订阅接收消息的代码

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. Program 类中声明以下变量。 将 ServiceBusConnectionString 变量设置为在创建命名空间时获得的连接字符串,将 TopicName 设置为在创建主题时使用的名称,将 SubscriptionName 设置为在创建主题的订阅时使用的名称:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string TopicName = "<your_topic_name>";
    const string SubscriptionName = "<your_subscription_name>";
    static ISubscriptionClient subscriptionClient;
    
  3. Main() 方法替换为以下 async Main 方法。 它调用将在下一步中添加的 RegisterOnMessageHandlerAndReceiveMessages() 方法。

    public static async Task Main(string[] args)
    {    
        subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
        Console.WriteLine("======================================================");
    
        // Register subscription message handler and receive messages in a loop
        RegisterOnMessageHandlerAndReceiveMessages();
    
        Console.ReadKey();
    
        await subscriptionClient.CloseAsync();    
    }
    
  4. Main() 方法后直接添加以下方法,以便注册消息处理程序并接收发件人应用程序发送的消息:

    static void RegisterOnMessageHandlerAndReceiveMessages()
    {
        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
    
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };
    
        // Register the function that processes messages.
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }
    
  5. 直接在前面的方法后添加以下 ProcessMessagesAsync() 方法,以便处理接收的消息:

    static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Process the message.
        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
        // Complete the message so that it is not received again.
        // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    
        // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
        // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
        // to avoid unnecessary exceptions.
    }
    
  6. 最后,添加以下方法,用于处理可能发生的任何异常:

    // Use this handler to examine the exceptions received on the message pump.
    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine("Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }    
    
  7. 收件人 Program.cs 文件的内容应如下所示:

    namespace CoreReceiverApp
    {
        using System;
        using System.Text;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Azure.ServiceBus;
    
        class Program
        {
            const string ServiceBusConnectionString = "<your_connection_string>";
            const string TopicName = "<your_topic_name>";
            const string SubscriptionName = "<your_subscription_name>";
            static ISubscriptionClient subscriptionClient;
    
            public static async Task Main(string[] args)
            {    
                subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
                Console.WriteLine("======================================================");
    
                // Register subscription message handler and receive messages in a loop
                RegisterOnMessageHandlerAndReceiveMessages();
    
                Console.ReadKey();
    
                await subscriptionClient.CloseAsync();    
            }
    
            static void RegisterOnMessageHandlerAndReceiveMessages()
            {
                // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
                var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                {
                    // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                    // Set it according to how many messages the application wants to process in parallel.
                    MaxConcurrentCalls = 1,
    
                    // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                    // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                    AutoComplete = false
                };
    
                // Register the function that processes messages.
                subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
            }
    
            static async Task ProcessMessagesAsync(Message message, CancellationToken token)
            {
                // Process the message.
                Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
                // Complete the message so that it is not received again.
                // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
                await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    
                // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
                // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
                // to avoid unnecessary exceptions.
            }
    
            static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
            {
                Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
                var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
                Console.WriteLine("Exception context for troubleshooting:");
                Console.WriteLine($"- Endpoint: {context.Endpoint}");
                Console.WriteLine($"- Entity Path: {context.EntityPath}");
                Console.WriteLine($"- Executing Action: {context.Action}");
                return Task.CompletedTask;
            }
        }
    }
    
  8. 运行该程序,并再次检查门户。 请注意,“消息计数”和“当前”值现在为 0

    主题长度

祝贺! 现已使用 .NET Standard 库创建主题和订阅,发送了 10 条消息,并接收到了这些消息。

备注

可以使用服务总线资源管理器管理服务总线资源。 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。

后续步骤

查看服务总线 GitHub 存储库中的示例,了解服务总线消息传送的一些更高级的功能。