服务总线主题入门

本教程涵盖以下步骤:

  1. 使用 Azure 门户创建服务总线命名空间。
  2. 使用 Azure 门户创建服务总线主题。
  3. 使用 Azure 门户创建该主题的服务总线订阅。
  4. 编写 .NET Core 控制台应用程序,向主题发送一组消息。
  5. 编写 .NET Core 控制台应用程序,从订阅接收这些消息。

先决条件

  1. Visual Studio 2017 Update 3(版本 15.3 (26730.01))或更高版本。
  2. NET Core SDK 2.0 或更高版本。
  3. Azure 订阅。

Note

若要完成本教程,你需要一个 Azure 帐户。 你可以注册试用版

1.使用 Azure 门户创建命名空间

Note

也可使用 PowerShell 创建服务总线命名空间和消息实体。 有关详细信息,请参阅使用 PowerShell 管理服务总线资源

如果已创建服务总线消息传递命名空间,请跳转到使用 Azure 门户创建主题部分。

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建命名空间:

  1. 登录到 Azure 门户
  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择用于放置该命名空间的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择应在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 。 系统现已创建命名空间并已将其启用。 可能需要等待几分钟,因为系统会为你的帐户配置资源。

获取管理凭据

创建新的命名空间时,会自动生成一项初始的共享访问签名 (SAS) 规则,将一对主密钥和辅助密钥关联到一起,向每个密钥授予对命名空间的所有资产的完全控制权限。 请参阅服务总线身份验证和授权,了解如何创建更多的规则,对常规的发送者和接收者的权限进行更多限制。 若要复制初始规则,请执行以下步骤:

  1. 单击“所有资源”,然后单击新创建的命名空间名称。
  2. 在命名空间窗口中,单击“共享访问策略”。
  3. 在“共享访问策略”屏幕中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”窗口中,单击“连接字符串 - 主键”旁边的复制按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    connection-string

  5. 重复上述步骤,将主键的值复制和粘贴到临时位置,以供稍后使用。

2.使用 Azure 门户创建主题

  1. 登录到 Azure 门户
  2. 在门户左侧的导航窗格中,单击“服务总线”(如果未看到“服务总线”,请单击“更多服务”或“所有资源”)。 单击要在其中创建主题的命名空间。
  3. 此时会打开“命名空间概览”窗口。 单击“主题”:

    创建主题

  4. 单击“+ 主题”:

    选择主题

  5. 输入主题名称。 将其他选项保留默认值。

    选择“新建”

  6. 单击对话框底部的“创建”。

3.创建主题的订阅

  1. 在门户资源窗格中,依次单击在步骤 1 中创建的命名空间、“主题”、在步骤 2 中创建的主题的名称。
  2. 在概览窗格顶部单击“+ 订阅”,以便添加该主题的订阅。

    创建订阅

  3. 输入订阅的名称。 将其他选项保留默认值。

4.将消息发送到主题

为了将消息发送到主题中,请使用 Visual Studio 编写一个 C# 控制台应用程序。

创建控制台应用程序

启动 Visual Studio 并创建新的控制台应用 (.NET Core) 项目。

添加服务总线 NuGet 包

  1. 右键单击新创建的项目,并选择“管理 NuGet 包” 。
  2. 单击“浏览”选项卡,搜索 Microsoft.Azure.ServiceBus,然后选择“Microsoft.Azure.ServiceBus”项。 单击“安装”以完成安装,并关闭此对话框。

    选择 NuGet 包

编写将消息发送到主题的代码

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. Program 类中声明以下变量。 将 ServiceBusConnectionString 变量设置为在创建命名空间时获得的连接字符串,并将 TopicName 设置为在创建主题时使用的名称:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string TopicName = "<your_topic_name>";
    static ITopicClient topicClient;
    
  3. Main() 的默认内容替换为以下代码行:

    MainAsync().GetAwaiter().GetResult();
    
  4. Main() 之后直接添加以下异步 MainAsync() 方法,以调用“发送消息”方法:

    static async Task MainAsync()
    {
        const int numberOfMessages = 10;
        topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after sending all the messages.");
        Console.WriteLine("======================================================");
    
        // Send messages.
        await SendMessagesAsync(numberOfMessages);
    
        Console.ReadKey();
    
        await topicClient.CloseAsync();
    }
    
  5. MainAsync() 方法后直接添加以下 SendMessagesAsync() 方法,以便执行发送 numberOfMessagesToSend 所指定的消息数(当前设置为 10)的工作:

    static async Task SendMessagesAsync(int numberOfMessagesToSend)
    {
        try
        {
            for (var i = 0; i < numberOfMessagesToSend; i++)
            {
                // Create a new message to send to the topic.
                string messageBody = $"Message {i}";
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                // Write the body of the message to the console.
                Console.WriteLine($"Sending message: {messageBody}");
    
                // Send the message to the topic.
                await topicClient.SendAsync(message);
            }
        }
        catch (Exception exception)
        {
            Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
        }
    }
    
  6. 发件人 Program.cs 文件的内容应如下所示。

    namespace CoreSenderApp
    {
        using System;
        using System.Text;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Azure.ServiceBus;
    
        class Program
        {
            const string ServiceBusConnectionString = "<your_connection_string>";
            const string TopicName = "<your_topic_name>";
            static ITopicClient topicClient;
    
            static void Main(string[] args)
            {
                MainAsync().GetAwaiter().GetResult();
            }
    
            static async Task MainAsync()
            {
                const int numberOfMessages = 10;
                topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after sending all the messages.");
                Console.WriteLine("======================================================");
    
                // Send messages.
                await SendMessagesAsync(numberOfMessages);
    
                Console.ReadKey();
    
                await topicClient.CloseAsync();
            }
    
            static async Task SendMessagesAsync(int numberOfMessagesToSend)
            {
                try
                {
                    for (var i = 0; i < numberOfMessagesToSend; i++)
                    {
                        // Create a new message to send to the topic
                        string messageBody = $"Message {i}";
                        var message = new Message(Encoding.UTF8.GetBytes(messageBody));
    
                        // Write the body of the message to the console
                        Console.WriteLine($"Sending message: {messageBody}");
    
                        // Send the message to the topic
                        await topicClient.SendAsync(message);
                    }
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
                }
            }
        }
    }
    
  7. 运行该程序并检查 Azure 门户:在命名空间“概览”窗口中单击主题的名称。 此时会显示主题的“基本信息”屏幕。 请注意,在列在窗口底部附近的订阅中,订阅的“消息计数”值现在应该为 10。 每次运行发件人应用程序而没有检索消息(如下一部分所述)时,该值会增加 10。 另请注意,每次该应用将消息添加到主题,主题的当前大小就会递增,增量为“基本信息”窗口中的“当前”值。

    消息大小

5.从订阅接收消息

若要接收刚发送的消息,请创建另一 .NET Core 控制台应用程序并安装 Microsoft.Azure.ServiceBus NuGet 包,类似于前面的发件人应用程序。

编写从订阅接收消息的代码

  1. 在 Program.cs 中将以下 using 语句添加到命名空间定义顶部,位于类声明之前:

    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    
  2. Program 类中声明以下变量。 将 ServiceBusConnectionString 变量设置为在创建命名空间时获得的连接字符串,将 TopicName 设置为在创建主题时使用的名称,将 SubscriptionName 设置为在创建主题的订阅时使用的名称:

    const string ServiceBusConnectionString = "<your_connection_string>";
    const string TopicName = "<your_topic_name>";
    const string SubscriptionName = "<your_subscription_name>";
    static ISubscriptionClient subscriptionClient;
    
  3. Main() 的默认内容替换为以下代码行:

    MainAsync().GetAwaiter().GetResult();
    
  4. Main() 之后直接添加以下异步 MainAsync() 方法,以调用 RegisterOnMessageHandlerAndReceiveMessages() 方法:

    static async Task MainAsync()
    {
        subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
    
        Console.WriteLine("======================================================");
        Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
        Console.WriteLine("======================================================");
    
        // Register subscription message handler and receive messages in a loop
        RegisterOnMessageHandlerAndReceiveMessages();
    
        Console.ReadKey();
    
        await subscriptionClient.CloseAsync();
    }
    
  5. MainAsync() 方法后直接添加以下方法,以便注册消息处理程序并接收发件人应用程序发送的消息:

    static void RegisterOnMessageHandlerAndReceiveMessages()
    {
        // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
        var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
        {
            // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
            // Set it according to how many messages the application wants to process in parallel.
            MaxConcurrentCalls = 1,
    
            // Indicates whether the message pump should automatically complete the messages after returning from user callback.
            // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
            AutoComplete = false
        };
    
        // Register the function that processes messages.
        subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
    }
    
  6. 直接在前面的方法后添加以下 ProcessMessagesAsync() 方法,以便处理接收的消息:

    static async Task ProcessMessagesAsync(Message message, CancellationToken token)
    {
        // Process the message.
        Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
        // Complete the message so that it is not received again.
        // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
        await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    
        // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
        // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
        // to avoid unnecessary exceptions.
    }
    
  7. 最后,添加以下方法,用于处理可能发生的任何异常:

    // Use this handler to examine the exceptions received on the message pump.
    static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
    {
        Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
        var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
        Console.WriteLine("Exception context for troubleshooting:");
        Console.WriteLine($"- Endpoint: {context.Endpoint}");
        Console.WriteLine($"- Entity Path: {context.EntityPath}");
        Console.WriteLine($"- Executing Action: {context.Action}");
        return Task.CompletedTask;
    }    
    
  8. 收件人 Program.cs 文件的内容应如下所示:

    namespace CoreReceiverApp
    {
        using System;
        using System.Text;
        using System.Threading;
        using System.Threading.Tasks;
        using Microsoft.Azure.ServiceBus;
    
        class Program
        {
            const string ServiceBusConnectionString = "<your_connection_string>";
            const string TopicName = "<your_topic_name>";
            const string SubscriptionName = "<your_subscription_name>";
            static ISubscriptionClient subscriptionClient;
    
            static void Main(string[] args)
            {
                MainAsync().GetAwaiter().GetResult();
            }
    
            static async Task MainAsync()
            {
                subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);
    
                Console.WriteLine("======================================================");
                Console.WriteLine("Press ENTER key to exit after receiving all the messages.");
                Console.WriteLine("======================================================");
    
                // Register subscription message handler and receive messages in a loop.
                RegisterOnMessageHandlerAndReceiveMessages();
    
                Console.ReadKey();
    
                await subscriptionClient.CloseAsync();
            }
    
            static void RegisterOnMessageHandlerAndReceiveMessages()
            {
                // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
                var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                {
                    // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
                    // Set it according to how many messages the application wants to process in parallel.
                    MaxConcurrentCalls = 1,
    
                    // Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
                    // False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
                    AutoComplete = false
                };
    
                // Register the function that processes messages.
                subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
            }
    
            static async Task ProcessMessagesAsync(Message message, CancellationToken token)
            {
                // Process the message.
                Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    
                // Complete the message so that it is not received again.
                // This can be done only if the subscriptionClient is created in ReceiveMode.PeekLock mode (which is the default).
                await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
    
                // Note: Use the cancellationToken passed as necessary to determine if the subscriptionClient has already been closed.
                // If subscriptionClient has already been closed, you can choose to not call CompleteAsync() or AbandonAsync() etc.
                // to avoid unnecessary exceptions.
            }
    
            static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
            {
                Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
                var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
                Console.WriteLine("Exception context for troubleshooting:");
                Console.WriteLine($"- Endpoint: {context.Endpoint}");
                Console.WriteLine($"- Entity Path: {context.EntityPath}");
                Console.WriteLine($"- Executing Action: {context.Action}");
                return Task.CompletedTask;
            }
        }
    }
    
  9. 运行该程序,并再次检查门户。 请注意,“消息计数”和“当前”值现在为 0

    主题长度

祝贺! 你现在已使用 .NET Standard 库创建主题和订阅,发送了 10 条消息,并接收到了这些消息。

后续步骤

查看 GitHub 存储库中的示例,了解服务总线消息传送的一些更高级的功能。