向 Azure 服务总线主题发送消息,并从该主题的订阅接收消息 (Python)Send messages to an Azure Service Bus topic and receive messages from subscriptions to the topic (Python)
本文介绍如何使用 Python 向服务总线主题发送消息,并从该主题的订阅接收消息。This article shows you how to use Python to send messages a Service Bus topic and receive messages from a subscription to the topic.
先决条件Prerequisites
Azure 订阅。An Azure subscription. 可以激活你的 Visual Studio 或 MSDN 订阅者权益或者注册试用版订阅。You can activate your Visual Studio or MSDN subscriber benefits or sign up for a Trial Subscription.
遵循快速入门:使用 Azure 门户创建一个服务总线主题和多个对该主题的订阅。Follow steps in the Quickstart: Use the Azure portal to create a Service Bus topic and subscriptions to the topic. 记下连接字符串、主题名称和订阅名称。Note down the connection string, topic name, and a subscription name. 本快速入门仅需使用一个订阅。You'll use only one subscription for this quickstart.
Python 2.7 或更高版本,且安装了 [Azure Python SDK][Azure Python package] 包。Python 2.7 or higher, with the [Azure Python SDK][Azure Python package] package installed. 有关详细信息,请参阅 Python 安装指南。For more information, see the Python Installation Guide.
将消息发送到主题Send messages to a topic
添加以下 import 语句。Add the following import statement.
from azure.servicebus import ServiceBusClient, ServiceBusMessage
添加以下常量。Add the following constants.
CONNECTION_STR = "<NAMESPACE CONNECTION STRING>" TOPIC_NAME = "<TOPIC NAME>" SUBSCRIPTION_NAME = "<SUBSCRIPTION NAME>"
重要
- 将
<NAMESPACE CONNECTION STRING>
替换为命名空间的连接字符串。Replace<NAMESPACE CONNECTION STRING>
with the connection string for your namespace. - 将
<TOPIC NAME>
替换为主题名称。Replace<TOPIC NAME>
with the name of the topic. - 将
<SUBSCRIPTION NAME>
替换为主题的订阅名称。Replace<SUBSCRIPTION NAME>
with the name of the subscription to the topic.
- 将
添加一个方法以发送一条消息。Add a method to send a single message.
def send_single_message(sender): # create a Service Bus message message = ServiceBusMessage("Single Message") # send the message to the topic sender.send_messages(message) print("Sent a single message")
发送方是一个对象,充当你创建的主题的客户端。The sender is a object that acts as a client for the topic you created. 稍后将创建它,并将其作为参数发送到此函数。You'll create it later and send as an argument to this function.
添加一个方法以发送一列消息。Add a method to send a list of messages.
def send_a_list_of_messages(sender): # create a list of messages messages = [ServiceBusMessage("Message in list") for _ in range(5)] # send the list of messages to the topic sender.send_messages(messages) print("Sent a list of 5 messages")
添加一个方法以发送一批消息。Add a method to send a batch of messages.
def send_batch_message(sender): # create a batch of messages batch_message = sender.create_message_batch() for _ in range(10): try: # add a message to the batch batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch")) except ValueError: # ServiceBusMessageBatch object reaches max_size. # New ServiceBusMessageBatch object can be created here to send more data. break # send the batch of messages to the topic sender.send_messages(batch_message) print("Sent a batch of 10 messages")
创建一个服务总线客户端,然后创建一个主题发送方对象来发送消息。Create a Service Bus client and then a topic sender object to send messages.
# create a Service Bus client using the connection string servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True) with servicebus_client: # get a Topic Sender object to send messages to the topic sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME) with sender: # send one message send_single_message(sender) # send a list of messages send_a_list_of_messages(sender) # send a batch of messages send_batch_message(sender) print("Done sending messages") print("-----------------------")
从订阅接收消息Receive messages from a subscription
在 print 语句的后面添加以下代码。Add the following code after the print statement. 此代码将持续接收新消息,直到在 5 (max_wait_time
) 秒内未收到任何新消息。This code continually receives new messages until it doesn't receive any new messages for 5 (max_wait_time
) seconds.
with servicebus_client:
# get the Subscription Receiver object for the subscription
receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
with receiver:
for msg in receiver:
print("Received: " + str(msg))
# complete the message so that the message is removed from the subscription
receiver.complete_message(msg)
完整代码Full code
from azure.servicebus import ServiceBusClient, ServiceBusMessage
CONNECTION_STR = "<NAMESPACE CONNECTION STRING>"
TOPIC_NAME = "<TOPIC NAME>"
SUBSCRIPTION_NAME = "<SUBSCRIPTION NAME>"
def send_single_message(sender):
message = ServiceBusMessage("Single Message")
sender.send_messages(message)
print("Sent a single message")
def send_a_list_of_messages(sender):
messages = [ServiceBusMessage("Message in list") for _ in range(5)]
sender.send_messages(messages)
print("Sent a list of 5 messages")
def send_batch_message(sender):
batch_message = sender.create_message_batch()
for _ in range(10):
try:
batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
except ValueError:
# ServiceBusMessageBatch object reaches max_size.
# New ServiceBusMessageBatch object can be created here to send more data.
break
sender.send_messages(batch_message)
print("Sent a batch of 10 messages")
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
with sender:
send_single_message(sender)
send_a_list_of_messages(sender)
send_batch_message(sender)
print("Done sending messages")
print("-----------------------")
with servicebus_client:
receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
with receiver:
for msg in receiver:
print("Received: " + str(msg))
receiver.complete_message(msg)
运行应用Run the app
运行应用程序时,应显示以下输出:When you run the application, you should see the following output:
Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
在 Azure 门户中,导航到你的服务总线命名空间。In the Azure portal, navigate to your Service Bus namespace. 在“概述”页上,验证传入和传出消息计数是否为 16 。On the Overview page, verify that the incoming and outgoing message counts are 16. 如果没有看到这些消息,请等待几分钟后再刷新页面。If you don't see the counts, refresh the page after waiting for a few minutes.
在底部窗格中选择主题,以查看主题的“服务总线主题”页。Select the topic in the bottom pane to see the Service Bus Topic page for your topic. 在此页上,应会在“消息”图表中看到三条传入消息和三条传出消息。On this page, you should see three incoming and three outgoing messages in the Messages chart.
在此页上,如果选择一个订阅,则将转到“服务总线订阅”页。On this page, if you select a subscription, you get to the Service Bus Subscription page. 可以在此页上查看活动消息计数、死信消息计数等。You can see the active message count, dead-letter message count, and more on this page. 在此示例中,所有消息均已接收,因此活动消息计数为零。In this example, all the messages have been received, so the active message count is zero.
如果注释掉接收代码,则会看到活动消息计数为 16。If you comment out the receive code, you'll see the active message count as 16.
后续步骤Next steps
请参阅以下文档和示例:See the following documentation and samples:
- 适用于 Python 的 Azure 服务总线客户端库Azure Service Bus client library for Python
- 示例。Samples.
- sync_samples 文件夹包含一些示例,这些示例演示如何以同步方式与服务总线交互。The sync_samples folder has samples that show you how to interact with Service Bus in a synchronous manner. 本快速入门就使用了此方法。In this quick start, you used this method.
- async_samples 文件夹包含一些示例,这些示例演示如何以异步方式与服务总线交互。The async_samples folder has samples that show you how to interact with Service Bus in an asynchronous manner.
- azure-servicebus 参考文档azure-servicebus reference documentation