如何通过 PHP 使用服务总线主题和订阅

本文说明如何使用服务总线主题和订阅。 示例采用 PHP 编写并使用 Azure SDK for PHP。 涉及的应用场景包括创建主题和订阅创建订阅筛选器将消息发送到主题从订阅接收消息以及删除主题和订阅

什么是服务总线主题和订阅?

服务总线主题和订阅支持 发布/订阅 消息通信模型。 在使用主题和订阅时,分布式应用程序的组件不会直接相互通信,而是通过充当中介的主题交换消息。

TopicConcepts

与每条消息由单个使用方处理的 Service Bus 队列相比,主题和订阅通过发布/订阅模式提供“一对多”通信方式。 可向一个主题注册多个订阅。 当消息发送到主题时,每个订阅会分别对该消息进行处理。

主题订阅类似于接收发送至该主题的消息副本的虚拟队列。 可以选择基于每个订阅注册主题的筛选规则,这样就可以筛选或限制哪些主题订阅接收发送至某个主题的哪些消息。

利用服务总线主题和订阅,可以扩展并处理跨许多用户和应用程序的海量消息。

创建命名空间

若要开始在 Azure 中使用服务总线主题和订阅,必须先创建一个 服务命名空间。 命名空间提供了用于对应用程序中的 Service Bus 资源进行寻址的范围容器。

创建命名空间:

  1. 登录到 Azure 门户

  2. 在门户的左侧导航窗格中,依次单击“+”新建,搜索“service bus"。

  3. 在“创建命名空间” 对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  4. 在确保命名空间名称可用后,选择定价层(基础版或标准版)。

  5. 在“订阅” 字段中,选择要创建命名空间的 Azure 订阅。

  6. 在“资源组” 字段中,选择该命名空间将存在于的现有资源组,或者创建一个新资源组。

  7. 在“位置” 中,选择将在其中托管该命名空间的国家或地区。

    创建命名空间

  8. 单击“创建” 按钮。 系统现已创建命名空间并已将其启用。 您可能需要等待几分钟,因为系统将为您的帐户配置资源。

获取凭据

  1. 在命名空间列表中,单击新创建的命名空间名称。

  2. 在“服务总线命名空间”边栏选项卡中,单击“共享访问策略”。

  3. 在“共享访问策略”边栏选项卡中,单击“RootManageSharedAccessKey”。

    connection-info

  4. 在“策略: RootManageSharedAccessKey”边栏选项卡中,单击“连接字符串 - 主键”旁边的复制按钮,将连接字符串复制到剪贴板供以后使用。

    connection-string

创建 PHP 应用程序

创建访问 Azure Blob 服务的 PHP 应用程序的唯一要求是从代码中引用用于 PHP 的 Azure SDK 中的类。 可以使用任何开发工具或记事本创建应用程序。

Note

PHP 安装还必须已安装并启用 OpenSSL 扩展

本文说明如何使用服务功能,这些功能可在 PHP 应用程序中本地调用,或通过在 Azure 的 Web 角色、辅助角色或网站中运行的代码调用。

获取 Azure 客户端库

通过 Composer 安装

  1. 安装 Git请注意,在 Windows 上,你还必须向 PATH 环境变量添加 Git 可执行文件。

  2. 在您的项目的根目录中创建一个名为 composer.json 的文件并向其添加以下代码:

    {
        "repositories": [
            {
                "type": "pear",
                "url": "http://pear.php.net"
            }
        ],
        "require": {
            "pear-pear.php.net/mail_mime" : "*",
            "pear-pear.php.net/http_request2" : "*",
            "pear-pear.php.net/mail_mimedecode" : "*",
            "microsoft/windowsazure": "*"
        }
    }
    
  3. composer.phar 下载到你的项目根目录中。

  4. 打开命令提示符并在项目根目录中执行以下命令

    php composer.phar install
    

手动安装

若要手动下载并安装用于 Azure 的 PHP 客户端库,请执行以下步骤:

Note

用于 Azure 的 PHP 客户端库依赖于 HTTP_Request2Mail_mimeMail_mimeDecode PEAR 包。若要处理这些依赖关系,建议使用 PEAR 包管理器安装这些包

  1. 下载包含 GitHub 中的库的 .zip 存档。或者,复制现有存储库并将其克隆到您的本地计算机。后一种选择需要一个 GitHub 帐户并要求已在本地安装 Git。

  2. 将已下载存档的 WindowsAzure 目录复制到你的应用程序目录结构中。

有关安装用于 Azure 的 PHP 客户端库的详细信息(包括安装为 PEAR 包的信息),请参阅下载 Azure SDK for PHP

配置应用程序以使用服务总线

若要使用服务总线 API:

  1. 使用 require_once 语句引用 autoloader 文件。
  2. 引用所用的任意类。

以下示例演示如何包含 autoloader 文件并引用 ServiceBusService 类。

Note

本示例(以及本文中的其他示例)假定你已通过 Composer 安装了用于 Azure 的 PHP 客户端库。 如果已手动安装这些库或将其作为 PEAR 包安装,则必须引用 WindowsAzure.php autoloader 文件。

    require_once 'vendor\autoload.php';
    use WindowsAzure\Common\ServicesBuilder;

在下面的示例中,require_once 语句始终显示,但只引用了执行示例所必需的类。

设置服务总线连接

若要实例化服务总线客户端,必须先设置采用以下格式的有效连接字符串:

Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]

其中,Endpoint 的格式通常为 https://[yourNamespace].servicebus.chinacloudapi.cn

若要创建任何 Azure 服务客户端,必须使用 ServicesBuilder 类。可执行以下操作:

  • 将连接字符串直接传递给它。
  • 使用 CloudConfigurationManager (CCM) 检查多个外部源以获取连接字符串:
    • 默认情况下,它附带了对一个外部源的支持 - 环境变量。
    • 可通过扩展 ConnectionStringSource 类来添加新源。

在此处列出的示例中,将直接传递连接字符串。

require_once 'vendor/autoload.php';

        use WindowsAzure\Common\ServicesBuilder;

$connectionString = "Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]";

    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

创建主题

可以通过 ServiceBusRestProxy 类对服务总线主题执行管理操作。 ServiceBusRestProxy 对象是通过 ServicesBuilder::createServiceBusService 工厂方法构造的,包含一个适当的连接字符串,该字符串封装了令牌权限以对它进行管理。

下列示例演示了如何实例化 ServiceBusRestProxy 并调用 ServiceBusRestProxy->createTopic 以在 MySBNamespace 命名空间中创建名为 mytopic 的主题:

    require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\TopicInfo;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {       
        // Create topic.
        $topicInfo = new TopicInfo("mytopic");
        $serviceBusRestProxy->createTopic($topicInfo);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here: 
        // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

Note

可使用 ServiceBusRestProxy 对象上的 listTopics 方法检查服务命名空间中是否已经存在一个具有指定名称的主题。

创建订阅

主题订阅也是使用 ServiceBusRestProxy->createSubscription 方法创建的。订阅已命名,并且具有一个限制传递到订阅的虚拟队列的消息集的可选筛选器。

创建具有默认 (MatchAll) 筛选器的订阅

MatchAll 筛选器是默认筛选器,在创建新订阅时未指定筛选器的情况下使用。 使用 MatchAll 筛选器时,发布到主题的所有消息都会置于订阅的虚拟队列中。 以下示例创建名为“mysubscription”的订阅,并使用默认的 MatchAll 筛选器。

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\SubscriptionInfo;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {
        // Create subscription.
        $subscriptionInfo = new SubscriptionInfo("mysubscription");
        $serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here: 
        // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

创建具有筛选器的订阅

还可以设置筛选器,以指定发送到主题的哪些消息应该在特定主题订阅中显示。 订阅支持的最灵活的一种筛选器是 SqlFilter,它实现了一部分 SQL92 功能。 SQL 筛选器对发布到主题的消息的属性进行操作。 有关 SqlFilters 的详细信息,请参阅 SqlFilter.SqlExpression 属性

Note

有关订阅的每个规则单独处理传入消息,并将其结果消息添加到订阅。 此外,每个新订阅具有一个默认 Rule 对象,该对象包含将主题中的所有消息添加到订阅的筛选器。 要仅接收与筛选器匹配的消息,必须删除默认规则。 可以使用 ServiceBusRestProxy->deleteRule 方法删除默认规则。

以下示例创建了一个名为 HighMessages 的订阅,该订阅带有只选择自定义 MessageNumber 属性大于 3 的消息的 SqlFilter。 有关向消息添加自定义属性的信息,请参阅将消息发送到主题

    $subscriptionInfo = new SubscriptionInfo("HighMessages");
    $serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);

    $serviceBusRestProxy->deleteRule("mytopic", "HighMessages", '$Default');

    $ruleInfo = new RuleInfo("HighMessagesRule");
    $ruleInfo->withSqlFilter("MessageNumber > 3");
    $ruleResult = $serviceBusRestProxy->createRule("mytopic", "HighMessages", $ruleInfo);

请注意,此代码需要使用另一个命名空间:WindowsAzure\ServiceBus\Models\SubscriptionInfo

同样,以下示例创建一个名为 LowMessages 的订阅,其 SqlFilter 只选择 MessageNumber 属性小于或等于 3 的消息。

    $subscriptionInfo = new SubscriptionInfo("LowMessages");
    $serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);

    $serviceBusRestProxy->deleteRule("mytopic", "LowMessages", '$Default');

    $ruleInfo = new RuleInfo("LowMessagesRule");
    $ruleInfo->withSqlFilter("MessageNumber <= 3");
    $ruleResult = $serviceBusRestProxy->createRule("mytopic", "LowMessages", $ruleInfo);

现在,消息发送到 mytopic 主题后总是会传送给订阅了 mysubscription 订阅的接收方,并且会选择性地传送给订阅了 HighMessagesLowMessages 订阅的接收方(具体取决于消息内容)。

将消息发送到主题

要将消息发送到服务总线主题,应用程序应调用 ServiceBusRestProxy->sendTopicMessage 方法。 下面的代码演示了如何将消息发送到先前在 MySBNamespace 服务命名空间创建的 mytopic 主题。

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\BrokeredMessage;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {
        // Create message.
        $message = new BrokeredMessage();
        $message->setBody("my message");

        // Send message.
        $serviceBusRestProxy->sendTopicMessage("mytopic", $message);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here: 
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

发送到服务总线主题的消息是 BrokeredMessage 类的实例。 BrokeredMessage 对象包含一组标准属性和方法以及用来保存自定义应用程序特定属性的属性。 以下示例演示了如何将 5 条测试消息发送到前面创建的 mytopic 主题。 setProperty 方法用于将自定义属性 (MessageNumber) 添加到每条消息。 请注意,每条消息的 MessageNumber 属性值都会不同(可使用该值确定接收它的订阅,如创建订阅部分所述):

    for($i = 0; $i < 5; $i++){
        // Create message.
        $message = new BrokeredMessage();
        $message->setBody("my message ".$i);

        // Set custom property.
        $message->setProperty("MessageNumber", $i);

        // Send message.
        $serviceBusRestProxy->sendTopicMessage("mytopic", $message);
    }

服务总线主题在标准层中支持的最大消息大小为 256 KB。 标头最大为 64 KB,其中包括标准和自定义应用程序属性。 一个主题中包含的消息数量不受限制,但消息的总大小受限制。 主题大小的上限为 5 GB。 有关配额的详细信息,请参阅[服务总线配额][Service Bus quotas]。

从订阅接收消息

从订阅接收消息的最佳方法是使用 ServiceBusRestProxy->receiveSubscriptionMessage 方法。 可通过两种模式接收消息:ReceiveAndDelete 和 PeekLockPeekLock 是默认设置。

使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,当服务总线收到订阅中某条消息的读取请求时,它会将该消息标记为“已使用”并将其返回给应用程序。 ReceiveAndDelete * 模式是最简单的模式,最适合应用程序允许出现故障时不处理消息的方案。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。

在默认 PeekLock 模式下,接收消息会变成一个两阶段操作,这能够支持不能允许丢失消息的应用程序。 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。 应用程序完成消息处理(或可靠地存储消息以供日后处理)后,它会将收到的消息传递到 ServiceBusRestProxy->deleteMessage,从而完成接收过程的第二阶段。 发现 deleteMessage 调用时,服务总线会将消息标记为“已使用”,并将消息从队列中删除。

以下示例演示了如何使用 PeekLock 模式(默认模式)接收和处理消息。

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\ReceiveMessageOptions;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {
        // Set receive mode to PeekLock (default is ReceiveAndDelete)
        $options = new ReceiveMessageOptions();
        $options->setPeekLock();

        // Get message.
        $message = $serviceBusRestProxy->receiveSubscriptionMessage("mytopic", "mysubscription", $options);

        echo "Body: ".$message->getBody()."<br />";
        echo "MessageID: ".$message->getMessageId()."<br />";

        /*---------------------------
            Process message here.
        ----------------------------*/

        // Delete message. Not necessary if peek lock is not set.
        echo "Deleting message...<br />";
        $serviceBusRestProxy->deleteMessage($message);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here:
        // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

如何:处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序出于某种原因无法处理消息,它可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。 这会导致服务总线解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

队列中的锁定消息还有相关超时,如果应用程序无法在锁定超时期满前处理消息(例如,当应用程序发生故障时),服务总线会自动解除锁定消息,让它再次可供接收。

如果应用程序在处理消息后、但在发出 deleteMessage 请求前发生故障,消息会重新传递给重启的应用程序。 此情况通常称作至少处理一次,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。 这通常可以通过使用消息的 getMessageId 方法来实现,消息在多次传送尝试中保持不变。

删除主题和订阅

若要删除某个主题或订阅,请分别使用 ServiceBusRestProxy->deleteTopicServiceBusRestProxy->deleteSubscripton 方法。 请注意,删除某个主题也会删除向该主题注册的所有订阅。

以下示例演示了如何删除名为 mytopic 的主题及其注册的订阅。

require_once 'vendor/autoload.php';

    use WindowsAzure\ServiceBus\ServiceBusService;
    use WindowsAzure\ServiceBus\ServiceBusSettings;
    use WindowsAzure\Common\ServiceException;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {       
        // Delete topic.
        $serviceBusRestProxy->deleteTopic("mytopic");
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here: 
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

通过使用 deleteSubscription 方法可单独删除订阅:

    $serviceBusRestProxy->deleteSubscription("mytopic", "mysubscription");

后续步骤

了解服务总线队列的基础知识后,请参阅队列、主题和订阅以获取更多信息。