教程:使用 Azure 门户和主题/订阅更新清单Tutorial: Update inventory using Azure portal and topics/subscriptions

世纪互联 Azure 服务总线是一种多租户云消息传送服务,可以在应用程序和服务之间发送信息。21Vianet Azure Service Bus is a multi-tenant cloud messaging service that sends information between applications and services. 异步操作可实现灵活的中转消息传送、结构化的先进先出 (FIFO) 消息传送以及发布/订阅功能。Asynchronous operations give you flexible, brokered messaging, along with structured first-in, first-out (FIFO) messaging, and publish/subscribe capabilities. 本教程介绍如何通过 Azure 门户和 .NET 将零售库存方案中的服务总线主题和订阅与发布/订阅渠道配合使用。This tutorial shows how to use Service Bus topics and subscriptions in a retail inventory scenario, with publish/subscribe channels using the Azure portal and .NET.

本教程介绍如何执行下列操作:In this tutorial, you learn how to:

  • 使用 Azure 门户创建一个服务总线主题和一个或多个对该主题的订阅Create a Service Bus topic and one or more subscriptions to that topic using the Azure portal
  • 使用 .NET 代码添加主题筛选器Add topic filters using .NET code
  • 创建两条具有不同内容的消息Create two messages with different content
  • 发送消息并验证它们是否已到达预期的订阅Send the messages and verify they arrived in the expected subscriptions
  • 从订阅接收消息Receive messages from the subscriptions

此方案的一个示例是为多个零售店更新库存分类。An example of this scenario is an inventory assortment update for multiple retail stores. 在此方案中,每个商店或商店组都获取适用于它们的消息来更新其分类。In this scenario, each store, or set of stores, gets messages intended for them to update their assortments. 本教程展示了如何使用订阅和筛选器实现此方案。This tutorial shows how to implement this scenario using subscriptions and filters. 首先,创建一个包含 3 个订阅的主题,添加一些规则和筛选器,然后从主题和订阅发送和接收消息。First, you create a topic with 3 subscriptions, add some rules and filters, and then send and receive messages from the topic and subscriptions.

主题

如果没有 Azure 订阅,可在开始前创建一个试用帐户If you don't have an Azure subscription, you can create a trial account before you begin.

先决条件Prerequisites

若要完成本教程,请确保已安装:To complete this tutorial, make sure you have installed:

服务总线主题和订阅Service Bus topics and subscriptions

每个对主题的订阅都可以接收每条消息的副本。Each subscription to a topic can receive a copy of each message. 主题在协议和语义方面与服务总线队列完全兼容。Topics are fully protocol and semantically compatible with Service Bus queues. 服务总线主题支持一系列选择规则,这些规则具有筛选条件和用来设置或修改消息属性的可选操作。Service Bus topics support a wide array of selection rules with filter conditions, with optional actions that set or modify message properties. 规则每次匹配时,都会生成一条消息。Each time a rule matches, it produces a message. 若要深入了解规则、筛选器和操作,请单击此链接To learn more about rules, filters, and actions, follow this link.

若要开始在 Azure 中使用服务总线消息实体,必须先使用在 Azure 中唯一的名称创建一个命名空间。To begin using Service Bus messaging entities in Azure, you must first create a namespace with a name that is unique across Azure. 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。A namespace provides a scoping container for addressing Service Bus resources within your application.

创建命名空间:To create a namespace:

  1. 登录到 Azure 门户Sign in to the Azure portal.

  2. 在门户的左侧导航窗格中,依次单击“+ 创建资源”、“集成”和“服务总线”。In the left navigation pane of the portal, click + Create a resource, then click Integration, and then click Service Bus.

  3. 在“创建命名空间” 对话框中,输入命名空间名称。In the Create namespace dialog, enter a namespace name. 系统会立即检查该名称是否可用。The system immediately checks to see if the name is available.

  4. 在确保命名空间名称可用后,选择定价层(基本版或标准版)。After making sure the namespace name is available, choose the pricing tier (Basic or Standard). 如果要使用主题和订阅,请务必选择“标准”。If you want to use topics and subscriptions, make sure to choose Standard. 基本定价层中不支持主题/订阅。Topics/subscriptions are not supported in the Basic pricing tier.

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。In the Subscription field, choose an Azure subscription in which to create the namespace.

  6. 在“资源组” 字段中,选择用于放置该命名空间的现有资源组,或者创建一个新资源组。In the Resource group field, choose an existing resource group in which the namespace will live, or create a new one.

  7. 在“位置” 中,选择应在其中托管该命名空间的国家或地区。In Location, choose the country or region in which your namespace should be hosted.

    创建命名空间

  8. 单击“创建” 。Click Create. 系统现已创建命名空间并已将其启用。The system now creates your namespace and enables it. 可能需要等待几分钟,因为系统会为你的帐户配置资源。You might have to wait several minutes as the system provisions resources for your account.

获取管理凭据Obtain the management credentials

创建新的命名空间时,会自动生成一项初始的共享访问签名 (SAS) 规则,将一对主密钥和辅助密钥关联到一起,向每个密钥授予对命名空间的所有资产的完全控制权限。Creating a new namespace automatically generates an initial Shared Access Signature (SAS) rule with an associated pair of primary and secondary keys that each grant full control over all aspects of the namespace. 请参阅服务总线身份验证和授权,了解如何创建更多的规则,对常规的发送者和接收者的权限进行更多限制。See Service Bus authentication and authorization for information about how to create further rules with more constrained rights for regular senders and receivers. 若要复制初始规则,请执行以下步骤:To copy the initial rule, follow these steps:

  1. 单击“所有资源”,然后单击新创建的命名空间名称。Click All resources, then click the newly created namespace name.

  2. 在命名空间窗口中,单击“共享访问策略”。In the namespace window, click Shared access policies.

  3. 在“共享访问策略”屏幕中,单击“RootManageSharedAccessKey”。In the Shared access policies screen, click RootManageSharedAccessKey.

    connection-info

  4. 在“策略: RootManageSharedAccessKey”窗口中,单击“主连接字符串”旁边的“复制”按钮,将连接字符串复制到剪贴板供以后使用。In the Policy: RootManageSharedAccessKey window, click the copy button next to Primary Connection String, to copy the connection string to your clipboard for later use. 将此值粘贴到记事本或其他某个临时位置。Paste this value into Notepad or some other temporary location.

    connection-string

  5. 重复上述步骤,将主键的值复制和粘贴到临时位置,以供稍后使用。Repeat the previous step, copying and pasting the value of Primary key to a temporary location for later use.

使用 Azure 门户创建主题Create a topic using the Azure portal

  1. 在“服务总线命名空间”页面上,选择左侧菜单中的“主题”。On the Service Bus Namespace page, select Topics on the left menu.

  2. 在工具栏中选择“+ 主题”。Select + Topic on the toolbar.

  3. 输入主题名称。Enter a name for the topic. 将其他选项保留默认值。Leave the other options with their default values.

  4. 选择“创建” 。Select Create.

    创建主题

创建主题的订阅Create subscriptions to the topic

  1. 选择在上一部分创建的主题。Select the topic that you created in the previous section.

    选择主题

  2. 在“服务总线主题”页面上,从左侧菜单中选择“订阅”,然后工具栏上选择“+ 订阅”。On the Service Bus Topic page, select Subscriptions from the left menu, and then select + Subscription on the toolbar.

    添加订阅按钮

  3. 在“创建订阅”页上,输入“S1”作为订阅名称,然后选择“创建”。On the Create subscription page, enter S1 for name for the subscription, and then select Create.

    创建订阅页面

  4. 重复上述步骤两次,以创建名为 S2S3 的订阅。Repeat the previous step twice to create subscriptions named S2 and S3.

在订阅上创建筛选规则Create filter rules on subscriptions

预配命名空间和主题/订阅并且拥有所需的凭据后,便可以在订阅上创建筛选规则,然后发送和接收消息。After the namespace and topic/subscriptions are provisioned, and you have the necessary credentials, you are ready to create filter rules on the subscriptions, then send and receive messages. 可以在此 GitHub 示例文件夹中检查代码。You can examine the code in this GitHub sample folder.

发送和接收消息Send and receive messages

若要运行此代码,请执行以下操作:To run the code, do the following:

  1. 在命令提示符或 PowerShell 提示符窗口中发出以下命令,克隆服务总线 GitHub 存储库In a command prompt or PowerShell prompt, clone the Service Bus GitHub repository by issuing the following command:

    git clone https://github.com/Azure/azure-service-bus.git
    
  2. 导航到示例文件夹 azure-service-bus\samples\DotNet\GettingStarted\BasicSendReceiveTutorialwithFiltersNavigate to the sample folder azure-service-bus\samples\DotNet\GettingStarted\BasicSendReceiveTutorialwithFilters.

  3. 在本教程的获取管理凭据部分获取复制到记事本的连接字符串。Obtain the connection string you copied to Notepad in the Obtain the management credentials section of this tutorial. 此外还需要在上一部分创建的主题的名称。You also need the name of the topic you created in the previous section.

  4. 在命令提示符窗口中键入以下命令:At the command prompt, type the following command:

    dotnet build
    
  5. 导航到 BasicSendReceiveTutorialwithFilters\bin\Debug\netcoreapp2.0 文件夹。Navigate to the BasicSendReceiveTutorialwithFilters\bin\Debug\netcoreapp2.0 folder.

  6. 键入以下命令以运行程序。Type the following command to run the program. 请务必将 myConnectionString 替换为先前获得的值,将 myTopicName 替换为所创建主题的名称:Be sure to replace myConnectionString with the value you previously obtained, and myTopicName with the name of the topic you created:

    dotnet BasicSendReceiveTutorialwithFilters.dll -ConnectionString "myConnectionString" -TopicName "myTopicName"
    
  7. 首先按控制台中的说明选择筛选器创建操作。Follow the instructions in the console to select filter creation first. 在创建筛选器时,其中一项操作是删除默认筛选器。Part of creating filters is to remove the default filters. 使用 PowerShell 或 CLI 时,不需删除默认筛选器,但如果是在代码中操作,则必须删除它们。When you use PowerShell or CLI you don't need to remove the default filter, but if you do this in code, you must remove them. 控制台命令 1 和 3 用于管理以前创建的订阅上的筛选器:The console commands 1 and 3 help you manage the filters on the subscriptions you previously created:

    • 执行操作 1:删除默认筛选器。Execute 1: to remove the default filters.

    • 执行操作 2:添加自己的筛选器。Execute 2: to add your own filters.

    • 执行操作 3:(可选)删除自己的筛选器。Execute 3: to optionally remove your own filters. 请注意,这不会重新创建默认筛选器。Note that this will not recreate the default filters.

      显示 2 的输出

  8. 创建筛选器以后,即可发送消息。After filter creation, you can send messages. 按 4 即可观察到 10 条消息发送到主题:Press 4 and observe 10 messages being sent to the topic:

    发送输出

  9. 按 5 即可观察到这些消息被接收。Press 5 and observe the messages being received. 如果没有返回 10 条消息,请按“m”以显示菜单,然后再次按 5。If you did not get 10 messages back, press "m" to display the menu, then press 5 again.

    接收输出

清理资源Clean up resources

不再需要命名空间和队列时,可将其删除。When no longer needed, delete the namespace and queue. 为此,请在门户中选择这些资源,然后单击“删除”。To do so, select these resources on the portal and click Delete.

了解示例代码Understand the sample code

此部分包含有关示例代码功能的更多详细信息。This section contains more details about what the sample code does.

获取连接字符串和主题Get connection string and topic

首先,此代码声明一组变量,这些变量推动程序的剩余执行操作。First, the code declares a set of variables, which drive the remaining execution of the program.

string ServiceBusConnectionString;
string TopicName;

static string[] Subscriptions = { "S1", "S2", "S3" };
static IDictionary<string, string[]> SubscriptionFilters = new Dictionary<string, string[]> {
    { "S1", new[] { "StoreId IN('Store1', 'Store2', 'Store3')", "StoreId = 'Store4'"} },
    { "S2", new[] { "sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'" } },
    { "S3", new[] { "sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')" } }
};
// You can have only have one action per rule and this sample code supports only one action for the first filter, which is used to create the first rule. 
static IDictionary<string, string> SubscriptionAction = new Dictionary<string, string> {
    { "S1", "" },
    { "S2", "" },
    { "S3", "SET sys.Label = 'SalesEvent'"  }
};
static string[] Store = { "Store1", "Store2", "Store3", "Store4", "Store5", "Store6", "Store7", "Store8", "Store9", "Store10" };
static string SysField = "sys.To";
static string CustomField = "StoreId";
static int NrOfMessagesPerStore = 1; // Send at least 1.

连接字符串和主题名称通过所示的命令行参数传入,然后通过 Main() 方法读取:The connection string and topic name are passed in via command line parameters as shown, and then are read in the Main() method:

static void Main(string[] args)
{
    string ServiceBusConnectionString = "";
    string TopicName = "";

    for (int i = 0; i < args.Length; i++)
    {
        if (args[i] == "-ConnectionString")
        {
            Console.WriteLine($"ConnectionString: {args[i + 1]}");
            ServiceBusConnectionString = args[i + 1]; // Alternatively enter your connection string here.
        }
        else if (args[i] == "-TopicName")
        {
            Console.WriteLine($"TopicName: {args[i + 1]}");
            TopicName = args[i + 1]; // Alternatively enter your queue name here.
        }
    }

    if (ServiceBusConnectionString != "" && TopicName != "")
    {
        Program P = StartProgram(ServiceBusConnectionString, TopicName);
        P.PresentMenu().GetAwaiter().GetResult();
    }
    else
    {
        Console.WriteLine("Specify -Connectionstring and -TopicName to execute the example.");
        Console.ReadKey();
    }
}

删除默认筛选器Remove default filters

创建订阅时,服务总线会为每个订阅创建默认筛选器。When you create a subscription, Service Bus creates a default filter per subscription. 该筛选器允许接收发送到主题的每条消息。This filter enables receiving every message sent to the topic. 若要使用自定义筛选器,则可删除默认筛选器,如以下代码所示:If you want to use custom filters, you can remove the default filter, as shown in the following code:

private async Task RemoveDefaultFilters()
{
    Console.WriteLine($"Starting to remove default filters.");

    try
    {
        foreach (var subscription in Subscriptions)
        {
            SubscriptionClient s = new SubscriptionClient(ServiceBusConnectionString, TopicName, subscription);
            await s.RemoveRuleAsync(RuleDescription.DefaultRuleName);
            Console.WriteLine($"Default filter for {subscription} has been removed.");
            await s.CloseAsync();
        }

        Console.WriteLine("All default Rules have been removed.\n");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    await PresentMenu();
}

创建筛选器Create filters

以下代码添加在本教程中定义的自定义筛选器:The following code adds the custom filters defined in this tutorial:

private async Task CreateCustomFilters()
{
    try
    {
        for (int i = 0; i < Subscriptions.Length; i++)
        {
            SubscriptionClient s = new SubscriptionClient(ServiceBusConnectionString, TopicName, Subscriptions[i]);
            string[] filters = SubscriptionFilters[Subscriptions[i]];
            if (filters[0] != "")
            {
                int count = 0;
                foreach (var myFilter in filters)
                {
                    count++;

                    string action = SubscriptionAction[Subscriptions[i]];
                    if (action != "")
                    {
                        await s.AddRuleAsync(new RuleDescription
                        {
                            Filter = new SqlFilter(myFilter),
                            Action = new SqlRuleAction(action),
                            Name = $"MyRule{count}"
                        });
                    }
                    else
                    {
                        await s.AddRuleAsync(new RuleDescription
                        {
                            Filter = new SqlFilter(myFilter),
                            Name = $"MyRule{count}"
                        });
                    }
                }
            }

            Console.WriteLine($"Filters and actions for {Subscriptions[i]} have been created.");
        }

        Console.WriteLine("All filters and actions have been created.\n");
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }

    await PresentMenu();
}

删除创建的自定义筛选器Remove your custom created filters

若要删除订阅上的所有筛选器,请执行下述演示了详细操作方法的代码:If you want to remove all filters on your subscription, the following code shows how to do that:

private async Task CleanUpCustomFilters()
{
    foreach (var subscription in Subscriptions)
    {
        try
        {
            SubscriptionClient s = new SubscriptionClient(ServiceBusConnectionString, TopicName, subscription);
            IEnumerable<RuleDescription> rules = await s.GetRulesAsync();
            foreach (RuleDescription r in rules)
            {
                await s.RemoveRuleAsync(r.Name);
                Console.WriteLine($"Rule {r.Name} has been removed.");
            }
            await s.CloseAsync();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }
    Console.WriteLine("All default filters have been removed.\n");

    await PresentMenu();
}

发送消息Send messages

向主题发送消息类似于向队列发送消息。Sending messages to a topic is similar to sending messages to a queue. 此示例演示如何使用任务列表和异步处理来发送消息:This example shows how to send messages, using a task list and asynchronous processing:

public async Task SendMessages()
{
    try
    {
        TopicClient tc = new TopicClient(ServiceBusConnectionString, TopicName);

        var taskList = new List<Task>();
        for (int i = 0; i < Store.Length; i++)
        {
            taskList.Add(SendItems(tc, Store[i]));
        }

        await Task.WhenAll(taskList);
        await tc.CloseAsync();
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
    Console.WriteLine("\nAll messages sent.\n");
}

private async Task SendItems(TopicClient tc, string store)
{
    for (int i = 0; i < NrOfMessagesPerStore; i++)
    {
        Random r = new Random();
        Item item = new Item(r.Next(5), r.Next(5), r.Next(5));

        // Note the extension class which is serializing an deserializing messages
        Message message = item.AsMessage();
        message.To = store;
        message.UserProperties.Add("StoreId", store);
        message.UserProperties.Add("Price", item.getPrice().ToString());
        message.UserProperties.Add("Color", item.getColor());
        message.UserProperties.Add("Category", item.getItemCategory());

        await tc.SendAsync(message);
        Console.WriteLine($"Sent item to Store {store}. Price={item.getPrice()}, Color={item.getColor()}, Category={item.getItemCategory()}"); ;
    }
}

接收消息Receive messages

消息再次通过任务列表接收,代码使用批处理。Messages are again received via a task list, and the code uses batching. 可以使用批处理进行发送和接收,但此示例仅演示如何进行批量接收。You can send and receive using batching, but this example only shows how to batch receive. 事实上,你不会从循环中脱离出来,而是继续进行循环并设置更大的时间跨度,例如一分钟。In reality, you would not break out of the loop, but keep looping and set a higher timespan, such as one minute. 对代理的接收调用在此时段内会始终保持开放状态,消息在到达后会立即返回,然后发出新的接收调用。The receive call to the broker is kept open for this amount of time and if messages arrive, they are returned immediately and a new receive call is issued. 此概念称为长轮询。This concept is called long polling. 更典型的选项是使用接收泵,详见快速入门以及存储库中的多个其他的示例。Using the receive pump which you can see in the quickstart, and in several other samples in the repository, is a more typical option.

public async Task Receive()
{
    var taskList = new List<Task>();
    for (var i = 0; i < Subscriptions.Length; i++)
    {
        taskList.Add(this.ReceiveMessages(Subscriptions[i]));
    }

    await Task.WhenAll(taskList);
}

private async Task ReceiveMessages(string subscription)
{
    var entityPath = EntityNameHelper.FormatSubscriptionPath(TopicName, subscription);
    var receiver = new MessageReceiver(ServiceBusConnectionString, entityPath, ReceiveMode.PeekLock, RetryPolicy.Default, 100);

    while (true)
    {
        try
        {
            IList<Message> messages = await receiver.ReceiveAsync(10, TimeSpan.FromSeconds(2));
            if (messages.Any())
            {
                foreach (var message in messages)
                {
                    lock (Console.Out)
                    {
                        Item item = message.As<Item>();
                        IDictionary<string, object> myUserProperties = message.UserProperties;
                        Console.WriteLine($"StoreId={myUserProperties["StoreId"]}");

                        if (message.Label != null)
                        {
                            Console.WriteLine($"Label={message.Label}");
                        }

                        Console.WriteLine(
                            $"Item data: Price={item.getPrice()}, Color={item.getColor()}, Category={item.getItemCategory()}");
                    }

                    await receiver.CompleteAsync(message.SystemProperties.LockToken);
                }
            }
            else
            {
                break;
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.ToString());
        }
    }

    await receiver.CloseAsync();
}

后续步骤Next steps

本教程介绍了如何使用 Azure 门户预配资源,然后从服务总线主题及其订阅发送并接收消息。In this tutorial, you provisioned resources using the Azure portal, then sent and received messages from a Service Bus topic and its subscriptions. 你已了解如何:You learned how to:

  • 使用 Azure 门户创建一个服务总线主题和一个或多个对该主题的订阅Create a Service Bus topic and one or more subscriptions to that topic using the Azure portal
  • 使用 .NET 代码添加主题筛选器Add topic filters using .NET code
  • 创建两条具有不同内容的消息Create two messages with different content
  • 发送消息并验证它们是否已到达预期的订阅Send the messages and verify they arrived in the expected subscriptions
  • 从订阅接收消息Receive messages from the subscriptions

若要通过更多示例来了解如何发送和接收消息,请从 GitHub 上的服务总线示例着手。For more examples of sending and receiving messages, get started with the Service Bus samples on GitHub.

若要详细了解如何使用服务总线的发布/订阅功能,请转到下一教程。Advance to the next tutorial to learn more about using the publish/subscribe capabilities of Service Bus.