快速入门:如何通过 PHP 使用服务总线主题和订阅Quickstart: How to use Service Bus topics and subscriptions with PHP

本文说明如何使用服务总线主题和订阅。This article shows you how to use Service Bus topics and subscriptions. 示例采用 PHP 编写并使用 Azure SDK for PHPThe samples are written in PHP and use the Azure SDK for PHP. 涉及的方案包括:The scenarios covered include:

  • 创建主题和订阅Creating topics and subscriptions
  • 创建订阅筛选器Creating subscription filters
  • 将消息发送到主题Sending messages to a topic
  • 从订阅接收消息Receiving messages from a subscription
  • 删除主题和订阅Deleting topics and subscriptions

必备条件Prerequisites

  1. Azure 订阅。An Azure subscription. 若要完成本教程,需要一个 Azure 帐户。To complete this tutorial, you need an Azure account. 可以激活 Visual Studio 或 MSDN 订阅者权益或者注册试用帐户You can activate your Visual Studio or MSDN subscriber benefits or sign up for a trail account.

  2. 按照快速入门:使用 Azure 门户创建一个服务总线主题和对此主题的订阅来创建服务总线命名空间并获取连接字符串Follow steps in the Quickstart: Use the Azure portal to create a Service Bus topic and subscriptions to the topic to create a Service Bus namespace and get the connection string.

    备注

    在本快速入门中,你将使用 PHP 创建一个主题和对此主题的订阅You will create a topic and a subscription to the topic by using PHP in this quickstart.

创建 PHP 应用程序Create a PHP application

创建访问 Azure Blob 服务的 PHP 应用程序的唯一要求是从代码中引用用于 PHP 的 Azure SDK 中的类。The only requirement for creating a PHP application that accesses the Azure Blob service is to reference classes in the Azure SDK for PHP from within your code. 可以使用任何开发工具或记事本创建应用程序。You can use any development tools to create your application, or Notepad.

备注

PHP 安装还必须已安装并启用 OpenSSL 扩展Your PHP installation must also have the OpenSSL extension installed and enabled.

本文说明如何使用服务功能,这些功能可在 PHP 应用程序中本地调用,或通过在 Azure 的 Web 角色、辅助角色或网站中运行的代码调用。This article describes how to use service features that can be called within a PHP application locally, or in code running within an Azure web role, worker role, or website.

获取 Azure 客户端库Get the Azure client libraries

通过 Composer 安装Install via Composer

  1. 在项目的根目录中创建一个名为 composer.json 的文件并向其添加以下代码:Create a file named composer.json in the root of your project and add the following code to it:

    {
      "require": {
        "microsoft/windowsazure": "*"
      }
    }
    
  2. [composer.phar][composer-phar] 下载到项目根目录中。Download [composer.phar][composer-phar] in your project root.

  3. 打开命令提示符并在项目根目录中执行以下命令Open a command prompt and execute the following command in your project root

    php composer.phar install
    

配置应用程序以使用服务总线Configure your application to use Service Bus

若要使用服务总线 API:To use the Service Bus APIs:

  1. 使用 require_once 语句引用 autoloader 文件。Reference the autoloader file using the require_once statement.
  2. 引用所用的任意类。Reference any classes you might use.

以下示例演示如何包含 autoloader 文件并引用 ServiceBusService 类。The following example shows how to include the autoloader file and reference the ServiceBusService class.

备注

本示例(以及本文中的其他示例)假定你已通过 Composer 安装了用于 Azure 的 PHP 客户端库。This example (and other examples in this article) assumes you have installed the PHP Client Libraries for Azure via Composer. 如果已手动安装这些库或将其作为 PEAR 包安装,则必须引用 WindowsAzure.php autoloader 文件。If you installed the libraries manually or as a PEAR package, you must reference the WindowsAzure.php autoloader file.

require_once 'vendor/autoload.php';
use WindowsAzure\Common\ServicesBuilder;

在下面的示例中,require_once 语句始终显示,但只引用了执行示例所必需的类。In the following examples, the require_once statement is always shown, but only the classes necessary for the example to execute are referenced.

设置服务总线连接Set up a Service Bus connection

若要实例化服务总线客户端,必须先设置采用以下格式的有效连接字符串:To instantiate a Service Bus client, you must first have a valid connection string in this format:

Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]

其中,Endpoint 的格式通常为 https://[yourNamespace].servicebus.chinacloudapi.cnWhere Endpoint is typically of the format https://[yourNamespace].servicebus.chinacloudapi.cn.

若要创建任何 Azure 服务客户端,必须使用 ServicesBuilder 类。To create any Azure service client, you must use the ServicesBuilder class. 方法:You can:

  • 将连接字符串直接传递给它。Pass the connection string directly to it.
  • 使用 CloudConfigurationManager (CCM) 检查多个外部源以获取连接字符串: Use the CloudConfigurationManager (CCM) to check multiple external sources for the connection string:
    • 默认情况下,它附带了对一个外部源的支持 - 环境变量。By default it comes with support for one external source - environmental variables.
    • 可通过扩展 ConnectionStringSource 类来添加新源。You can add new sources by extending the ConnectionStringSource class.

在此处列出的示例中,将直接传递连接字符串。For the examples outlined here, the connection string is passed directly.

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;

$connectionString = "Endpoint=[yourEndpoint];SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=[Primary Key]";

$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

创建主题Create a topic

可以通过 ServiceBusRestProxy 类对服务总线主题执行管理操作。You can perform management operations for Service Bus topics via the ServiceBusRestProxy class. ServiceBusRestProxy 对象是通过 ServicesBuilder::createServiceBusService 工厂方法构造的,包含一个适当的连接字符串,该字符串封装了令牌权限以对它进行管理。A ServiceBusRestProxy object is constructed via the ServicesBuilder::createServiceBusService factory method with an appropriate connection string that encapsulates the token permissions to manage it.

下列示例演示了如何实例化 ServiceBusRestProxy 并调用 ServiceBusRestProxy->createTopic 以在 MySBNamespace 命名空间中创建名为 mytopic 的主题:The following example shows how to instantiate a ServiceBusRestProxy and call ServiceBusRestProxy->createTopic to create a topic named mytopic within a MySBNamespace namespace:

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\TopicInfo;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Create topic.
    $topicInfo = new TopicInfo("mytopic");
    $serviceBusRestProxy->createTopic($topicInfo);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

备注

可使用 ServiceBusRestProxy 对象上的 listTopics 方法检查服务命名空间中是否已经存在一个具有指定名称的主题。You can use the listTopics method on ServiceBusRestProxy objects to check if a topic with a specified name already exists within a service namespace.

创建订阅Create a subscription

主题订阅也是使用 ServiceBusRestProxy->createSubscription 方法创建的。Topic subscriptions are also created with the ServiceBusRestProxy->createSubscription method. 订阅已命名,并且具有一个限制传递到订阅的虚拟队列的消息集的可选筛选器。Subscriptions are named and can have an optional filter that restricts the set of messages passed to the subscription's virtual queue.

创建具有默认 (MatchAll) 筛选器的订阅Create a subscription with the default (MatchAll) filter

如果在创建新订阅时未指定任何筛选器,则将使用默认的 MatchAll 筛选器。If no filter is specified when a new subscription is created, the MatchAll filter (default) is used. 使用 MatchAll 筛选器时,发布到主题的所有消息都会置于订阅的虚拟队列中。When the MatchAll filter is used, all messages published to the topic are placed in the subscription's virtual queue. 以下示例创建名为 mysubscription 的订阅,并使用默认的 MatchAll 筛选器。The following example creates a subscription named mysubscription and uses the default MatchAll filter.

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\SubscriptionInfo;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Create subscription.
    $subscriptionInfo = new SubscriptionInfo("mysubscription");
    $serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

创建具有筛选器的订阅Create subscriptions with filters

还可以设置筛选器,以指定发送到主题的哪些消息应该在特定主题订阅中显示。You can also set up filters that enable you to specify which messages sent to a topic should appear within a specific topic subscription. 订阅支持的最灵活的一种筛选器是 SqlFilter,它实现了一部分 SQL92 功能。The most flexible type of filter supported by subscriptions is the SqlFilter, which implements a subset of SQL92. SQL 筛选器对发布到主题的消息的属性进行操作。SQL filters operate on the properties of the messages that are published to the topic. 有关 SqlFilter 的详细信息,请参阅 SqlFilter.SqlExpression 属性For more information about SqlFilters, see SqlFilter.SqlExpression Property.

备注

有关订阅的每个规则单独处理传入消息,并将其结果消息添加到订阅。Each rule on a subscription processes incoming messages independently, adding their result messages to the subscription. 此外,每个新订阅具有一个默认 Rule 对象,该对象包含将主题中的所有消息添加到订阅的筛选器。In addition, each new subscription has a default Rule object with a filter that adds all messages from the topic to the subscription. 要仅接收与筛选器匹配的消息,必须删除默认规则。To receive only messages matching your filter, you must remove the default rule. 可以使用 ServiceBusRestProxy->deleteRule 方法删除默认规则。You can remove the default rule by using the ServiceBusRestProxy->deleteRule method.

以下示例创建了一个名为 HighMessages 的订阅,该订阅带有只选择自定义 MessageNumber 属性大于 3 的消息的 SqlFilter。 The following example creates a subscription named HighMessages with a SqlFilter that only selects messages that have a custom MessageNumber property greater than 3. 有关向消息添加自定义属性的信息,请参阅将消息发送到主题See Send messages to a topic for information about adding custom properties to messages.

$subscriptionInfo = new SubscriptionInfo("HighMessages");
$serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);

$serviceBusRestProxy->deleteRule("mytopic", "HighMessages", '$Default');

$ruleInfo = new RuleInfo("HighMessagesRule");
$ruleInfo->withSqlFilter("MessageNumber > 3");
$ruleResult = $serviceBusRestProxy->createRule("mytopic", "HighMessages", $ruleInfo);

此代码需要使用其他命名空间:WindowsAzure\ServiceBus\Models\SubscriptionInfoThis code requires the use of an additional namespace: WindowsAzure\ServiceBus\Models\SubscriptionInfo.

同样,以下示例创建一个名为 LowMessages 的订阅,其 SqlFilter 只选择 MessageNumber 属性小于或等于 3 的消息。Similarly, the following example creates a subscription named LowMessages with a SqlFilter that only selects messages that have a MessageNumber property less than or equal to 3.

$subscriptionInfo = new SubscriptionInfo("LowMessages");
$serviceBusRestProxy->createSubscription("mytopic", $subscriptionInfo);

$serviceBusRestProxy->deleteRule("mytopic", "LowMessages", '$Default');

$ruleInfo = new RuleInfo("LowMessagesRule");
$ruleInfo->withSqlFilter("MessageNumber <= 3");
$ruleResult = $serviceBusRestProxy->createRule("mytopic", "LowMessages", $ruleInfo);

现在,消息发送到 mytopic 主题后总是会传送给订阅了 mysubscription 订阅的接收方,并且会选择性地传送给订阅了 HighMessagesLowMessages 订阅的接收方(具体取决于消息内容)。Now, when a message is sent to the mytopic topic, it is always delivered to receivers subscribed to the mysubscription subscription, and selectively delivered to receivers subscribed to the HighMessages and LowMessages subscriptions (depending upon the message content).

将消息发送到主题Send messages to a topic

要将消息发送到服务总线主题,应用程序应调用 ServiceBusRestProxy->sendTopicMessage 方法。To send a message to a Service Bus topic, your application calls the ServiceBusRestProxy->sendTopicMessage method. 下面的代码演示了如何将消息发送到先前在 MySBNamespace 服务命名空间创建的 mytopic 主题。The following code shows how to send a message to the mytopic topic previously created within the MySBNamespace service namespace.

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\BrokeredMessage;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Create message.
    $message = new BrokeredMessage();
    $message->setBody("my message");

    // Send message.
    $serviceBusRestProxy->sendTopicMessage("mytopic", $message);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

发送到服务总线主题的消息是 BrokeredMessage 类的实例。Messages sent to Service Bus topics are instances of the BrokeredMessage class. BrokeredMessage 对象包含一组标准属性和方法以及可用来保存自定义应用程序特定属性的属性。BrokeredMessage objects have a set of standard properties and methods, as well as properties that can be used to hold custom application-specific properties. 以下示例演示如何向以前创建的 mytopic 主题发送五条测试消息。The following example shows how to send five test messages to the mytopic topic previously created. setProperty 方法用于将自定义属性 (MessageNumber) 添加到每条消息。The setProperty method is used to add a custom property (MessageNumber) to each message. MessageNumber 属性值在每条消息中都有所不同(你可以使用此值确定哪些订阅接收它,如创建订阅部分中所示):The MessageNumber property value varies on each message (you can use this value to determine which subscriptions receive it, as shown in the Create a subscription section):

for($i = 0; $i < 5; $i++){
    // Create message.
    $message = new BrokeredMessage();
    $message->setBody("my message ".$i);

    // Set custom property.
    $message->setProperty("MessageNumber", $i);

    // Send message.
    $serviceBusRestProxy->sendTopicMessage("mytopic", $message);
}

服务总线主题在标准层中支持的最大消息大小为 256 KB,在高级层中则为 1 MB。Service Bus topics support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 标头最大大小为 64 KB,其中包括标准和自定义应用程序属性。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 一个主题中包含的消息数量不受限制,但消息的总大小受限制。There is no limit on the number of messages held in a topic but there is a cap on the total size of the messages held by a topic. 此主题大小上限为 5 GB。This upper limit on topic size is 5 GB. 有关配额的详细信息,请参阅 服务总线配额For more information about quotas, see Service Bus quotas.

从订阅接收消息Receive messages from a subscription

从订阅接收消息的最佳方法是使用 ServiceBusRestProxy->receiveSubscriptionMessage 方法。The best way to receive messages from a subscription is to use a ServiceBusRestProxy->receiveSubscriptionMessage method. 可在两种不同的模式下接收消息:ReceiveAndDeletePeekLockMessages can be received in two different modes: ReceiveAndDelete and PeekLock. PeekLock 是默认设置。PeekLock is the default.

使用 ReceiveAndDelete 模式时,接收是一项单次操作,即,当服务总线收到订阅中某条消息的读取请求时,它会将该消息标记为“已使用”并将其返回给应用程序。When using the ReceiveAndDelete mode, receive is a single-shot operation; that is, when Service Bus receives a read request for a message in a subscription, it marks the message as being consumed and returns it to the application. ReceiveAndDelete * 模式是最简单的模型,最适合应用程序允许在出现故障时不处理消息的情形。ReceiveAndDelete * mode is the simplest model and works best for scenarios in which an application can tolerate not processing a message when a failure occurs. 为了理解这一点,可以考虑这样一种情形:使用方发出接收请求,但在处理该请求前发生了崩溃。To understand this, consider a scenario in which the consumer issues the receive request and then crashes before processing it. 由于服务总线已将消息标记为“已使用”,因此当应用程序重新启动并重新开始使用消息时,它会漏掉在发生崩溃前使用的消息。Because Service Bus has marked the message as being consumed, then when the application restarts and begins consuming messages again, it has missed the message that was consumed prior to the crash.

在默认 PeekLock 模式下,接收消息会变成一个两阶段操作,这能够支持不能允许丢失消息的应用程序。In the default PeekLock mode, receiving a message becomes a two stage operation, which makes it possible to support applications that cannot tolerate missing messages. 当 Service Bus 收到请求时,它会查找下一条要使用的消息,锁定该消息以防其他使用者接收,并将该消息返回到应用程序。When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. 应用程序完成消息处理(或可靠地存储消息以供日后处理)后,它会将收到的消息传递到 ServiceBusRestProxy->deleteMessage,从而完成接收过程的第二阶段。After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by passing the received message to ServiceBusRestProxy->deleteMessage. 发现 deleteMessage 调用时,服务总线会将消息标记为“已使用”,并将消息从队列中删除。When Service Bus sees the deleteMessage call, it marks the message as being consumed and remove it from the queue.

以下示例演示了如何使用 PeekLock 模式(默认模式)接收和处理消息。The following example shows how to receive and process a message using PeekLock mode (the default mode).

require_once 'vendor/autoload.php';

use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\ReceiveMessageOptions;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Set receive mode to PeekLock (default is ReceiveAndDelete)
    $options = new ReceiveMessageOptions();
    $options->setPeekLock();

    // Get message.
    $message = $serviceBusRestProxy->receiveSubscriptionMessage("mytopic", "mysubscription", $options);

    echo "Body: ".$message->getBody()."<br />";
    echo "MessageID: ".$message->getMessageId()."<br />";

    /*---------------------------
        Process message here.
    ----------------------------*/

    // Delete message. Not necessary if peek lock is not set.
    echo "Deleting message...<br />";
    $serviceBusRestProxy->deleteMessage($message);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

如何:处理应用程序崩溃和不可读消息How to: handle application crashes and unreadable messages

Service Bus 提供了相关功能来帮助你轻松地从应用程序错误或消息处理问题中恢复。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 如果接收方应用程序出于某种原因无法处理消息,它可以对收到的消息调用 unlockMessage 方法(而不是 deleteMessage 方法)。If a receiver application is unable to process the message for some reason, then it can call the unlockMessage method on the received message (instead of the deleteMessage method). 这会导致服务总线解锁队列中的消息,并使其能够重新被同一个正在使用的应用程序或其他正在使用的应用程序接收。It causes Service Bus to unlock the message within the queue and make it available to be received again, either by the same consuming application or by another consuming application.

队列中的锁定消息还有相关超时,如果应用程序无法在锁定超时期满前处理消息(例如,当应用程序发生故障时),服务总线会自动解除锁定消息,让它再次可供接收。There is also a timeout associated with a message locked within the queue, and if the application fails to process the message before the lock timeout expires (for example, if the application crashes), then Service Bus unlocks the message automatically and make it available to be received again.

如果应用程序在处理消息后、但在发出 deleteMessage 请求前发生故障,消息会重新传递给重启的应用程序。In the event that the application crashes after processing the message but before the deleteMessage request is issued, then the message is redelivered to the application when it restarts. 这种处理类型通常称作“至少处理一次”,即每条消息将至少被处理一次,但在某些情况下,同一消息可能会被重新传送 。This type of processing is often called at least once processing; that is, each message is processed at least once but in certain situations the same message may be redelivered. 如果方案无法容忍重复处理,则应用程序开发人员应向其应用程序添加更多逻辑以处理重复消息传送。If the scenario cannot tolerate duplicate processing, then application developers should add additional logic to applications to handle duplicate message delivery. 这通常可通过使用消息的 getMessageId 方法来实现,该方法在多次传送尝试中保持不变。It is often achieved using the getMessageId method of the message, which remains constant across delivery attempts.

删除主题和订阅Delete topics and subscriptions

若要删除某个主题或订阅,请分别使用 ServiceBusRestProxy->deleteTopicServiceBusRestProxy->deleteSubscripton 方法。To delete a topic or a subscription, use the ServiceBusRestProxy->deleteTopic or the ServiceBusRestProxy->deleteSubscripton methods, respectively. 删除某个主题也会删除向该主题注册的所有订阅。Deleting a topic also deletes any subscriptions that are registered with the topic.

以下示例演示了如何删除名为 mytopic 的主题及其注册的订阅。The following example shows how to delete a topic named mytopic and its registered subscriptions.

require_once 'vendor/autoload.php';

use WindowsAzure\ServiceBus\ServiceBusService;
use WindowsAzure\ServiceBus\ServiceBusSettings;
use WindowsAzure\Common\ServiceException;

// Create Service Bus REST proxy.
$serviceBusRestProxy = ServicesBuilder::getInstance()->createServiceBusService($connectionString);

try {
    // Delete topic.
    $serviceBusRestProxy->deleteTopic("mytopic");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here: 
    // https://docs.microsoft.com/rest/api/storageservices/Common-REST-API-Error-Codes
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

通过使用 deleteSubscription 方法可单独删除订阅:By using the deleteSubscription method, you can delete a subscription independently:

$serviceBusRestProxy->deleteSubscription("mytopic", "mysubscription");

备注

可以使用服务总线资源管理器管理服务总线资源。You can manage Service Bus resources with Service Bus Explorer. 服务总线资源管理器允许用户连接到服务总线命名空间并以一种简单的方式管理消息传送实体。The Service Bus Explorer allows users to connect to a Service Bus namespace and administer messaging entities in an easy manner. 该工具提供高级功能,如导入/导出功能或用于对主题、队列、订阅、中继服务、通知中心和事件中心进行测试的功能。The tool provides advanced features like import/export functionality or the ability to test topic, queues, subscriptions, relay services, notification hubs and events hubs.

后续步骤Next steps

有关详细信息,请参阅队列、主题和订阅For more information, see Queues, topics, and subscriptions.