Compartilhar via

将消息发送到Azure Service Bus主题,并从主题订阅接收消息(Python)

在本教程中,将完成以下步骤:

  1. 使用 Azure 门户创建Service Bus命名空间。
  2. 使用 Azure 门户创建Service Bus主题。
  3. 使用 Azure 门户为该主题创建 Service Bus 订阅。
  4. 编写Python应用程序以使用 azure-servicebus 包:
    • 将一组消息发送到主题。
    • 从订阅中接收这些消息。

注意

本快速入门指南提供有关在一个简单场景中,将一批消息发送到 Service Bus 主题,以及从该主题的订阅接收这些消息的分步说明。 可以在 GitHub 上的 Azure SDK for Python 存储库中找到 Azure Service Bus 的预生成 Python 示例。

先决条件

注意

本教程适用于可以用Python复制并运行的示例。 有关如何创建Python应用程序的说明,请参阅 创建Python应用程序并将其部署到 Azure 网站。 有关安装本教程中使用的包的详细信息,请参阅 Python 安装指南

在 Azure 门户中创建命名空间

若要开始在Azure中使用Service Bus消息传送实体,请创建名称在Azure中唯一的命名空间。 命名空间为应用程序中Service Bus资源(如队列和主题)提供范围容器。

创建命名空间:

  1. 登录到 Azure 门户

  2. 从左上角选择浮出控件菜单,然后转到“ 所有服务 ”页

  3. 在左侧导航栏上,选择 “集成”。

  4. 向下滚动到 Messaging services,将鼠标悬停在 Service Bus 上,然后选择 Create

    屏幕截图显示选择“创建资源”、“集成”,然后在菜单中选择“服务总线”。

  5. “创建命名空间”页的“基本信息”选项卡中,执行以下步骤:

    1. 对于Subscription,请选择要用来创建命名空间的 Azure 订阅。

    2. 对于资源组,选择现有资源组或创建新的资源组。

    3. 输入符合以下命名约定的 命名空间名称

      • 名称在Azure中必须唯一。 系统会立即检查该名称是否可用。
      • 名称长度最少为 6 个字符,最多为 50 个字符。
      • 名称只能包含字母、数字和连字符 -
      • 名称必须以字母开头,并且必须以字母或数字结尾。
      • 名称不以 -sb-mgmt 结尾。
    4. 对于 “位置”,请选择要托管命名空间的区域。

    5. 对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。

      如果选择 高级 层,可以为命名空间启用 异地复制 。 异地复制功能可确保命名空间的元数据和数据从主要区域持续复制到一个或多个次要区域。

      重要

      若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题和订阅。

      如果选择了“高级”定价层,请指定“消息传送单元”数。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为 消息传送单元。 高级命名空间至少具有一个消息传送单元。 可以为每个Service Bus高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅Service Bus高级消息传送层

    6. 在页面底部选择查看 + 创建

      显示“创建命名空间”页的屏幕截图。

    7. “审阅 + 创建 ”页上,查看设置,然后选择“ 创建”。

  6. 部署资源成功后,在部署页上选择 “转到资源 ”。

    显示部署成功页的屏幕截图,其中显示了“转到资源”链接。

  7. 将会看到服务总线命名空间的主页。

    Screenshot 显示 Service Bus 命名空间已创建的主页。

使用 Azure 门户创建主题

  1. Service Bus命名空间页上,展开左侧导航菜单上的Entities,然后在左侧菜单中选择Topics

  2. 在工具栏中选择“+ 主题”。

  3. 输入主题名称。 将其他选项保留默认值。

  4. 选择 创建

    截图,显示 Azure 门户中的“创建主题”页面。

创建主题的订阅

  1. 选择在上一部分创建的“主题”

    屏幕截图显示从主题列表中选择主题。

  2. Service Bus主题页上,选择工具栏上的+ 订阅

    屏幕截图显示“主题”页上的“添加订阅”按钮。

  3. 在“创建订阅”页上执行以下步骤:

    1. 对于订阅名称,输入“S1”

    2. 然后,选择“创建”以创建订阅

      显示“创建订阅”页面的屏幕截图。

将应用程序认证到 Azure

本文介绍了连接到 Azure Service Bus的两种方法:passwordlessconnection string

第一个选项演示如何在Microsoft Entra ID和基于角色的访问控制(RBAC)中使用安全主体连接到Service Bus命名空间。 不必担心在代码、配置文件或安全存储(如 Azure Key Vault)中使用硬编码的连接字符串。

第二个选项演示如何使用connection string连接到Service Bus命名空间。 如果你不熟悉Azure,你可能会发现connection string选项更易于遵循。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅 Service Bus身份验证和授权。 若要详细了解无密码身份验证,请参阅 Authenticate .NET 应用

将角色分配给Microsoft Entra用户

在本地开发时,请确保连接到Azure Service Bus的用户帐户具有正确的权限。 需要Azure Service Bus数据所有者角色才能发送和接收消息。 若要将此角色分配给自己,您需要 "User Access Administrator" 角色或其他包含 Microsoft.Authorization/roleAssignments/write 操作的角色。

可以使用Azure门户、Azure CLI或Azure PowerShell向用户分配Azure RBAC 角色。 若要详细了解角色分配的可用范围,请参阅 Azure RBAC 的可用范围

以下示例将 Azure Service Bus Data Owner 角色分配给用户帐户,该角色提供对Azure Service Bus资源的完全访问权限。 在实际方案中,遵循 最低特权原则 ,仅向用户授予更安全的生产环境所需的最低权限。

Azure Service Bus 的 Azure 内置角色

对于Azure Service Bus,通过 Azure 门户和Azure资源管理 API 管理命名空间和所有相关资源已使用 Azure RBAC 模型进行保护。 Azure提供以下Azure内置角色,用于授权访问Service Bus命名空间:

若要创建自定义角色,请参阅 Service Bus 操作所需的权限

将Microsoft Entra用户添加到Azure Service Bus所有者角色

将您的 Microsoft Entra 用户名添加到 Service Bus 命名空间层级的 Azure Service Bus 数据所有者 角色。 此配置允许在用户帐户上下文中运行的应用将消息发送到队列或主题。 它可以接收来自队列或主题订阅的消息。

重要

在大多数情况下,角色分配在 Azure 中需要一到两分钟才能传播。 在极少数情况下,最多可能需要 8 分钟才能完成。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 如果没有在Azure门户中打开Service Bus命名空间页,请使用主搜索栏或左侧导航找到Service Bus命名空间。

  2. “概述 ”页上,从左侧菜单中选择 “访问控制”(IAM )。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择 “+ 添加” ,然后 添加角色分配

    显示如何分配角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,请搜索 Azure Service Bus Data Owner并选择匹配的结果。 然后选择“下一步”。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索Microsoft Entra用户名(通常是user@domain电子邮件地址),然后选择对话框底部的选择

  8. 选择“审核 + 指派”转到最后一页,然后再次选择“审核 + 指派”以完成该过程。

代码设置

若要遵循本快速入门,请使用无密码身份验证以及您自己的 Azure 账户:

  • 安装 Azure CLI
  • 请在终端或命令提示符下使用 az login 登录您的 Azure 帐户。
  • 请在本教程后续部分将适当角色添加到资源时使用同一帐户。
  • 在同样的终端或命令提示符中运行教程代码。

重要

请确保使用 az login 进行登录。 无密码代码中的 DefaultAzureCredential 类使用Azure CLI凭据对Microsoft Entra ID进行身份验证。

若要使用无密码代码,需要指定:

  • 完全限定的服务总线命名空间,例如:<service-bus-namespace.servicebus.chinacloudapi.cn>
  • 主题名称
  • 订阅名称

使用 pip 安装包

  1. 若要安装此Service Bus教程所需的Python包,请打开其路径中Python的命令提示符。 将目录更改为要在其中保存示例的文件夹。

  2. 安装软件包:

    pip install azure-servicebus
    pip install azure-identity
    pip install aiohttp
    

将消息发送到主题

以下示例代码演示如何将一批消息发送到Service Bus主题。 请参阅代码注释以了解详细信息。

打开喜欢的编辑器(如 Visual Studio Code),创建文件 send.py,并将以下代码添加到其中。

  1. 添加以下 import 语句。

    import asyncio
    from azure.servicebus.aio import ServiceBusClient
    from azure.servicebus import ServiceBusMessage
    from azure.identity.aio import DefaultAzureCredential
    
  2. 添加常量并定义凭据。

    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    TOPIC_NAME = "TOPIC_NAME"
    
    credential = DefaultAzureCredential()
    

    重要

    • FULLY_QUALIFIED_NAMESPACE 替换为该 Service Bus 命名空间的完全限定命名空间。
    • TOPIC_NAME 替换为主题名称。

    在前面的代码中,你使用了 Azure Identity 客户端库的 DefaultAzureCredential 类。 当应用在开发期间在本地运行时,DefaultAzureCredential 将自动发现并使用您登录 Azure CLI 时使用的帐户进行身份验证。 将应用部署到Azure时,DefaultAzureCredential无需更改任何代码即可通过托管标识对应用进行身份验证以Microsoft Entra ID。

  3. 添加一个方法以发送一条消息。

    async def send_single_message(sender):
        # Create a Service Bus message
        message = ServiceBusMessage("Single Message")
        # send the message to the topic
        await sender.send_messages(message)
        print("Sent a single message")
    

    发送方是一个对象,充当你创建的主题的客户端。 稍后将创建它,并将其作为参数发送到此函数。

  4. 添加一个方法以发送一列消息。

    async def send_a_list_of_messages(sender):
        # Create a list of messages
        messages = [ServiceBusMessage("Message in list") for _ in range(5)]
        # send the list of messages to the topic
        await sender.send_messages(messages)
        print("Sent a list of 5 messages")
    
  5. 添加一个方法以发送一批消息。

    async def send_batch_message(sender):
        # Create a batch of messages
        async with sender:
            batch_message = await sender.create_message_batch()
            for _ in range(10):
                try:
                    # Add a message to the batch
                    batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
                except ValueError:
                    # ServiceBusMessageBatch object reaches max_size.
                    # New ServiceBusMessageBatch object can be created here to send more data.
                    break
            # Send the batch of messages to the topic
            await sender.send_messages(batch_message)
        print("Sent a batch of 10 messages")
    
  6. 创建一个Service Bus客户端,然后创建一个用于发送消息的主题发送者对象。

    async def run():
        # create a Service Bus client using the credential.
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
            # Get a Topic Sender object to send messages to the topic
            sender = servicebus_client.get_topic_sender(topic_name=TOPIC_NAME)
            async with sender:
                # Send one message
                await send_single_message(sender)
                # Send a list of messages
                await send_a_list_of_messages(sender)
                # Send a batch of messages
                await send_batch_message(sender)
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    print("Done sending messages")
    print("-----------------------")
    

从订阅接收消息

以下示例代码演示如何从订阅接收消息。 此代码将持续接收新消息,直到在 5 (max_wait_time) 秒内未收到任何新消息。

打开喜欢的编辑器(如 Visual Studio Code),创建文件 recv.py,并将以下代码添加到其中。

  1. 与 send 示例类似,添加 import 语句,定义应替换为自己的值的常量,并定义凭据。

    import asyncio
    from azure.servicebus.aio import ServiceBusClient
    from azure.identity.aio import DefaultAzureCredential
    
    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    SUBSCRIPTION_NAME = "SUBSCRIPTION_NAME"
    TOPIC_NAME = "TOPIC_NAME"
    
    credential = DefaultAzureCredential()
    
  2. 创建Service Bus客户端,然后创建用于接收消息的订阅接收方对象。

    async def run():
        # create a Service Bus client using the credential
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
    
            async with servicebus_client:
                # get the Subscription Receiver object for the subscription
                receiver = servicebus_client.get_subscription_receiver(topic_name=TOPIC_NAME, 
                subscription_name=SUBSCRIPTION_NAME, max_wait_time=5)
                async with receiver:
                    received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=20)
                    for msg in received_msgs:
                        print("Received: " + str(msg))
                        # complete the message so that the message is removed from the subscription
                        await receiver.complete_message(msg)
            # Close credential when no longer needed.
            await credential.close()
    
  3. 调用 run 方法。

    asyncio.run(run())
    

运行应用

打开路径中具有Python的命令提示符,然后运行代码以在主题下发送和接收订阅的消息。

python send.py; python recv.py

应会看到以下输出:

Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch

在 Azure 门户中,导航到 Service Bus 的命名空间。 在“概述”页上,验证传入和传出消息计数是否为 16。 如果没有看到计数,请等待几分钟后再刷新页面。

传入和传出消息计数

选择底部窗格中的主题以查看相关的 Service Bus 主题 页面。 在此页上,应会在“消息”图表中看到三条传入消息和三条传出消息

传入和传出消息

在此页上,如果选择订阅,将访问 Service Bus Subscription 页。 可以在此页上查看活动消息计数、死信消息计数等。 在此示例中,所有消息均已接收,因此活动消息计数为零。

活动消息计数

如果将接收代码注释掉,那么你会看到活动消息计数变为 16。

活动消息计数 - 无接收

后续步骤

请参阅以下文档和示例:

  • Azure Service Bus 客户端库适用于 Python
  • Samples
    • sync_samples 文件夹包含示例,演示如何以同步方式与Service Bus交互。 在本快速入门中,您使用了此方法。
    • async_samples 文件夹包含示例,演示如何以异步方式与Service Bus交互。
  • azure-servicebus 参考文档