如何通过 PHP 使用服务总线主题和订阅

本文说明如何使用服务总线主题和订阅。 示例采用 PHP 编写并使用 Azure SDK for PHP。 涉及的方案包括:

  • 创建主题和订阅
  • 创建订阅筛选器
  • 将消息发送到主题
  • 从订阅接收消息
  • 删除主题和订阅

重要

截至 2021 年 2 月,适用于 PHP 的 Azure SDK 已进入停用阶段,不再受到 Azure 的正式支持。 有关详细信息,请参阅 GitHub 上的此公告。 本文将很快停用。

先决条件

  1. Azure 订阅。 要完成本文中的步骤,需要一个 Azure 帐户。 可以激活 Visual Studio 或 MSDN 订阅者权益

  2. 遵循快速入门:使用 Azure 门户创建服务总线主题以及对该主题的订阅中的步骤创建服务总线命名空间并获取连接字符串

    注意

    在本文中,你将使用 PHP 创建“主题”以及对该主题的“订阅” 。

创建 PHP 应用程序

创建访问 Azure Blob 服务的 PHP 应用程序的唯一要求是从代码中引用用于 PHP 的 Azure SDK 中的类。 可以使用任何开发工具或记事本创建应用程序。

注意

PHP 安装还必须已安装并启用 OpenSSL 扩展

本文说明如何使用服务功能,这些功能可在 PHP 应用程序中本地调用,或通过在 Azure 的 Web 角色、辅助角色或网站中运行的代码调用。

获取 Azure 客户端库

通过 Composer 安装

  1. 在项目的根目录中创建一个名为 composer.json 的文件并向其添加以下代码:

    {
      "require": {
        "microsoft/windowsazure": "*"
      }
    }
    
  2. [composer.phar][composer-phar] 下载到项目根目录中。

  3. 打开命令提示符并在项目根目录中执行以下命令

    php composer.phar install
    

配置应用程序以使用服务总线

若要使用服务总线 API:

  1. 使用 require_once 语句引用 autoloader 文件。
  2. 引用所用的任意类。

以下示例演示如何包含 autoloader 文件并引用 ServiceBusService 类。

注意

本示例(以及本文中的其他示例)假定你已通过 Composer 安装了用于 Azure 的 PHP 客户端库。 如果已手动安装这些库或将其作为 PEAR 包安装,则必须引用 WindowsAzure.php autoloader 文件。

require_once 'vendor/autoload.php';
use WindowsAzure\Common\ServicesBuilder;

在下面的示例中,require_once 语句始终显示,但只引用了执行示例所必需的类。

设置服务总线连接

若要实例化服务总线客户端,必须先设置采用以下格式的有效连接字符串:

Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]

其中,Endpoint 的格式通常为 https://[yourNamespace].servicebus.chinacloudapi.cn

若要创建任何 Azure 服务客户端,必须使用 ServicesBuilder 类。 方法:

  • 将连接字符串直接传递给它。
  • 使用 CloudConfigurationManager (CCM) 检查多个外部源以获取连接字符串:
    • 默认情况下,它附带了对一个外部源的支持 - 环境变量。
    • 可通过扩展 ConnectionStringSource 类来添加新源。

在此处列出的示例中,将直接传递连接字符串。

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;

$connectionString = "Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]";

$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

创建主题

可以通过 ServiceBusRestProxy 类对服务总线主题执行管理操作。 ServiceBusRestProxy 对象是通过 ServicesBuilder::createServiceBusService 工厂方法构造的,包含一个适当的连接字符串,该字符串封装了令牌权限以对它进行管理。

下列示例演示了如何实例化 ServiceBusRestProxy 并调用 ServiceBusRestProxy->createTopic 以在 MySBNamespace 命名空间中创建名为 mytopic 的主题:

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\TopicInfo;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Create topic.
    $topicInfo = new TopicInfo("mytopic");
    $serviceBusRestProxy->createTopic($topicInfo);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://learn.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

注意

可使用 ServiceBusRestProxy 对象上的 listTopics 方法检查服务命名空间中是否已经存在一个具有指定名称的主题。

创建订阅

主题订阅也是使用 ServiceBusRestProxy->createSubscription 方法创建的。 订阅已命名,并且具有一个限制传递到订阅的虚拟队列的消息集的可选筛选器。

创建具有默认 (MatchAll) 筛选器的订阅

如果在创建新订阅时未指定任何筛选器,则将使用默认的 MatchAll 筛选器。 使用 MatchAll 筛选器时,发布到主题的所有消息都会置于订阅的虚拟队列中。 以下示例创建名为 mysubscription 的订阅,并使用默认的 MatchAll 筛选器。

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\SubscriptionInfo;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Create subscription.
    $subscriptionInfo = new SubscriptionInfo("mysubscription");
    $serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://learn.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

创建具有筛选器的订阅

还可以设置筛选器,以指定发送到主题的哪些消息应该在特定主题订阅中显示。 订阅支持的最灵活的一种筛选器是 SqlFilter,它实现了一部分 SQL92 功能。 SQL 筛选器对发布到主题的消息的属性进行操作。 有关 SqlFilter 的详细信息,请参阅 SqlFilter.SqlExpression 属性

注意

有关订阅的每个规则单独处理传入消息,并将其结果消息添加到订阅。 此外,每个新订阅具有一个默认 Rule 对象,该对象包含将主题中的所有消息添加到订阅的筛选器。 要仅接收与筛选器匹配的消息,必须删除默认规则。 可以使用 ServiceBusRestProxy->deleteRule 方法删除默认规则。

以下示例创建了一个名为 HighMessages 的订阅,该订阅带有只选择自定义 MessageNumber 属性大于 3 的消息的 SqlFilter。 有关向消息添加自定义属性的信息,请参阅将消息发送到主题

$subscriptionInfo = new SubscriptionInfo("HighMessages");
$serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);

$serviceBusRestProxy->deleteRule("mytopic", "HighMessages", '$Default');

$ruleInfo = new RuleInfo("HighMessagesRule");
$ruleInfo->withSqlFilter("MessageNumber > 3");
$ruleResult = $serviceBusRestProxy->createRule("mytopic", "HighMessages", $ruleInfo);

此代码需要使用其他命名空间:WindowsAzure\ServiceBus\Models\SubscriptionInfo

同样,以下示例创建一个名为 LowMessages 的订阅,其 SqlFilter 只选择 MessageNumber 属性小于或等于 3 的消息。

$subscriptionInfo = new SubscriptionInfo("LowMessages");
$serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);

$serviceBusRestProxy->deleteRule("mytopic", "LowMessages", '$Default');

$ruleInfo = new RuleInfo("LowMessagesRule");
$ruleInfo->withSqlFilter("MessageNumber <= 3");
$ruleResult = $serviceBusRestProxy->createRule("mytopic", "LowMessages", $ruleInfo);

现在,消息发送到 mytopic 主题后总是会传送给订阅了 mysubscription 订阅的接收方,并且会选择性地传送给订阅了 HighMessagesLowMessages 订阅的接收方(具体取决于消息内容)。

将消息发送到主题

要将消息发送到服务总线主题,应用程序应调用 ServiceBusRestProxy->sendTopicMessage 方法。 下面的代码演示了如何将消息发送到先前在 MySBNamespace 服务命名空间创建的 mytopic 主题。

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\BrokeredMessage;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Create message.
    $message = new BrokeredMessage();
    $message->setBody("my message");

    // Send message.
    $serviceBusRestProxy->sendTopicMessage("mytopic", $message);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://learn.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

发送到服务总线主题的消息是 BrokeredMessage 类的实例。 BrokeredMessage 对象包含一组标准属性和方法以及可用来保存自定义应用程序特定属性的属性。 以下示例演示如何向以前创建的 mytopic 主题发送五条测试消息。 setProperty 方法用于将自定义属性 (MessageNumber) 添加到每条消息。 MessageNumber 属性值在每条消息中都有所不同(你可以使用此值确定哪些订阅接收它,如创建订阅部分中所示):

for($i = 0; $i < 5; $i++){
    // Create message.
    $message = new BrokeredMessage();
    $message->setBody("my message ".$i);

    // Set custom property.
    $message->setProperty("MessageNumber", $i);

    // Send message.
    $serviceBusRestProxy->sendTopicMessage("mytopic", $message);
}

服务总线主题在标准层中支持的最大消息大小为 256 KB,在高级层中则为 100 MB。 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。 一个主题中包含的消息数量不受限制,但消息的总大小受限制。 此主题大小上限为 5 GB。 有关配额的详细信息,请参阅 服务总线配额

从订阅接收消息

从订阅接收消息的最佳方法是使用 ServiceBusRestProxy->receiveSubscriptionMessage 方法。 可在两种不同的模式下接收消息:ReceiveAndDeletePeekLockPeekLock 是默认设置。

使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,当服务总线收到订阅中某条消息的读取请求时,它会将该消息标记为“已使用”并将其返回给应用程序。 ReceiveAndDelete * 模式是最简单的模型,最适合应用程序允许在出现故障时不处理消息的情形。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线已将消息标记为“已使用”,因此当应用程序重新启动并重新开始使用消息时,它会漏掉在发生崩溃前使用的消息。

在默认 PeekLock 模式下,接收消息会变成一个两阶段操作,这能够支持不能允许丢失消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序完成消息处理(或可靠地存储消息以供日后处理)后,它会将收到的消息传递到 ServiceBusRestProxy->deleteMessage,从而完成接收过程的第二阶段。 发现 deleteMessage 调用时,服务总线会将消息标记为“已使用”,并将消息从队列中删除。

以下示例演示了如何使用 PeekLock 模式(默认模式)接收和处理消息。

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\ReceiveMessageOptions;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Set receive mode to PeekLock (default is ReceiveAndDelete)
    $options = new ReceiveMessageOptions();
    $options->setPeekLock();

    // Get message.
    $message = $serviceBusRestProxy->receiveSubscriptionMessage("mytopic", "mysubscription", $options);

    echo "Body: ".$message->getBody()."<br />";
    echo "MessageID: ".$message->getMessageId()."<br />";

    /*---------------------------
        Process message here.
    ----------------------------*/

    // Delete message. Not necessary if peek lock is not set.
    echo "Deleting message...<br />";
    $serviceBusRestProxy->deleteMessage($message);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://learn.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

如何:处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序出于某种原因无法处理消息,它可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。 这会导致服务总线解锁队列中的消息,并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

队列中的锁定消息还有相关超时,如果应用程序无法在锁定超时期满前处理消息(例如,当应用程序发生故障时),服务总线会自动解除锁定消息,让它再次可供接收。

如果应用程序在处理消息后、但在发出 deleteMessage 请求前发生故障,消息会重新传递给重启的应用程序。 这种处理类型通常称作“至少处理一次”,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 这通常可通过使用消息的 getMessageId 方法来实现,该方法在多次传送尝试中保持不变。

删除主题和订阅

若要删除某个主题或订阅,请分别使用 ServiceBusRestProxy->deleteTopicServiceBusRestProxy->deleteSubscripton 方法。 删除某个主题也会删除向该主题注册的所有订阅。

以下示例演示了如何删除名为 mytopic 的主题及其注册的订阅。

require_once 'vendor/autoload.php';

use WindowsAzure\ServiceBus\ServiceBusService;
use WindowsAzure\ServiceBus\ServiceBusSettings;
use WindowsAzure\Common\ServiceException;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Delete topic.
    $serviceBusRestProxy->deleteTopic("mytopic");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://learn.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

通过使用 deleteSubscription 方法可单独删除订阅:

$serviceBusRestProxy->deleteSubscription("mytopic", "mysubscription");

注意

可以使用服务总线资源管理器管理服务总线资源。 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。

后续步骤

有关详细信息,请参阅队列、主题和订阅