Compartir a través de

教程:使用 Azure 门户和主题/订阅更新库存

Azure 服务总线是一种多租户云消息传送服务,可以在应用程序和服务之间发送信息。 异步操作可实现灵活的中转消息传送、结构化的先进先出 (FIFO) 消息传送以及发布/订阅功能。 有关 Azure 服务总线的详细概述,请参阅什么是服务总线?

本教程介绍如何通过 Azure 门户和 .NET 将零售库存方案中的服务总线主题和订阅与发布/订阅渠道配合使用。 此方案的一个示例是为多个零售店更新库存分类。 在此方案中,每个商店或商店组都获取适用于它们的消息来更新其分类。 本教程展示了如何使用订阅和筛选器实现此方案。 首先,创建一个包含三个订阅的主题,添加一些规则和筛选器,然后从主题和订阅发送和接收消息。

此图显示了一个发送者、一个包含三个订阅的主题和三个接收者。

在本教程中,你将了解如何执行以下操作:

  • 使用 Azure 门户创建一个服务总线主题和三个对该主题的订阅
  • 使用 .NET 代码为订阅添加筛选器
  • 创建具有不同内容的消息
  • 发送消息并验证它们是否已到达预期的订阅
  • 从订阅接收消息

先决条件

若要完成本教程,请确保做好以下准备:

  • Azure 订阅。 若要使用 Azure 服务(包括 Azure 服务总线),需要一个订阅。 如果没有 Azure 订阅,可在开始前创建一个试用版订阅
  • Visual Studio 2019 或更高版本。

服务总线主题和订阅

每个对主题的订阅都可以接收每条消息的副本。 主题在协议和语义方面与服务总线队列完全兼容。 服务总线主题支持一系列选择规则,这些规则具有筛选条件和用来设置或修改消息属性的可选操作。 规则每次匹配时,都会生成一条消息。 若要深入了解规则、筛选器和操作,请单击此链接

在 Azure 门户中创建命名空间

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。 命名空间提供了用于应用程序中的服务总线资源(队列、主题等)的范围容器。

创建命名空间:

  1. 登录 Azure 门户

  2. 导航到“所有服务”

  3. 在左侧的导航栏中,从类别列表中选择“集成”,将鼠标悬停在“服务总线”上,然后选择“服务总线”磁贴上的 按钮。+

    图像显示“创建资源”、“集成”以及菜单中的“服务总线”选择。

  4. 在“创建命名空间”页的“基本信息”标记中,执行以下步骤 :

    1. 对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。

    2. 对于“资源组”,请选择该命名空间驻留到的现有资源组,或创建一个新资源组。

    3. 输入命名空间的名称。 命名空间名称应遵循以下命名约定:

      • 该名称在 Azure 中必须唯一。 系统会立即检查该名称是否可用。
      • 名称长度最少为 6 个字符,最多为 50 个字符。
      • 名称只能包含字母、数字和连字符“-”。
      • 名称必须以字母开头,并且必须以字母或数字结尾。
      • 名称不以“-sb”或“-mgmt”结尾。
    4. 对于“位置”,请选择托管该命名空间的区域。

    5. 对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。

      重要

      若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题/订阅。

      如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅服务总线高级消息传送

    6. 在页面底部选择“查看 + 创建”。

      图像显示“创建命名空间”页

    7. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  5. 资源部署成功后,在部署页上选择“转到资源”。

    图像显示“部署成功”页,其中包括“转到资源”链接。

  6. 将会看到服务总线命名空间的主页。

    图像显示已创建的服务总线命名空间的主页。

获取到命名空间的连接字符串(Azure 门户)

创建新的命名空间会自动生成一个初始共享访问签名 (SAS) 策略,该策略包含主密钥和辅助密钥以及主要连接字符串和辅助连接字符串,每个连接字符串都授予对命名空间所有方面的完全控制权。 请参阅服务总线身份验证和授权,了解如何创建规则来对普通发送者和接收者的权限进行更多限制。

客户端可以使用连接字符串连接到服务总线命名空间。 若要复制命名空间的主要连接字符串,请执行以下步骤:

  1. 在“服务总线命名空间”页中的左侧菜单上,选择“共享访问策略” 。

  2. 在“共享访问策略”页,选择“RootManageSharedAccessKey” 。

  3. 在“策略: RootManageSharedAccessKey”窗口中,选择“主连接字符串”旁边的“复制”按钮,将连接字符串复制到剪贴板供以后使用。 将此值粘贴到记事本或其他某个临时位置。

    屏幕截图显示了名为 RootManageSharedAccessKey 的 S A S 策略,其中包含密钥和连接字符串。

    可使用此页面复制主密钥、辅助密钥、主连接字符串和辅助连接字符串。

使用 Azure 门户创建主题

  1. 在“服务总线命名空间”页面上,选择左侧菜单中的“主题”

  2. 在工具栏中选择“+ 主题”。

  3. 输入主题名称。 将其他选项保留默认值。

  4. 选择“创建”。

    “创建主题”页的屏幕截图。

创建对主题的订阅

  1. 选择在上一部分创建的主题

    “主题”页的屏幕截图,其中选择了主题。

  2. 在“服务总线主题”页面上,从左侧菜单中选择“订阅”,然后工具栏上选择“+ 订阅”

    “订阅”页的屏幕截图,其中选择了“添加订阅”按钮。

  3. 在“创建订阅”页上执行以下步骤:

    1. 对于订阅名称,输入“S1” 。

    2. 然后,选择“创建”以创建订阅。

      “创建订阅”页的屏幕截图。

  4. 重复上述步骤两次,创建名为 S2S3 的订阅。

在订阅上创建筛选规则

预配命名空间和主题/订阅并且拥有到命名空间的连接字符串后,便可以在订阅上创建筛选规则,然后发送和接收消息。 可以在此 GitHub 示例文件夹中检查代码。

发送和接收消息

若要运行代码,请执行以下步骤:

  1. 在命令提示符或 PowerShell 提示符窗口中发出以下命令,克隆服务总线 GitHub 存储库

    git clone https://github.com/Azure/azure-service-bus.git
    
  2. 导航到示例文件夹 azure-service-bus\samples\DotNet\Azure.Messaging.ServiceBus\BasicSendReceiveTutorialwithFilters

  3. 获取本教程前面部分复制到记事本的连接字符串。 此外还需要在上一部分创建的主题的名称。

  4. 在命令提示符窗口中键入以下命令:

    dotnet build
    
  5. 导航到 BasicSendReceiveTutorialwithFilters\bin\Debug\netcoreapp3.1 文件夹。

  6. 键入以下命令以运行程序。 请务必将 myConnectionString 替换为先前获得的值,将 myTopicName 替换为所创建主题的名称:

    dotnet --roll-forward Major BasicSendReceiveTutorialWithFilters.dll -ConnectionString "myConnectionString" -TopicName "myTopicName"
    
  7. 首先按控制台中的说明选择筛选器创建操作。 在创建筛选器时,其中一项操作是删除默认筛选器。 使用 PowerShell 或 CLI 时,不需删除默认筛选器,但如果是在代码中操作,则必须删除它们。 控制台命令 1 和 3 用于管理以前创建的订阅上的筛选器:

    • 执行操作 1:删除默认筛选器。

    • 执行操作 2:添加自己的筛选器。

    • 执行操作 3:本教程跳过该步骤。 此选项可以选择性地删除自己的筛选器。 这不会重新创建默认筛选器。

      显示 2 的输出

  8. 创建筛选器以后,即可发送消息。 按 4 即可观察到 10 条消息发送到主题:

    发送输出

  9. 按 5 即可观察到这些消息被接收。 如果没有返回 10 条消息,请按“m”以显示菜单,然后再次按 5。

    接收输出

清理资源

请按照以下步骤清理不再需要的资源。

  1. 在 Azure 门户中导航到自己的命名空间。
  2. 在“服务总线命名空间”页上,从命令栏中选择“删除”以删除命名空间和其中的资源(队列、主题和订阅)。

了解示例代码

此部分包含有关示例代码功能的更多详细信息。

获取连接字符串和主题

首先,此代码声明一组变量,这些变量推动程序的剩余执行操作。

string ServiceBusConnectionString;
string TopicName;

static string[] Subscriptions = { "S1", "S2", "S3" };
static IDictionary<string, string[]> SubscriptionFilters = new Dictionary<string, string[]> {
    { "S1", new[] { "StoreId IN('Store1', 'Store2', 'Store3')", "StoreId = 'Store4'"} },
    { "S2", new[] { "sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'" } },
    { "S3", new[] { "sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')" } }
};
// You can have only have one action per rule and this sample code supports only one action for the first filter, which is used to create the first rule. 
static IDictionary<string, string> SubscriptionAction = new Dictionary<string, string> {
    { "S1", "" },
    { "S2", "" },
    { "S3", "SET sys.Label = 'SalesEvent'"  }
};
static string[] Store = { "Store1", "Store2", "Store3", "Store4", "Store5", "Store6", "Store7", "Store8", "Store9", "Store10" };
static string SysField = "sys.To";
static string CustomField = "StoreId";
static int NrOfMessagesPerStore = 1; // Send at least 1.

连接字符串和主题名称通过所示的命令行参数传入,然后通过 Main() 方法读取:

static void Main(string[] args)
{
    string ServiceBusConnectionString = "";
    string TopicName = "";

    for (int i = 0; i < args.Length; i++)
    {
        if (args[i] == "-ConnectionString")
        {
            Console.WriteLine($"ConnectionString: {args[i + 1]}");
            ServiceBusConnectionString = args[i + 1]; // Alternatively enter your connection string here.
        }
        else if (args[i] == "-TopicName")
        {
            Console.WriteLine($"TopicName: {args[i + 1]}");
            TopicName = args[i + 1]; // Alternatively enter your queue name here.
        }
    }

    if (ServiceBusConnectionString != "" && TopicName != "")
    {
        Program P = StartProgram(ServiceBusConnectionString, TopicName);
        P.PresentMenu().GetAwaiter().GetResult();
    }
    else
    {
        Console.WriteLine("Specify -Connectionstring and -TopicName to execute the example.");
        Console.ReadKey();
    }
}

删除默认筛选器

创建订阅时,服务总线会为每个订阅创建默认筛选器。 该筛选器允许接收发送到主题的每条消息。 若要使用自定义筛选器,则可删除默认筛选器,如以下代码所示:

private async Task RemoveDefaultFilters()
{
    Console.WriteLine($"Starting to remove default filters.");

    try
    {
        var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
        foreach (var subscription in Subscriptions)
        {
            await client.DeleteRuleAsync(TopicName, subscription, CreateRuleOptions.DefaultRuleName);
            Console.WriteLine($"Default filter for {subscription} has been removed.");
        }

        Console.WriteLine("All default Rules have been removed.\n");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    await PresentMenu();
}

创建筛选器

以下代码添加在本教程中定义的自定义筛选器:

private async Task CreateCustomFilters()
{
    try
    {
        for (int i = 0; i < Subscriptions.Length; i++)
        {
            var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
            string[] filters = SubscriptionFilters[Subscriptions[i]];
            if (filters[0] != "")
            {
                int count = 0;
                foreach (var myFilter in filters)
                {
                    count++;

                    string action = SubscriptionAction[Subscriptions[i]];
                    if (action != "")
                    {
                        await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
                        {
                            Filter = new SqlRuleFilter(myFilter),
                            Action = new SqlRuleAction(action),
                            Name = $"MyRule{count}"
                        });
                    }
                    else
                    {
                        await client.CreateRuleAsync(TopicName, Subscriptions[i], new CreateRuleOptions
                        {
                            Filter = new SqlRuleFilter(myFilter),
                            Name = $"MyRule{count}"
                        });
                    }
                }
            }

            Console.WriteLine($"Filters and actions for {Subscriptions[i]} have been created.");
        }

        Console.WriteLine("All filters and actions have been created.\n");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    await PresentMenu();
}

删除创建的自定义筛选器

若要删除订阅上的所有筛选器,请执行下述演示了详细操作方法的代码:

private async Task CleanUpCustomFilters()
{
    foreach (var subscription in Subscriptions)
    {
        try
        {
            var client = new ServiceBusAdministrationClient(ServiceBusConnectionString);
            IAsyncEnumerator<RuleProperties> rules = client.GetRulesAsync(TopicName, subscription).GetAsyncEnumerator();
            while (await rules.MoveNextAsync())
            {
                await client.DeleteRuleAsync(TopicName, subscription, rules.Current.Name);
                Console.WriteLine($"Rule {rules.Current.Name} has been removed.");
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }
    Console.WriteLine("All default filters have been removed.\n");

    await PresentMenu();
}

发送消息

向主题发送消息类似于向队列发送消息。 此示例演示如何使用任务列表和异步处理来发送消息:

public async Task SendMessages()
{
    try
    {
        await using var client = new ServiceBusClient(ServiceBusConnectionString);
        var taskList = new List<Task>();
        for (int i = 0; i < Store.Length; i++)
        {
            taskList.Add(SendItems(client, Store[i]));
        }

        await Task.WhenAll(taskList);
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
    Console.WriteLine("\nAll messages sent.\n");
}

private async Task SendItems(ServiceBusClient client, string store)
{
    // create the sender
    ServiceBusSender tc = client.CreateSender(TopicName);

    for (int i = 0; i < NrOfMessagesPerStore; i++)
    {
        Random r = new Random();
        Item item = new Item(r.Next(5), r.Next(5), r.Next(5));

        // Note the extension class which is serializing an deserializing messages
        ServiceBusMessage message = item.AsMessage();
        message.To = store;
        message.ApplicationProperties.Add("StoreId", store);
        message.ApplicationProperties.Add("Price", item.GetPrice().ToString());
        message.ApplicationProperties.Add("Color", item.GetColor());
        message.ApplicationProperties.Add("Category", item.GetItemCategory());

        await tc.SendMessageAsync(message);
        Console.WriteLine($"Sent item to Store {store}. Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}"); ;
    }
}

接收消息

消息再次通过任务列表接收,代码使用批处理。 可以使用批处理进行发送和接收,但此示例仅演示如何进行批量接收。 事实上,你不会从循环中脱离出来,而是继续进行循环并设置更大的时间跨度,例如一分钟。 对代理的接收调用在此时段内会始终保持开放状态,消息在到达后会立即返回,然后发出新的接收调用。 此概念称为长轮询。 更典型的选项是使用接收泵,详见快速入门以及存储库中的多个其他的示例。

public async Task Receive()
{
    var taskList = new List<Task>();
    for (var i = 0; i < Subscriptions.Length; i++)
    {
        taskList.Add(this.ReceiveMessages(Subscriptions[i]));
    }

    await Task.WhenAll(taskList);
}

private async Task ReceiveMessages(string subscription)
{
    await using var client = new ServiceBusClient(ServiceBusConnectionString);
    ServiceBusReceiver receiver = client.CreateReceiver(TopicName, subscription);

    // In reality you would not break out of the loop like in this example but would keep looping. The receiver keeps the connection open
    // to the broker for the specified amount of seconds and the broker returns messages as soon as they arrive. The client then initiates
    // a new connection. So in reality you would not want to break out of the loop. 
    // Also note that the code shows how to batch receive, which you would do for performance reasons. For convenience you can also always
    // use the regular receive pump which we show in our Quick Start and in other GitHub samples.
    while (true)
    {
        try
        {
            //IList<Message> messages = await receiver.ReceiveAsync(10, TimeSpan.FromSeconds(2));
            // Note the extension class which is serializing an deserializing messages and testing messages is null or 0.
            // If you think you did not receive all messages, just press M and receive again via the menu.
            IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 100);

            if (messages.Any())
            {
                foreach (ServiceBusReceivedMessage message in messages)
                {
                    lock (Console.Out)
                    {
                        Item item = message.As<Item>();
                        IReadOnlyDictionary<string, object> myApplicationProperties = message.ApplicationProperties;
                        Console.WriteLine($"StoreId={myApplicationProperties["StoreId"]}");
                        if (message.Subject != null)
                        {
                            Console.WriteLine($"Subject={message.Subject}");
                        }
                        Console.WriteLine(
                            $"Item data: Price={item.GetPrice()}, Color={item.GetColor()}, Category={item.GetItemCategory()}");
                    }

                    await receiver.CompleteMessageAsync(message);
                }
            }
            else
            {
                break;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }
}

注意

可以使用服务总线资源管理器管理服务总线资源。 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。

后续步骤

本教程介绍了如何使用 Azure 门户预配资源,然后从服务总线主题及其订阅发送并接收消息。 你已了解如何执行以下操作:

  • 使用 Azure 门户创建一个服务总线主题和一个或多个对该主题的订阅
  • 使用 .NET 代码添加主题筛选器
  • 创建两条具有不同内容的消息
  • 发送消息并验证它们是否已到达预期的订阅
  • 从订阅接收消息

若要通过更多示例来了解如何发送和接收消息,请从 GitHub 上的服务总线示例着手。

若要详细了解如何使用服务总线的发布/订阅功能,请转到下一教程。