向 Azure 服务总线主题发送消息,并从该主题的订阅接收消息 (Python)Send messages to an Azure Service Bus topic and receive messages from subscriptions to the topic (Python)

本文介绍如何使用 Python 向服务总线主题发送消息,并从该主题的订阅接收消息。This article shows you how to use Python to send messages a Service Bus topic and receive messages from a subscription to the topic.

先决条件Prerequisites

将消息发送到主题Send messages to a topic

  1. 添加以下 import 语句。Add the following import statement.

    from azure.servicebus import ServiceBusClient, ServiceBusMessage
    
  2. 添加以下常量。Add the following constants.

    CONNECTION_STR = "<NAMESPACE CONNECTION STRING>"
    TOPIC_NAME = "<TOPIC NAME>"
    SUBSCRIPTION_NAME = "<SUBSCRIPTION NAME>"
    

    重要

    • <NAMESPACE CONNECTION STRING> 替换为命名空间的连接字符串。Replace <NAMESPACE CONNECTION STRING> with the connection string for your namespace.
    • <TOPIC NAME> 替换为主题名称。Replace <TOPIC NAME> with the name of the topic.
    • <SUBSCRIPTION NAME> 替换为主题的订阅名称。Replace <SUBSCRIPTION NAME> with the name of the subscription to the topic.
  3. 添加一个方法以发送一条消息。Add a method to send a single message.

    def send_single_message(sender):
        # create a Service Bus message
        message = ServiceBusMessage("Single Message")
        # send the message to the topic
        sender.send_messages(message)
        print("Sent a single message")
    

    发送方是一个对象,充当你创建的主题的客户端。The sender is a object that acts as a client for the topic you created. 稍后将创建它,并将其作为参数发送到此函数。You'll create it later and send as an argument to this function.

  4. 添加一个方法以发送一列消息。Add a method to send a list of messages.

    def send_a_list_of_messages(sender):
        # create a list of messages
        messages = [ServiceBusMessage("Message in list") for _ in range(5)]
        # send the list of messages to the topic
        sender.send_messages(messages)
        print("Sent a list of 5 messages")
    
  5. 添加一个方法以发送一批消息。Add a method to send a batch of messages.

    def send_batch_message(sender):
        # create a batch of messages
        batch_message = sender.create_message_batch()
        for _ in range(10):
            try:
                # add a message to the batch
                batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
            except ValueError:
                # ServiceBusMessageBatch object reaches max_size.
                # New ServiceBusMessageBatch object can be created here to send more data.
                break
        # send the batch of messages to the topic
        sender.send_messages(batch_message)
        print("Sent a batch of 10 messages")
    
  6. 创建一个服务总线客户端,然后创建一个主题发送方对象来发送消息。Create a Service Bus client and then a topic sender object to send messages.

    # create a Service Bus client using the connection string
    servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
    with servicebus_client:
        # get a Topic Sender object to send messages to the topic
        sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
        with sender:
            # send one message        
            send_single_message(sender)
            # send a list of messages
            send_a_list_of_messages(sender)
            # send a batch of messages
            send_batch_message(sender)
    
    print("Done sending messages")
    print("-----------------------")
    

从订阅接收消息Receive messages from a subscription

在 print 语句的后面添加以下代码。Add the following code after the print statement. 此代码将持续接收新消息,直到在 5 (max_wait_time) 秒内未收到任何新消息。This code continually receives new messages until it doesn't receive any new messages for 5 (max_wait_time) seconds.

with servicebus_client:
    # get the Subscription Receiver object for the subscription    
    receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            # complete the message so that the message is removed from the subscription
            receiver.complete_message(msg)

完整代码Full code

from azure.servicebus import ServiceBusClient, ServiceBusMessage

CONNECTION_STR = "<NAMESPACE CONNECTION STRING>"
TOPIC_NAME = "<TOPIC NAME>"
SUBSCRIPTION_NAME = "<SUBSCRIPTION NAME>"

def send_single_message(sender):
    message = ServiceBusMessage("Single Message")
    sender.send_messages(message)
    print("Sent a single message")

def send_a_list_of_messages(sender):
    messages = [ServiceBusMessage("Message in list") for _ in range(5)]
    sender.send_messages(messages)
    print("Sent a list of 5 messages")

def send_batch_message(sender):
    batch_message = sender.create_message_batch()
    for _ in range(10):
        try:
            batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
        except ValueError:
            # ServiceBusMessageBatch object reaches max_size.
            # New ServiceBusMessageBatch object can be created here to send more data.
            break
    sender.send_messages(batch_message)
    print("Sent a batch of 10 messages")

servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

with servicebus_client:
    sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
    with sender:
        send_single_message(sender)
        send_a_list_of_messages(sender)
        send_batch_message(sender)

print("Done sending messages")
print("-----------------------")

with servicebus_client:
    receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            receiver.complete_message(msg)

运行应用Run the app

运行应用程序时,应显示以下输出:When you run the application, you should see the following output:

Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch

在 Azure 门户中,导航到你的服务总线命名空间。In the Azure portal, navigate to your Service Bus namespace. 在“概述”页上,验证传入和传出消息计数是否为 16 。On the Overview page, verify that the incoming and outgoing message counts are 16. 如果没有看到这些消息,请等待几分钟后再刷新页面。If you don't see the counts, refresh the page after waiting for a few minutes.

传入和传出消息计数

在底部窗格中选择主题,以查看主题的“服务总线主题”页。Select the topic in the bottom pane to see the Service Bus Topic page for your topic. 在此页上,应会在“消息”图表中看到三条传入消息和三条传出消息。On this page, you should see three incoming and three outgoing messages in the Messages chart.

传入和传出消息

在此页上,如果选择一个订阅,则将转到“服务总线订阅”页。On this page, if you select a subscription, you get to the Service Bus Subscription page. 可以在此页上查看活动消息计数、死信消息计数等。You can see the active message count, dead-letter message count, and more on this page. 在此示例中,所有消息均已接收,因此活动消息计数为零。In this example, all the messages have been received, so the active message count is zero.

活动消息计数

如果注释掉接收代码,则会看到活动消息计数为 16。If you comment out the receive code, you'll see the active message count as 16.

活动消息计数 - 无接收

后续步骤Next steps

请参阅以下文档和示例:See the following documentation and samples: