设置订阅筛选器(Azure 服务总线)
本文提供了一些有关对“服务总线”主题订阅设置筛选器的示例。 有关筛选器的概念性信息,请参阅筛选器。
使用 Azure 门户
若要在 Azure 门户中设置订阅筛选器,请使用“服务总线订阅”页面的“筛选器”部分。
使用 Azure CLI
使用 az servicebus topic subscription rule create
创建规则或按订阅筛选。
使用 Azure PowerShell
使用 Set-AzServiceBusRule
创建规则或按订阅筛选。
注意
订阅规则由筛选器和操作组成。 可以使用 CLI 和 PowerShell(但不能使用 Azure 门户)指定操作。
按系统属性筛选
若要在筛选器中引用系统属性,请使用以下格式:sys.<system-property-name>
。
sys.label LIKE '%bus%'
sys.messageid = 'xxxx'
sys.correlationid like 'abc-%'
注意
- 有关系统属性的列表,请参阅消息、有效负载和序列化。
- 在筛选器中使用来自 Azure.Messaging.ServiceBus.ServiceBusMessage 的系统属性名称。
- Azure.Messaging.ServiceBus.ServiceBusMessage 中的
Subject
映射到已弃用的 Microsoft.Azure.ServiceBus.Message 中的Label
。
按消息属性筛选
下面是在筛选器中使用应用程序或用户属性的示例。 可以使用 Azure.Messaging.ServiceBus.ServiceBusMessage.ApplicationProperties(最新)访问应用程序属性集,或使用语法 user.property-name
或仅使用 property-name
通过 Microsoft.Azure.ServiceBus.ServiceBusMessage(已弃用)访问用户属性集。
MessageProperty = 'A'
user.SuperHero like 'SuperMan%'
注意
2026 年 9 月 30 日,我们将停用 Azure 服务总线 SDK 库 WindowsAzure.ServiceBus、Microsoft.Azure.ServiceBus 和 com.microsoft.azure.servicebus,这些库不符合 Azure SDK 准则。 我们还将结束对 SBMP 协议的支持,因此在 2026 年 9 月 30 日之后,你将无法再使用此协议。 请在该日期之前迁移到最新的 Azure SDK 库,新库提供了关键安全更新和改进功能。
尽管旧库在 2026 年 9 月 30 日之后仍可使用,但它们将不再获得 Azure 的官方支持和更新。 有关详细信息,请参阅支持停用公告。
按带有特殊字符的消息属性筛选
如果消息属性名称包含特殊字符,请使用双引号 ("
) 将属性名称括起来。 例如,如果属性名称为 "http://schemas.microsoft.com/xrm/2011/Claims/EntityLogicalName"
,则在筛选器中使用以下语法。
"http://schemas.microsoft.com/xrm/2011/Claims/EntityLogicalName" = 'account'
按带有数字值的消息属性筛选
下面的示例演示如何在筛选器中使用带有数字值的属性。
MessageProperty = 1
MessageProperty > 1
MessageProperty > 2.08
MessageProperty = 1 AND MessageProperty2 = 3
MessageProperty = 1 OR MessageProperty2 = 3
基于参数的筛选器
下面是使用基于参数的筛选器的几个示例。 在这些示例中,DataTimeMp
是 DateTime
类型的消息属性,@dtParam
是作为 DateTime
对象传递给筛选器的参数。
DateTimeMp < @dtParam
DateTimeMp > @dtParam
(DateTimeMp2-DateTimeMp1) <= @timespan //@timespan is a parameter of type TimeSpan
DateTimeMp2-DateTimeMp1 <= @timespan
使用 IN 和 NOT IN
StoreId IN('Store1', 'Store2', 'Store3')
sys.To IN ('Store5','Store6','Store7') OR StoreId = 'Store8'
sys.To NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8') OR StoreId NOT IN ('Store1','Store2','Store3','Store4','Store5','Store6','Store7','Store8')
有关 C# 示例,请参阅 GitHub 上的主题筛选器示例。
关联筛选器
使用 CorrelationID 的关联筛选器
new CorrelationFilter("Contoso");
它将筛选 CorrelationID
设置为 Contoso
的消息。
注意
.NET 中的 CorrelationRuleFilter 类位于 Azure.Messaging.ServiceBus.Administration 命名空间中。 有关展示如何使用 .NET 创建筛选器的示例代码,请参阅 GitHub 上的此代码。
使用系统属性和用户属性的关联筛选器
var filter = new CorrelationRuleFilter();
filter.Label = "Important";
filter.ReplyTo = "johndoe@contoso.com";
filter.Properties["color"] = "Red";
它等效于 sys.ReplyTo = 'johndoe@contoso.com' AND sys.Label = 'Important' AND color = 'Red'
用于创建订阅筛选器的 .NET 示例
下面是用于创建以下服务总线实体的 .NET C# 示例:
- 名为
topicfiltersampletopic
的服务总线主题 - 使用 True Rule 筛选器订阅名为
AllOrders
的主题,此筛选器等效于带表达式1=1
的 SQL 规则筛选器。 - 名为
ColorBlueSize10Orders
的订阅,带 SQL 筛选器表达式color='blue' AND quantity=10
- 名为
ColorRed
的订阅,带 SQL 筛选器表达式color='red'
和操作 - 名为
HighPriorityRedOrders
的订阅,带关联筛选器表达式Subject = "red", CorrelationId = "high"
有关详细信息,请参阅内联代码注释。
namespace CreateTopicsAndSubscriptionsWithFilters
{
using Azure.Messaging.ServiceBus.Administration;
using System;
using System.Threading.Tasks;
public class Program
{
// Service Bus Administration Client object to create topics and subscriptions
static ServiceBusAdministrationClient adminClient;
// connection string to the Service Bus namespace
static readonly string connectionString = "<YOUR SERVICE BUS NAMESPACE - CONNECTION STRING>";
// name of the Service Bus topic
static readonly string topicName = "topicfiltersampletopic";
// names of subscriptions to the topic
static readonly string subscriptionAllOrders = "AllOrders";
static readonly string subscriptionColorBlueSize10Orders = "ColorBlueSize10Orders";
static readonly string subscriptionColorRed = "ColorRed";
static readonly string subscriptionHighPriorityRedOrders = "HighPriorityRedOrders";
public static async Task Main()
{
try
{
Console.WriteLine("Creating the Service Bus Administration Client object");
adminClient = new ServiceBusAdministrationClient(connectionString);
Console.WriteLine($"Creating the topic {topicName}");
await adminClient.CreateTopicAsync(topicName);
Console.WriteLine($"Creating the subscription {subscriptionAllOrders} for the topic with a True filter ");
// Create a True Rule filter with an expression that always evaluates to true
// It's equivalent to using SQL rule filter with 1=1 as the expression
await adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions(topicName, subscriptionAllOrders),
new CreateRuleOptions("AllOrders", new TrueRuleFilter()));
Console.WriteLine($"Creating the subscription {subscriptionColorBlueSize10Orders} with a SQL filter");
// Create a SQL filter with color set to blue and quantity to 10
await adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions(topicName, subscriptionColorBlueSize10Orders),
new CreateRuleOptions("BlueSize10Orders", new SqlRuleFilter("color='blue' AND quantity=10")));
Console.WriteLine($"Creating the subscription {subscriptionColorRed} with a SQL filter");
// Create a SQL filter with color equals to red and a SQL action with a set of statements
await adminClient.CreateSubscriptionAsync(topicName, subscriptionColorRed);
// remove the $Default rule
await adminClient.DeleteRuleAsync(topicName, subscriptionColorRed, "$Default");
// now create the new rule. notice that user. prefix is used for the user/application property
await adminClient.CreateRuleAsync(topicName, subscriptionColorRed, new CreateRuleOptions
{
Name = "RedOrdersWithAction",
Filter = new SqlRuleFilter("user.color='red'"),
Action = new SqlRuleAction("SET quantity = quantity / 2; REMOVE priority;SET sys.CorrelationId = 'low';")
}
);
Console.WriteLine($"Creating the subscription {subscriptionHighPriorityRedOrders} with a correlation filter");
// Create a correlation filter with color set to Red and priority set to High
await adminClient.CreateSubscriptionAsync(
new CreateSubscriptionOptions(topicName, subscriptionHighPriorityRedOrders),
new CreateRuleOptions("HighPriorityRedOrders", new CorrelationRuleFilter() {Subject = "red", CorrelationId = "high"} ));
// delete resources
//await adminClient.DeleteTopicAsync(topicName);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
}
}
用于发送接收消息的 .NET 示例
namespace SendAndReceiveMessages
{
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;
public class Program
{
const string TopicName = "TopicFilterSampleTopic";
const string SubscriptionAllMessages = "AllOrders";
const string SubscriptionColorBlueSize10Orders = "ColorBlueSize10Orders";
const string SubscriptionColorRed = "ColorRed";
const string SubscriptionHighPriorityOrders = "HighPriorityRedOrders";
// connection string to your Service Bus namespace
static string connectionString = "<YOUR SERVICE BUS NAMESPACE - CONNECTION STRING>";
// the client that owns the connection and can be used to create senders and receivers
static ServiceBusClient client;
// the sender used to publish messages to the topic
static ServiceBusSender sender;
// the receiver used to receive messages from the subscription
static ServiceBusReceiver receiver;
public async Task SendAndReceiveTestsAsync(string connectionString)
{
// This sample demonstrates how to use advanced filters with ServiceBus topics and subscriptions.
// The sample creates a topic and 3 subscriptions with different filter definitions.
// Each receiver will receive matching messages depending on the filter associated with a subscription.
// Send sample messages.
await this.SendMessagesToTopicAsync(connectionString);
// Receive messages from subscriptions.
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionAllMessages);
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionColorBlueSize10Orders);
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionColorRed);
await this.ReceiveAllMessageFromSubscription(connectionString, SubscriptionHighPriorityOrders);
}
async Task SendMessagesToTopicAsync(string connectionString)
{
// Create the clients that we'll use for sending and processing messages.
client = new ServiceBusClient(connectionString);
sender = client.CreateSender(TopicName);
Console.WriteLine("\nSending orders to topic.");
// Now we can start sending orders.
await Task.WhenAll(
SendOrder(sender, new Order()),
SendOrder(sender, new Order { Color = "blue", Quantity = 5, Priority = "low" }),
SendOrder(sender, new Order { Color = "red", Quantity = 10, Priority = "high" }),
SendOrder(sender, new Order { Color = "yellow", Quantity = 5, Priority = "low" }),
SendOrder(sender, new Order { Color = "blue", Quantity = 10, Priority = "low" }),
SendOrder(sender, new Order { Color = "blue", Quantity = 5, Priority = "high" }),
SendOrder(sender, new Order { Color = "blue", Quantity = 10, Priority = "low" }),
SendOrder(sender, new Order { Color = "red", Quantity = 5, Priority = "low" }),
SendOrder(sender, new Order { Color = "red", Quantity = 10, Priority = "low" }),
SendOrder(sender, new Order { Color = "red", Quantity = 5, Priority = "low" }),
SendOrder(sender, new Order { Color = "yellow", Quantity = 10, Priority = "high" }),
SendOrder(sender, new Order { Color = "yellow", Quantity = 5, Priority = "low" }),
SendOrder(sender, new Order { Color = "yellow", Quantity = 10, Priority = "low" })
);
Console.WriteLine("All messages sent.");
}
async Task SendOrder(ServiceBusSender sender, Order order)
{
var message = new ServiceBusMessage(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(order)))
{
CorrelationId = order.Priority,
Subject = order.Color,
ApplicationProperties =
{
{ "color", order.Color },
{ "quantity", order.Quantity },
{ "priority", order.Priority }
}
};
await sender.SendMessageAsync(message);
Console.WriteLine("Sent order with Color={0}, Quantity={1}, Priority={2}", order.Color, order.Quantity, order.Priority);
}
async Task ReceiveAllMessageFromSubscription(string connectionString, string subsName)
{
var receivedMessages = 0;
receiver = client.CreateReceiver(TopicName, subsName, new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.ReceiveAndDelete } );
// Create a receiver from the subscription client and receive all messages.
Console.WriteLine("\nReceiving messages from subscription {0}.", subsName);
while (true)
{
var receivedMessage = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(10));
if (receivedMessage != null)
{
foreach (var prop in receivedMessage.ApplicationProperties)
{
Console.Write("{0}={1},", prop.Key, prop.Value);
}
Console.WriteLine("CorrelationId={0}", receivedMessage.CorrelationId);
receivedMessages++;
}
else
{
// No more messages to receive.
break;
}
}
Console.WriteLine("Received {0} messages from subscription {1}.", receivedMessages, subsName);
}
public static async Task Main()
{
try
{
Program app = new Program();
await app.SendAndReceiveTestsAsync(connectionString);
}
catch (Exception e)
{
Console.WriteLine(e.ToString());
}
}
}
class Order
{
public string Color
{
get;
set;
}
public int Quantity
{
get;
set;
}
public string Priority
{
get;
set;
}
}
}
后续步骤
请参阅以下示例:
若要了解 Azure 服务总线功能,请尝试使用所选语言的示例。