Compartir a través de

设置订阅筛选器(Azure 服务总线)

本文提供了一些有关对“服务总线”主题订阅设置筛选器的示例。 有关筛选器的概念性信息,请参阅筛选器

使用 Azure 门户

若要在 Azure 门户中设置订阅筛选器,请使用“服务总线订阅”页面的“筛选器”部分

显示“服务总线订阅”页面的屏幕截图,其中突出显示了“筛选器”部分。

使用 Azure CLI

使用 az servicebus topic subscription rule create 创建规则或按订阅筛选。

使用 Azure PowerShell

使用 Set-AzServiceBusRule 创建规则或按订阅筛选。

注意

订阅规则由筛选器和操作组成。 可以使用 CLI 和 PowerShell(但不能使用 Azure 门户)指定操作。

按系统属性筛选

若要在筛选器中引用系统属性,请使用以下格式:sys.<system-property-name>

sys.label LIKE '%bus%'
sys.messageid = 'xxxx'
sys.correlationid like 'abc-%'

注意

按消息属性筛选

下面是在筛选器中使用应用程序或用户属性的示例。 可以使用 Azure.Messaging.ServiceBus.ServiceBusMessage.ApplicationProperties(最新)访问应用程序属性集,或使用语法 user.property-name 或仅使用 property-name 通过 Microsoft.Azure.ServiceBus.ServiceBusMessage(已弃用)访问用户属性集。

MessageProperty = 'A'
user.SuperHero like 'SuperMan%'

注意

2026 年 9 月 30 日,我们将停用 Azure 服务总线 SDK 库 WindowsAzure.ServiceBus、Microsoft.Azure.ServiceBus 和 com.microsoft.azure.servicebus,这些库不符合 Azure SDK 准则。 我们还将结束对 SBMP 协议的支持,因此在 2026 年 9 月 30 日之后,你将无法再使用此协议。 请在该日期之前迁移到最新的 Azure SDK 库,新库提供了关键安全更新和改进功能。

尽管旧库在 2026 年 9 月 30 日之后仍可使用,但它们将不再获得 Azure 的官方支持和更新。 有关详细信息,请参阅支持停用公告

按带有特殊字符的消息属性筛选

如果消息属性名称包含特殊字符,请使用双引号 (") 将属性名称括起来。 例如,如果属性名称为 "http://schemas.microsoft.com/xrm/2011/Claims/EntityLogicalName",则在筛选器中使用以下语法。

"http://schemas.microsoft.com/xrm/2011/Claims/EntityLogicalName" = 'account'

按带有数字值的消息属性筛选

下面的示例演示如何在筛选器中使用带有数字值的属性。

MessageProperty = 1
MessageProperty > 1
MessageProperty > 2.08
MessageProperty = 1 AND MessageProperty2 = 3
MessageProperty = 1 OR MessageProperty2 = 3

基于参数的筛选器

下面是使用基于参数的筛选器的几个示例。 在这些示例中,DataTimeMpDateTime 类型的消息属性,@dtParam 是作为 DateTime 对象传递给筛选器的参数。

DateTimeMp < @dtParam
DateTimeMp > @dtParam

(DateTimeMp2-DateTimeMp1) <= @timespan //@timespan is a parameter of type TimeSpan
DateTimeMp2-DateTimeMp1 <= @timespan

使用 IN 和 NOT IN

StoreId IN('Store1', 'Store2', 'Store3')

sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'

sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')

有关 C# 示例,请参阅 GitHub 上的主题筛选器示例

关联筛选器

使用 CorrelationID 的关联筛选器

new CorrelationFilter("Contoso");

它将筛选 CorrelationID 设置为 Contoso 的消息。

注意

.NET 中的 CorrelationRuleFilter 类位于 Azure.Messaging.ServiceBus.Administration 命名空间中。 有关展示如何使用 .NET 创建筛选器的示例代码,请参阅 GitHub 上的此代码

使用系统属性和用户属性的关联筛选器

var filter = new CorrelationRuleFilter();
filter.Label = "Important";
filter.ReplyTo = "johndoe@contoso.com";
filter.Properties["color"] = "Red";

它等效于 sys.ReplyTo = 'johndoe@contoso.com' AND sys.Label = 'Important' AND color = 'Red'

用于创建订阅筛选器的 .NET 示例

下面是用于创建以下服务总线实体的 .NET C# 示例:

  • 名为 topicfiltersampletopic 的服务总线主题
  • 使用 True Rule 筛选器订阅名为 AllOrders 的主题,此筛选器等效于带表达式 1=1 的 SQL 规则筛选器。
  • 名为 ColorBlueSize10Orders 的订阅,带 SQL 筛选器表达式 color='blue' AND quantity=10
  • 名为 ColorRed 的订阅,带 SQL 筛选器表达式 color='red' 和操作
  • 名为 HighPriorityRedOrders 的订阅,带关联筛选器表达式 Subject = "red", CorrelationId = "high"

有关详细信息,请参阅内联代码注释。

namespace CreateTopicsAndSubscriptionsWithFilters
{
    using Azure.Messaging.ServiceBus.Administration;
    using System;
    using System.Threading.Tasks;

    public class Program
    {
        // Service Bus Administration Client object to create topics and subscriptions
        static ServiceBusAdministrationClient adminClient;

        // connection string to the Service Bus namespace
        static readonly string connectionString = "<YOUR SERVICE BUS NAMESPACE - CONNECTION STRING>";

        // name of the Service Bus topic
        static readonly string topicName = "topicfiltersampletopic";

        // names of subscriptions to the topic
        static readonly string subscriptionAllOrders = "AllOrders";
        static readonly string subscriptionColorBlueSize10Orders = "ColorBlueSize10Orders";
        static readonly string subscriptionColorRed = "ColorRed";
        static readonly string subscriptionHighPriorityRedOrders = "HighPriorityRedOrders";

        public static async Task Main()
        {
            try
            {

                Console.WriteLine("Creating the Service Bus Administration Client object");
                adminClient = new ServiceBusAdministrationClient(connectionString);

                Console.WriteLine($"Creating the topic {topicName}");
                await adminClient.CreateTopicAsync(topicName);

                Console.WriteLine($"Creating the subscription {subscriptionAllOrders} for the topic with a True filter ");
                // Create a True Rule filter with an expression that always evaluates to true
                // It's equivalent to using SQL rule filter with 1=1 as the expression
                await adminClient.CreateSubscriptionAsync(
                        new CreateSubscriptionOptions(topicName, subscriptionAllOrders), 
                        new CreateRuleOptions("AllOrders", new TrueRuleFilter()));

                Console.WriteLine($"Creating the subscription {subscriptionColorBlueSize10Orders} with a SQL filter");
                // Create a SQL filter with color set to blue and quantity to 10
                await adminClient.CreateSubscriptionAsync(
                        new CreateSubscriptionOptions(topicName, subscriptionColorBlueSize10Orders), 
                        new CreateRuleOptions("BlueSize10Orders", new SqlRuleFilter("color='blue' AND quantity=10")));

                Console.WriteLine($"Creating the subscription {subscriptionColorRed} with a SQL filter");
                // Create a SQL filter with color equals to red and a SQL action with a set of statements
                await adminClient.CreateSubscriptionAsync(topicName, subscriptionColorRed);
                // remove the $Default rule
                await adminClient.DeleteRuleAsync(topicName, subscriptionColorRed, "$Default");
                // now create the new rule. notice that user. prefix is used for the user/application property
                await adminClient.CreateRuleAsync(topicName, subscriptionColorRed, new CreateRuleOptions 
                                { 
                                    Name = "RedOrdersWithAction",
                                    Filter = new SqlRuleFilter("user.color='red'"),
                                    Action = new SqlRuleAction("SET quantity = quantity / 2; REMOVE priority;SET sys.CorrelationId = 'low';")

                                }
                );

                Console.WriteLine($"Creating the subscription {subscriptionHighPriorityRedOrders} with a correlation filter");
                // Create a correlation filter with color set to Red and priority set to High
                await adminClient.CreateSubscriptionAsync(
                        new CreateSubscriptionOptions(topicName, subscriptionHighPriorityRedOrders), 
                        new CreateRuleOptions("HighPriorityRedOrders", new CorrelationRuleFilter() {Subject = "red", CorrelationId = "high"} ));

                // delete resources
                //await adminClient.DeleteTopicAsync(topicName);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }
    }
}

用于发送接收消息的 .NET 示例

namespace SendAndReceiveMessages
{
    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;
    using Newtonsoft.Json;

    public class Program 
    {
        const string TopicName = "TopicFilterSampleTopic";
        const string SubscriptionAllMessages = "AllOrders";
        const string SubscriptionColorBlueSize10Orders = "ColorBlueSize10Orders";
        const string SubscriptionColorRed = "ColorRed";
        const string SubscriptionHighPriorityOrders = "HighPriorityRedOrders";

        // connection string to your Service Bus namespace
        static string connectionString = "<YOUR SERVICE BUS NAMESPACE - CONNECTION STRING>";

        // the client that owns the connection and can be used to create senders and receivers
        static ServiceBusClient client;

        // the sender used to publish messages to the topic
        static ServiceBusSender sender;

        // the receiver used to receive messages from the subscription 
        static ServiceBusReceiver receiver;

        public async Task SendAndReceiveTestsAsync(string connectionString)
        {
            // This sample demonstrates how to use advanced filters with ServiceBus topics and subscriptions.
            // The sample creates a topic and 3 subscriptions with different filter definitions.
            // Each receiver will receive matching messages depending on the filter associated with a subscription.

            // Send sample messages.
            await this.SendMessagesToTopicAsync(connectionString);

            // Receive messages from subscriptions.
            await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionAllMessages);
            await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionColorBlueSize10Orders);
            await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionColorRed);
            await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionHighPriorityOrders);
        }

        async Task SendMessagesToTopicAsync(string connectionString)
        {
            // Create the clients that we'll use for sending and processing messages.
            client = new ServiceBusClient(connectionString);
            sender = client.CreateSender(TopicName);

            Console.WriteLine("\nSending orders to topic.");

            // Now we can start sending orders.
            await Task.WhenAll(
                SendOrder(sender, new Order()),
                SendOrder(sender, new Order { Color = "blue", Quantity = 5, Priority = "low" }),
                SendOrder(sender, new Order { Color = "red", Quantity = 10, Priority = "high" }),
                SendOrder(sender, new Order { Color = "yellow", Quantity = 5, Priority = "low" }),
                SendOrder(sender, new Order { Color = "blue", Quantity = 10, Priority = "low" }),
                SendOrder(sender, new Order { Color = "blue", Quantity = 5, Priority = "high" }),
                SendOrder(sender, new Order { Color = "blue", Quantity = 10, Priority = "low" }),
                SendOrder(sender, new Order { Color = "red", Quantity = 5, Priority = "low" }),
                SendOrder(sender, new Order { Color = "red", Quantity = 10, Priority = "low" }),
                SendOrder(sender, new Order { Color = "red", Quantity = 5, Priority = "low" }),
                SendOrder(sender, new Order { Color = "yellow", Quantity = 10, Priority = "high" }),
                SendOrder(sender, new Order { Color = "yellow", Quantity = 5, Priority = "low" }),
                SendOrder(sender, new Order { Color = "yellow", Quantity = 10, Priority = "low" })
                );

            Console.WriteLine("All messages sent.");
        }

        async Task SendOrder(ServiceBusSender sender, Order order)
        {
            var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(order)))
            {
                CorrelationId = order.Priority,
                Subject = order.Color,
                ApplicationProperties =
                {
                    { "color", order.Color },
                    { "quantity", order.Quantity },
                    { "priority", order.Priority }
                }
            };
            await sender.SendMessageAsync(message);

            Console.WriteLine("Sent order with Color={0}, Quantity={1}, Priority={2}", order.Color, order.Quantity, order.Priority);
        }

        async Task ReceiveAllMessageFromSubscription(string connectionString, string subsName)
        {
            var receivedMessages = 0;

            receiver = client.CreateReceiver(TopicName, subsName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete } );

            // Create a receiver from the subscription client and receive all messages.
            Console.WriteLine("\nReceiving messages from subscription {0}.", subsName);

            while (true)
            {
                var receivedMessage = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10));
                if (receivedMessage != null)
                {
                    foreach (var prop in receivedMessage.ApplicationProperties)
                    {
                        Console.Write("{0}={1},", prop.Key, prop.Value);
                    }
                    Console.WriteLine("CorrelationId={0}", receivedMessage.CorrelationId);
                    receivedMessages++;
                }
                else
                {
                    // No more messages to receive.
                    break;
                }
            }
            Console.WriteLine("Received {0} messages from subscription {1}.", receivedMessages, subsName);
        }

       public static async Task Main()
        {
            try
            {
                Program app = new Program();
                await app.SendAndReceiveTestsAsync(connectionString);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
            }
        }
    }

    class Order
    {
        public string Color
        {
            get;
            set;
        }

        public int Quantity
        {
            get;
            set;
        }

        public string Priority
        {
            get;
            set;
        }
    }
}

后续步骤

请参阅以下示例:

若要了解 Azure 服务总线功能,请尝试使用所选语言的示例。