如何使用服务总线队列

本指南说明如何使用服务总线队列。 示例采用 PHP 编写并使用 Azure SDK for PHP。 涉及的任务包括创建队列发送和接收消息以及删除队列

什么是 Service Bus 队列?

服务总线队列支持中转消息传送通信模型。 在使用队列时,分布式应用程序的组件不会直接相互通信,而是通过充当中介(代理)的队列交换消息。 消息创建方(发送方)将消息传送到队列,然后继续对其进行处理。 消息使用方(接收方)以异步方式从队列中提取消息并对其进行处理。 创建方不必等待使用方的答复即可继续处理并发送更多消息。 队列为一个或多个竞争使用方提供先入先出 (FIFO)消息传递方式。 也就是说,接收方通常会按照消息添加到队列中的顺序来接收并处理消息,并且每条消息仅由一个消息使用方接收并处理。

QueueConcepts

服务总线队列是一种可用于各种应用场景的通用技术:

  • 多层 Azure 应用程序中 Web 角色和辅助角色之间的通信。
  • 混合解决方案中本地应用程序和 Azure 托管应用程序之间的通信。
  • 在不同组织或组织的各部门中本地运行的分布式应用程序组件之间的通信。

利用队列,你可以更轻松地缩放应用程序,并增强体系结构的弹性。

创建服务命名空间

若要开始在 Azure 中使用服务总线队列,必须先创建一个服务命名空间。 命名空间提供了用于对应用程序中的服务总线资源进行寻址的范围容器。

创建服务命名空间:

  1. 登录到 Azure 经典门户

  2. 在门户的左侧导航窗格中,单击“服务总线”。

  3. 在门户的下方窗格中,单击“创建”。

  4. 在“添加新命名空间”对话框中,输入命名空间名称。 系统会立即检查该名称是否可用。

  5. 在确保命名空间名称可用后,选择应承载你的命名空间的国家或地区(确保使用在其中部署计算资源的同一国家/地区)。

    Important

    选取要选择用于部署应用程序的相同区域。 这将为你提供最佳性能。

  6. 将对话框中的其他字段保留其默认值(“消息传送”和“标准层”),然后单击“确定”复选标记。 系统现已创建命名空间并已将其启用。 您可能需要等待几分钟,因为系统将为您的帐户配置资源。

创建的命名空间将花费一段时间来激活,然后显示在门户中。 请等到命名空间状态变为“活动”后再继续操作 。

获取命名空间的默认管理凭据

若要在新命名空间上执行管理操作(如创建队列),则必须获取该命名空间的管理凭据。 可以从 Azure 经典门户中获取这些凭据。

从门户中获取管理凭据

  1. 在左侧导航窗格中,单击“服务总线”节点以显示可用命名空间的列表:

  2. 从显示的列表中选择刚刚创建的命名空间:

  3. 单击“连接信息”。

  4. 在“访问连接信息”窗格中,找到包含 SAS 密钥和密钥名称的连接字符串。

  5. 记下该密钥或将其复制到剪贴板。

创建 PHP 应用程序

创建访问 Azure Blob 服务的 PHP 应用程序的唯一要求是从代码内引用用于 PHP 的 Azure SDK 中的类。 可以使用任何开发工具或记事本创建应用程序。

Note

在安装 PHP 的过程中,还必须安装并启用 OpenSSL 扩展

本指南会使用服务功能,这些功能可在 PHP 应用程序中本地调用,或通过在 Azure 的 Web 角色、辅助角色或网站中运行的代码调用。

获取 Azure 客户端库

通过 Composer 安装

  1. 安装 Git请注意,在 Windows 上,你还必须向 PATH 环境变量添加 Git 可执行文件。

  2. 在您的项目的根目录中创建一个名为 composer.json 的文件并向其添加以下代码:

    {
        "repositories": [
            {
                "type": "pear",
                "url": "http://pear.php.net"
            }
        ],
        "require": {
            "pear-pear.php.net/mail_mime" : "*",
            "pear-pear.php.net/http_request2" : "*",
            "pear-pear.php.net/mail_mimedecode" : "*",
            "microsoft/windowsazure": "*"
        }
    }
    
  3. composer.phar 下载到你的项目根目录中。

  4. 打开命令提示符并在项目根目录中执行以下命令

    php composer.phar install
    

手动安装

若要手动下载并安装用于 Azure 的 PHP 客户端库,请执行以下步骤:

Note

用于 Azure 的 PHP 客户端库依赖于 HTTP_Request2Mail_mimeMail_mimeDecode PEAR 包。若要处理这些依赖关系,建议使用 PEAR 包管理器安装这些包

  1. 下载包含 GitHub 中的库的 .zip 存档。或者,复制现有存储库并将其克隆到您的本地计算机。后一种选择需要一个 GitHub 帐户并要求已在本地安装 Git。

  2. 将已下载存档的 WindowsAzure 目录复制到你的应用程序目录结构中。

有关安装用于 Azure 的 PHP 客户端库的详细信息(包括安装为 PEAR 包的信息),请参阅下载 Azure SDK for PHP

配置应用程序以使用服务总线

若要使用服务总线队列 API,请执行以下操作:

  1. 使用 require_once 语句引用 autoloader 文件。
  2. 引用所用的任意类。

下面的示例演示了如何包括 autoloader 文件并引用 ServicesBuilder 类。

Note

本示例(以及本文中的其他示例)假定你已通过 Composer 安装了用于 Azure 的 PHP 客户端库。 如果已手动安装这些库或将其作为 PEAR 包安装,则必须引用 WindowsAzure.php autoloader 文件。

require_once 'vendor/autoload.php';
use WindowsAzure\Common\ServicesBuilder;

在以下示例中, require_once 语句将始终显示,但只会引用执行该示例所需的类。

设置服务总线连接

若要实例化服务总线客户端,必须先设置采用以下格式的有效连接字符串:

Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]

其中,Endpoint 的格式通常为 [yourNamespace].servicebus.chinacloudapi.cn

若要创建任何 Azure 服务客户端,必须使用 ServicesBuilder 类。可执行以下操作:

  • 将连接字符串直接传递给它。
  • 使用 CloudConfigurationManager (CCM) 检查多个外部源以获取连接字符串:
    • 默认情况下,它附带了对一个外部源的支持 - 环境变量
    • 可通过扩展 ConnectionStringSource 类来添加新源

在此处列出的示例中,将直接传递连接字符串。

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;

$connectionString = "Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]";

    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

创建队列

可以通过 ServiceBusRestProxy 类对服务总线队列执行管理操作。 ServiceBusRestProxy 对象是通过 ServicesBuilder::createServiceBusService 工厂方法构造的,包含一个适当的连接字符串,该字符串封装了令牌权限以对它进行管理。

以下示例演示了如何实例化 ServiceBusRestProxy 并调用 ServiceBusRestProxy->createQueue 以在 MySBNamespace 服务命名空间内创建名为 myqueue 的队列:

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\QueueInfo;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {
        $queueInfo = new QueueInfo("myqueue");

        // Create queue.
        $serviceBusRestProxy->createQueue($queueInfo);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here: 
        // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

Note

可以对 ServiceBusRestProxy 对象使用 listQueues 方法,以检查具有指定名称的队列是否已位于命名空间中。

向队列发送消息

要将消息发送到服务总线队列,应用程序应调用 ServiceBusRestProxy->sendQueueMessage 方法。 下面的代码演示了如何将消息发送到先前在 MySBNamespace 服务命名空间内创建的 myqueue 队列。

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\BrokeredMessage;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {
        // Create message.
        $message = new BrokeredMessage();
        $message->setBody("my message");

        // Send message.
        $serviceBusRestProxy->sendQueueMessage("myqueue", $message);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here: 
        // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

发送到服务总线(以及从服务总线收到)的消息是 BrokeredMessage 类的实例。 BrokeredMessage 对象包含一组标准方法、用来保存特定于应用程序的自定义属性的属性,以及大量的任意应用程序数据。

在标准层,服务总线队列支持的最大消息大小为 256 KB。 标头最大为 64 KB,其中包括标准和自定义应用程序属性。 一个队列中包含的消息数量不受限制,但消息的总大小受限制。 队列大小的上限为 5 GB。

从队列接收消息

从队列接收消息的最佳方法是使用 ServiceBusRestProxy->receiveQueueMessage 方法。 可通过两种不同的模式接收消息:ReceiveAndDeletePeekLockPeekLock 是默认设置。

使用 ReceiveAndDelete 模式时,接收是一项单步操作,即当服务总线接收到队列中某条消息的读取请求时,它会将该消息标记为“已使用”并将其返回给应用程序。 ReceiveAndDelete 模式是最简单的模式,最适合在发生故障时应用程序允许不处理消息的情况。 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。 由于服务总线会将消息标记为“已使用”,因此当应用程序重启并重新开始使用消息时,它会遗漏在发生崩溃前使用的消息。

在默认的 PeekLock 模式下,接收消息会变成一个双阶段操作,这将能够支持不能允许丢失消息的应用程序。 当 Service Bus 收到请求时,它会找到要使用的下一个消息,将其锁定以防其他使用方接收它,并将该消息返回给应用程序。 应用程序完成消息处理(或可靠地存储消息以供日后处理)后,它会将收到的消息传递到 ServiceBusRestProxy->deleteMessage,从而完成接收过程的第二阶段。 当服务总线发现 deleteMessage 调用时,它会将消息标记为“已使用”并将其从队列中删除。

以下示例演示了如何使用 PeekLock 模式(默认模式)接收和处理消息。

require_once 'vendor/autoload.php';

    use WindowsAzure\Common\ServicesBuilder;
    use WindowsAzure\Common\ServiceException;
    use WindowsAzure\ServiceBus\Models\ReceiveMessageOptions;

    // Create Service Bus REST proxy.
    $serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

    try {
        // Set the receive mode to PeekLock (default is ReceiveAndDelete).
        $options = new ReceiveMessageOptions();
        $options->setPeekLock();

        // Receive message.
        $message = $serviceBusRestProxy->receiveQueueMessage("myqueue", $options);
        echo "Body: ".$message->getBody()."<br />";
        echo "MessageID: ".$message->getMessageId()."<br />";

        /*---------------------------
            Process message here.
        ----------------------------*/

        // Delete message. Not necessary if peek lock is not set.
        echo "Message deleted.<br />";
        $serviceBusRestProxy->deleteMessage($message);
    }
    catch(ServiceException $e){
        // Handle exception based on error codes and messages.
        // Error codes and messages are here:
        // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
        $code = $e->getCode();
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."<br />";
    }

如何:处理应用程序崩溃和不可读消息

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。 如果接收方应用程序出于某种原因无法处理消息,它可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。 这会导致 Service Bus 解锁队列中的消息并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。

还存在与队列中已锁定的消息相关联的超时,并且如果应用程序未能在锁定超时到期之前处理消息(例如,如果应用程序崩溃),服务总线则将自动解锁该消息,使它可以再次被接收。

如果在处理消息之后,发出 deleteMessage 请求之前,应用程序发生崩溃,则在应用程序重启时会将该消息重新传送给它。 此情况通常称作至少处理一次,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送。 如果某个场景不允许重复处理,则建议向应用程序添加其他逻辑来处理重复消息传送。 这通常可以通过使用消息的 getMessageId 方法来实现,消息在多次传送尝试中保持不变。

后续步骤

现在,已了解服务总线队列的基础知识,请参阅队列、主题和订阅 以获取更多信息。

有关详细信息,另请访问 PHP 开发人员中心