快速入门 - 向/从 Azure 服务总线队列 (.NET) 发送/接收消息

在本快速入门中,你将执行以下步骤:

  1. 使用 Azure 门户创建服务总线命名空间。

  2. 使用 Azure 门户创建服务总线队列。

  3. 编写 .NET 控制台应用程序,向队列发送一组消息。

  4. 编写 .NET 控制台应用程序,从队列接收这些消息。

    本快速入门分步介绍了如何实施一个简单方案,将一批消息发送到服务总线队列并接收这些消息。 有关 .NET 客户端库的概述,请参阅适用于 .NET 的 Azure 服务总线客户端库。 有关更多示例,请参阅 GitHub 上的服务总线 .NET 示例

先决条件

如果你是首次使用该服务,请在使用本快速入门之前先参阅服务总线概述

  • Azure 订阅。 若要使用 Azure 服务(包括 Azure 服务总线),需要一个订阅。 如果没有现有的 Azure 帐户,可以注册试用订阅
  • Visual Studio 2022。 示例应用程序利用 C# 10 中引入的新功能。 你仍可使用以前的 C# 语言版本的服务总线客户端库,但语法可能会有所不同。 要使用最新语法,建议安装 .NET 6.0 或更高版本,并将语言版本设置为latest。 如果使用 Visual Studio,Visual Studio 2022 以前的版本与生成 C# 10 项目时所需的工具将不兼容。

在 Azure 门户中创建命名空间

若要开始使用 Azure 中的服务总线消息传送实体,请创建一个名称在 Azure 中唯一的命名空间。 命名空间为应用程序中的服务总线资源(例如队列和主题)提供范围容器。

创建命名空间:

  1. 登录 Azure 门户

  2. 从左上角选择浮出控件菜单,然后导航到 “所有服务 ”页

  3. 在左侧导航栏上,选择 “集成”。

  4. 向下滚动到 消息服务服务>总线 ,然后选择“ 创建”。

    显示选择“创建资源”、“集成”和“服务总线”菜单中的屏幕截图。

  5. “创建命名空间”页的“基本信息”选项卡中,执行以下步骤:

    1. 对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。

    2. 对于资源组,选择现有资源组或创建新的资源组。

    3. 输入符合以下命名约定的 命名空间名称

      • 该名称在 Azure 中必须唯一。 系统会立即检查该名称是否可用。
      • 名称长度最少为 6 个字符,最多为 50 个字符。
      • 名称只能包含字母、数字、连字符 -
      • 名称必须以字母开头,并且必须以字母或数字结尾。
      • 名称不以 -sb-mgmt 结尾。
    4. 对于 “位置”,请选择要托管命名空间的区域。

    5. 对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。

      如果选择 “高级 ”层,则可以为命名空间启用 异地复制 。 异地复制功能可确保命名空间的元数据和数据从主要区域持续复制到一个或多个次要区域。

      重要

      若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题和订阅。

      如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为 消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅 服务总线高级消息传送层

    6. 在页面底部选择“查看 + 创建”。

      显示“创建命名空间”页的屏幕截图

    7. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  6. 部署资源成功后,在部署页上选择 “转到资源 ”。

    显示部署成功页的屏幕截图,其中显示了“转到资源”链接。

  7. 将会看到服务总线命名空间的主页。

    显示创建的服务总线命名空间主页的屏幕截图。

在 Azure 门户中创建队列

  1. 在“服务总线命名空间”页上,展开左侧导航菜单上的“实体”,然后在左侧菜单中选择“队列”

  2. 在“ 队列 ”页上的工具栏上,选择“ + 队列”。

  3. 输入队列的名称。 将其他值保留为默认值。

  4. 选择 创建

    显示“创建队列”页面的屏幕截图。

重要

如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。 选择“连接字符串”选项卡,以查看本快速入门中有关使用连接字符串的说明。 建议在实际应用程序和生产环境中使用“无密码”选项

向 Azure 验证应用

本文介绍连接到 Azure 服务总线的两种方法: 无密码连接字符串

第一个选项展示如何使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到服务总线命名空间。 无需担心在代码、配置文件或安全存储(如 Azure Key Vault)中具有硬编码连接字符串。

第二个选项展示如何使用连接字符串连接到服务总线命名空间。 如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅 服务总线身份验证和授权。 若要详细了解无密码身份验证,请参阅 “对 .NET 应用进行身份验证”。

获取连接字符串

创建新的命名空间会自动生成具有主密钥和辅助密钥的初始共享访问签名(SAS)策略。 它创建主连接字符串和辅助连接字符串,每个连接字符串都授予对命名空间的各个方面的完全控制。 有关如何为常规发送方和接收方创建具有更多约束权限的规则的详细信息,请参阅 服务总线身份验证和授权

客户端可以使用连接字符串连接到服务总线命名空间。 若要复制命名空间的主要连接字符串,请执行以下步骤:

  1. “服务总线命名空间 ”页上的左侧菜单中,展开 “设置”,然后选择“ 共享访问策略”。

  2. 在“共享访问策略”页,选择“RootManageSharedAccessKey” 。

  3. 在“策略: RootManageSharedAccessKey”窗口中,选择“主连接字符串”旁边的“复制”按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    屏幕截图显示了名为 RootManageSharedAccessKey 的 SAS 策略,其中包含密钥和连接字符串。

    可使用此页面复制主密钥、辅助密钥、主连接字符串和辅助连接字符串。

启动 Visual Studio

启动 Visual Studio。 如果看到“入门”窗口,请在右窗格中选择“在不使用代码的情况下继续”链接。

将消息发送到队列

本部分介绍如何创建一个向服务总线队列发送消息的 .NET 控制台应用程序。

备注

本快速入门分步介绍了如何实施一个简单方案,将一批消息发送到服务总线队列并接收这些消息。 有关其他和高级方案的更多示例,请参阅 GitHub 上的服务总线 .NET 示例

创建控制台应用程序

  1. 在 Visual Studio 中,选择“文件”-“新建”->“项目”菜单。

  2. 在“创建新项目”对话框中执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。

    1. 选择“C#”作为编程语言。

    2. 选择“控制台”作为应用程序类型。

    3. 从结果列表中选择“控制台应用”。

    4. 然后,选择“下一步” 。

      显示使用 C# 和所选的控制台创建新项目对话框的图像

  3. 输入 QueueSender 作为项目名称,输入 ServiceBusQueueQuickStart 作为解决方案名称,然后选择下一步

    在“配置你的新项目对话框”中显示解决方案和项目名称的图像

  4. 在“其他信息”页面,选择“创建”来创建解决方案和项目。

向项目添加 NuGet 包

  1. 在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”

  2. 运行以下命令安装 Azure.Messaging.ServiceBus NuGet 包:

    Install-Package Azure.Messaging.ServiceBus
    

添加将消息发送到队列的代码

  1. Program.cs 的内容替换为以下代码。 以下部分概述了重要步骤,并在代码注释中提供了其他信息。

    重要

    使用服务总线命名空间和队列的名称更新代码片段中的占位符值(<NAMESPACE-CONNECTION-STRING><QUEUE-NAME>)。

    using Azure.Messaging.ServiceBus;
    
    // the client that owns the connection and can be used to create senders and receivers
    ServiceBusClient client;
    
    // the sender used to publish messages to the queue
    ServiceBusSender sender;
    
    // number of messages to be sent to the queue
    const int numOfMessages = 3;
    
    // The Service Bus client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when messages are being published or read
    // regularly.
    //
    // set the transport type to AmqpWebSockets so that the ServiceBusClient uses the port 443. 
    // If you use the default AmqpTcp, you will need to make sure that the ports 5671 and 5672 are open
    
    // TODO: Replace the <NAMESPACE-CONNECTION-STRING> and <QUEUE-NAME> placeholders
    var clientOptions = new ServiceBusClientOptions()
    { 
        TransportType = ServiceBusTransportType.AmqpWebSockets
    };
    client = new ServiceBusClient("<NAMESPACE-CONNECTION-STRING>", clientOptions);
    sender = client.CreateSender("<QUEUE-NAME>");
    
    // create a batch 
    using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();
    
    for (int i = 1; i <= numOfMessages; i++)
    {
        // try adding a message to the batch
        if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
        {
            // if it is too large for the batch
            throw new Exception($"The message {i} is too large to fit in the batch.");
        }
    }
    
    try
    {
        // Use the producer client to send the batch of messages to the Service Bus queue
        await sender.SendMessagesAsync(messageBatch);
        Console.WriteLine($"A batch of {numOfMessages} messages has been published to the queue.");
    }
    finally
    {
        // Calling DisposeAsync on client types is required to ensure that network
        // resources and other unmanaged objects are properly cleaned up.
        await sender.DisposeAsync();
        await client.DisposeAsync();
    }
    
    Console.WriteLine("Press any key to end the application");
    Console.ReadKey();
    
  2. 生成项目并确保没有错误。

  3. 运行程序并等待出现确认消息。

    A batch of 3 messages has been published to the queue
    

    重要

    在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟才能完成。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  4. 在 Azure 门户中按照以下步骤操作:

    1. 导航到服务总线命名空间。

    2. 在“概述”页上,在底部中间的窗格中选择队列。

      按照所选队列在 Azure 门户中显示服务总线命名空间页面的图像。

    3. 请注意“设置”部分中的值

      显示接收的消息数量和队列大小的图像。

    请注意以下值:

    • 队列的活动消息计数值现在为 3。 每次运行此发件人应用而不检索消息时,该值会增加 3。
    • 每次应用向队列添加消息时,队列的当前大小都会递增。
    • 消息图表中的底部指标部分中,可以看到队列有三条传入的消息。

从队列接收消息

在本部分中,你将创建一个 .NET 控制台应用程序,用于接收来自队列的消息。

备注

本快速入门分步介绍了如何实施一个方案,将一批消息发送到服务总线队列并接收这些消息。 有关其他和高级方案的更多示例,请参阅 GitHub 上的服务总线 .NET 示例

为接收器创建项目

  1. 在“解决方案资源管理器”窗口中,右键单击 ServiceBusQueueQuickStart 解决方案,指向添加,然后选择新建项目
  2. 选择控制台应用程序,然后选择下一步
  3. 输入 QueueReceiver 作为项目名称,然后选择创建
  4. 解决方案资源管理器窗口中,右键单击 QueueReceiver,然后选择设为启动项目

向项目添加 NuGet 包

  1. 在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”

  2. 运行以下命令安装 Azure.Messaging.ServiceBus NuGet 包:

    Install-Package Azure.Messaging.ServiceBus
    

    显示在“包管理器”控制台中选择了 QueueReceiver 项目的屏幕截图。

添加从队列接收消息的代码

在本部分中,你将添加从队列检索消息的代码。

  1. Program 类中,添加以下代码:

    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    
    // the client that owns the connection and can be used to create senders and receivers
    ServiceBusClient client;
    
    // the processor that reads and processes messages from the queue
    ServiceBusProcessor processor;
    
  2. 将以下方法追加到 Program 类的末尾。

    // handle received messages
    async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Received: {body}");
    
        // complete the message. message is deleted from the queue. 
        await args.CompleteMessageAsync(args.Message);
    }
    
    // handle any errors when receiving messages
    Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
    
  3. 将以下代码追加到 Program 类的末尾。 以下部分概述了重要步骤,并在代码注释中提供了其他信息。

    // The Service Bus client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when messages are being published or read
    // regularly.
    //
    // Set the transport type to AmqpWebSockets so that the ServiceBusClient uses port 443. 
    // If you use the default AmqpTcp, make sure that ports 5671 and 5672 are open.
    
    // TODO: Replace the <NAMESPACE-CONNECTION-STRING> and <QUEUE-NAME> placeholders
    var clientOptions = new ServiceBusClientOptions()
    {
        TransportType = ServiceBusTransportType.AmqpWebSockets
    };
    client = new ServiceBusClient("<NAMESPACE-CONNECTION-STRING>", clientOptions);
    
    // create a processor that we can use to process the messages
    // TODO: Replace the <QUEUE-NAME> placeholder
    processor = client.CreateProcessor("<QUEUE-NAME>", new ServiceBusProcessorOptions());
    
    try
    {
        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;
    
        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;
    
        // start processing 
        await processor.StartProcessingAsync();
    
        Console.WriteLine("Wait for a minute and then press any key to end the processing");
        Console.ReadKey();
    
        // stop processing 
        Console.WriteLine("\nStopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    }
    finally
    {
        // Calling DisposeAsync on client types is required to ensure that network
        // resources and other unmanaged objects are properly cleaned up.
        await processor.DisposeAsync();
        await client.DisposeAsync();
    }
    
  4. 完成的 Program 类应与以下代码匹配:

    using Azure.Messaging.ServiceBus;
    using System;
    using System.Threading.Tasks;
    
    // the client that owns the connection and can be used to create senders and receivers
    ServiceBusClient client;
    
    // the processor that reads and processes messages from the queue
    ServiceBusProcessor processor;
    
    // The Service Bus client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when messages are being published or read
    // regularly.
    //
    // Set the transport type to AmqpWebSockets so that the ServiceBusClient uses port 443. 
    // If you use the default AmqpTcp, make sure that ports 5671 and 5672 are open.
    
    // TODO: Replace the <NAMESPACE-CONNECTION-STRING> and <QUEUE-NAME> placeholders
    var clientOptions = new ServiceBusClientOptions()
    {
        TransportType = ServiceBusTransportType.AmqpWebSockets
    };
    client = new ServiceBusClient("<NAMESPACE-CONNECTION-STRING>", clientOptions);
    
    // create a processor that we can use to process the messages
    // TODO: Replace the <QUEUE-NAME> placeholder
    processor = client.CreateProcessor("<QUEUE-NAME>", new ServiceBusProcessorOptions());
    
    try
    {
        // add handler to process messages
        processor.ProcessMessageAsync += MessageHandler;
    
        // add handler to process any errors
        processor.ProcessErrorAsync += ErrorHandler;
    
        // start processing 
        await processor.StartProcessingAsync();
    
        Console.WriteLine("Wait for a minute and then press any key to end the processing");
        Console.ReadKey();
    
        // stop processing 
        Console.WriteLine("\nStopping the receiver...");
        await processor.StopProcessingAsync();
        Console.WriteLine("Stopped receiving messages");
    }
    finally
    {
        // Calling DisposeAsync on client types is required to ensure that network
        // resources and other unmanaged objects are properly cleaned up.
        await processor.DisposeAsync();
        await client.DisposeAsync();
    }
    
    // handle received messages
    async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Received: {body}");
    
        // complete the message. message is deleted from the queue. 
        await args.CompleteMessageAsync(args.Message);
    }
    
    // handle any errors when receiving messages
    Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }
    
  5. 生成项目并确保没有错误。

  6. 运行接收器应用程序。 你应该会看到接收的消息。 按任意键来停止使用接收器和应用程序。

    Wait for a minute and then press any key to end the processing
    Received: Message 1
    Received: Message 2
    Received: Message 3
    
    Stopping the receiver...
    Stopped receiving messages
    
  7. 再次检查门户。 等待几分钟,如果未看到0活动消息的 ,请刷新页面。

    • 活动消息计数和当前大小值现为 0

    • 在“消息”图表底部的“指标”部分中,可以看到队列有三条传入消息和三条传出消息 。

      显示接收后的活动消息和大小的屏幕截图。

其他信息

请参阅以下文档和示例:

清理资源

导航到 Azure 门户中的服务总线命名空间,然后在 Azure 门户上选择“删除”以删除命名空间及其中的队列。

请参阅 Azure 服务总线主题和订阅入门 (.NET)