如何通过 PHP 使用队列存储

Tip

尝试 Azure 存储资源管理器

Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。

概述

本指南演示如何使用 Azure 队列存储服务执行常见方案。 这些示例是通过适用于 PHP 的 Azure 存储客户端库中的类编写的。 介绍的方案包括插入、扫视、获取和删除队列消息以及创建和删除队列。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。

队列存储的常见用途包括:

  • 创建积压工作以进行异步处理
  • 将消息从 Azure Web 角色传递到 Azure 辅助角色

队列服务概念

队列服务包含以下组件:

队列 1

  • URL 格式: 可使用以下 URL 格式对队列进行寻址:
    http://<storage account>.queue.core.chinacloudapi.cn/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.chinacloudapi.cn/images-to-download

  • 存储帐户: 对 Azure 存储服务的所有访问都要通过存储帐户来完成。 有关存储帐户容量的详细信息,请参阅 Azure 存储可伸缩性和性能目标

  • 队列: 一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的详细信息,请参阅 命名队列和元数据
  • 消息: 一条消息(不管采用何种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的存储资源提供程序客户端库创建 Azure 存储帐户。

如果暂时不想创建存储帐户,也可以使用 Azure 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azure 存储模拟器进行开发和测试

创建 PHP 应用程序

创建用于访问 Azure 队列存储的 PHP 应用程序的唯一要求是从代码中引用适用于 PHP 的 Azure 存储客户端库中的类。 可以使用任何开发工具(包括“记事本”)创建应用程序。

在本指南中,将使用队列存储服务功能,这些功能可在 PHP 应用程序中本地调用,或通过在 Azure 的 Web 角色、辅助角色或网站中运行的代码调用。

获取 Azure 客户端库

通过 Composer 安装

  1. 在项目的根目录中创建一个名为 composer.json 的文件并向其添加以下代码:

    {
      "require": {
        "microsoft/azure-storage-queue": "*"
      }
    }
    
  2. composer.phar 下载到项目根目录中。
  3. 打开命令提示符并在项目根目录中执行以下命令

    php composer.phar install
    

或者转到 GitHub 上的 Azure 存储 PHP 客户端库,然后克隆源代码。

配置应用程序以访问队列存储

若要使用 Azure 队列存储 API,需执行以下操作:

  1. 使用 require_once 语句引用 autoloader 文件。
  2. 引用可使用的所有类。

以下示例演示如何添加 autoloader 文件和引用 QueueRestProxy 类。

require_once 'vendor/autoload.php';
use MicrosoftAzure\Storage\Queue\QueueRestProxy;

以下示例中,require_once 语句将始终显示,但只会引用执行该示例所需的类。

设置 Azure 存储连接

若要实例化 Azure 队列存储客户端,首先必须拥有有效的连接字符串。 队列服务连接字符串的格式如下。

若要访问实时服务:

DefaultEndpointsProtocol=[http|https];AccountName=[yourAccount];AccountKey=[yourKey];EndpointSuffix=core.chinacloudapi.cn

若要访问模拟器存储:

UseDevelopmentStorage=true

若要创建 Azure 队列服务客户端,则需要使用 QueueRestProxy 类。 可使用以下方法之一:

  • 将连接字符串直接传递给它。
  • 在 Web 应用中使用环境变量来存储连接字符串。 要配置连接字符串,请参阅 Azure Web 应用配置设置文档。 在此处列出的示例中,将直接传递连接字符串。
require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";
$queueClient = QueueRestProxy::createQueueService($connectionString);

创建队列

QueueRestProxy 对象允许使用 createQueue 方法创建队列。 创建队列时,可以在该队列上设置选项,但此操作不是必需的。 (下面的示例演示了如何在队列上设置元数据。)

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\CreateQueueOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// OPTIONAL: Set queue metadata.
$createQueueOptions = new CreateQueueOptions();
$createQueueOptions->addMetaData("key1", "value1");
$createQueueOptions->addMetaData("key2", "value2");

try    {
    // Create queue.
    $queueClient->createQueue("myqueue", $createQueueOptions);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

Note

不应依赖元数据密钥区分大小写的性质。 以小写形式从服务中读取所有密钥。

向队列添加消息

若要将消息添加到队列,请使用 QueueRestProxy->createMessage。 此方法接受队列名称、消息文本和消息选项(这些都是可选的)。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\CreateMessageOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Create message.
    $queueClient->createMessage("myqueue", "Hello World!");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

扫视下一条消息

通过调用 QueueRestProxy->peekMessages,可以扫视队列前面的消息,而不会从队列中将其删除。 默认情况下,peekMessage 方法返回单条消息,但可以使用 PeekMessagesOptions->setNumberOfMessages 方法更改该值。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\PeekMessagesOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// OPTIONAL: Set peek message options.
$message_options = new PeekMessagesOptions();
$message_options->setNumberOfMessages(1); // Default value is 1.

try    {
    $peekMessagesResult = $queueClient->peekMessages("myqueue", $message_options);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

$messages = $peekMessagesResult->getQueueMessages();

// View messages.
$messageCount = count($messages);
if($messageCount <= 0){
    echo "There are no messages.<br />";
}
else{
    foreach($messages as $message)    {
        echo "Peeked message:<br />";
        echo "Message Id: ".$message->getMessageId()."<br />";
        echo "Date: ".date_format($message->getInsertionDate(), 'Y-m-d')."<br />";
        echo "Message text: ".$message->getMessageText()."<br /><br />";
    }
}

取消对下一条消息的排队

代码分两步从队列中删除消息。 首先,调用 QueueRestProxy->listMessages,这将使消息对从队列中读取的任何其他代码不可见。 默认情况下,此消息持续 30 秒不可见。 (如果在此时段内未删除该消息,它会在队列上再次可见。)若要从队列中删除消息,必须调用 QueueRestProxy->deleteMessage。 此删除消息的两步过程可确保当代码因硬件或软件故障而无法处理消息时,其他代码实例可以获取同一消息并重试。 代码处理消息后会立即调用 deleteMessage

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// Get message.
$listMessagesResult = $queueClient->listMessages("myqueue");
$messages = $listMessagesResult->getQueueMessages();
$message = $messages[0];

/* ---------------------
    Process message.
   --------------------- */

// Get message ID and pop receipt.
$messageId = $message->getMessageId();
$popReceipt = $message->getPopReceipt();

try    {
    // Delete message.
    $queueClient->deleteMessage("myqueue", $messageId, $popReceipt);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

更改已排队消息的内容

可以调用 QueueRestProxy->updateMessage 来更改队列中已就位消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作的状态,并额外为客户端提供一分钟的时间来继续处理消息。 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。 通常同时保留重试计数,当消息重试次数超过 n 时再删除该消息。 这可避免每次处理某条消息时都触发应用程序错误。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Get message.
$listMessagesResult = $queueClient->listMessages("myqueue");
$messages = $listMessagesResult->getQueueMessages();
$message = $messages[0];

// Define new message properties.
$new_message_text = "New message text.";
$new_visibility_timeout = 5; // Measured in seconds.

// Get message ID and pop receipt.
$messageId = $message->getMessageId();
$popReceipt = $message->getPopReceipt();

try    {
    // Update message.
    $queueClient->updateMessage("myqueue",
                                $messageId,
                                $popReceipt,
                                $new_message_text,
                                $new_visibility_timeout);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

用于从队列中删除消息的其他选项

可通过两种方式自定义队列中的消息检索。 首先,可获取一批消息(最多 32 条)。 其次,可设置更长或更短的可见超时时间,允许代码使用更长或更短的时间来彻底处理每条消息。 以下代码示例使用 getMessages 方法在一次调用中获取 16 条消息。 然后,它会使用 for 循环处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\ListMessagesOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// Set list message options.
$message_options = new ListMessagesOptions();
$message_options->setVisibilityTimeoutInSeconds(300);
$message_options->setNumberOfMessages(16);

// Get messages.
try{
    $listMessagesResult = $queueClient->listMessages("myqueue",
                                                     $message_options);
    $messages = $listMessagesResult->getQueueMessages();

    foreach($messages as $message){

        /* ---------------------
            Process message.
        --------------------- */

        // Get message Id and pop receipt.
        $messageId = $message->getMessageId();
        $popReceipt = $message->getPopReceipt();

        // Delete message.
        $queueClient->deleteMessage("myqueue", $messageId, $popReceipt);
    }
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

获取队列长度

可以获取队列中消息的估计数。 QueueRestProxy->getQueueMetadata 方法要求队列服务返回有关队列的元数据。 对返回的对象调用 getApproximateMessageCount 方法将提供队列中消息的计数。 此计数仅为近似值,因为只能在队列服务响应的请求后添加或删除消息。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Get queue metadata.
    $queue_metadata = $queueClient->getQueueMetadata("myqueue");
    $approx_msg_count = $queue_metadata->getApproximateMessageCount();
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

echo $approx_msg_count;

删除队列

若要删除队列及其包含的所有消息,请调用 QueueRestProxy->deleteQueue 方法。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>;EndpointSuffix=core.chinacloudapi.cn";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Delete queue.
    $queueClient->deleteQueue("myqueue");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // http://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

后续步骤

既已了解有关 Azure 队列存储的基础知识,可单击以下链接了解更复杂的存储任务。

有关详细信息,另请参阅 PHP 开发人员中心