Leer en inglés

Compartir a través de

快速入门:向 Azure 服务总线队列发送消息并从中接收消息(Python)

本文提供有关将消息发送到服务总线队列并接收消息的简单方案的分步说明。 可以在 GitHub 上的 Azure SDK for Python 存储库中找到适用于 Azure 服务总线的预生成 JavaScript 和 TypeScript 示例。

在本快速入门中,请执行以下操作:

  • 使用 Azure 门户创建服务总线命名空间。

  • 使用 Azure 门户创建服务总线队列。

  • 编写 Python 代码来使用 azure-servicebus 包执行以下任务:

    • 将一组消息发送到队列。
    • 从队列接收这些消息。

如果不熟悉该服务,请参阅 服务总线概述 ,然后再开始。

先决条件

通过自己的 Azure 帐户来使用本快速入门:

  • 安装 Azure CLI,它提供向开发人员计算机进行无密码身份验证的功能。
  • 在终端或命令提示符处通过 az login 使用 Azure 帐户登录。
  • 将适当的数据角色添加到资源时,请使用同一帐户。
  • 在同样的终端或命令提示符中运行代码。
  • 记下服务总线命名空间的队列名称。 你需要在代码中包含它。

本快速入门将演练可以使用 Python 复制和运行的示例。 有关如何创建 Python 应用程序的说明,请参阅 快速入门:将 Python Web 应用部署到 Azure 应用服务。 有关安装本快速入门中使用的包的详细信息,请参阅 如何安装适用于 Python 的 Azure 库包

在 Azure 门户中创建命名空间

若要开始使用 Azure 中的服务总线消息传送实体,请创建一个名称在 Azure 中唯一的命名空间。 命名空间为应用程序中的服务总线资源(例如队列和主题)提供范围容器。

创建命名空间:

  1. 登录 Azure 门户

  2. 从左上角选择浮出控件菜单,然后导航到 “所有服务 ”页

  3. 在左侧导航栏上,选择 “集成”。

  4. 向下滚动到 消息服务服务>总线 ,然后选择“ 创建”。

    显示选择“创建资源”、“集成”和“服务总线”菜单中的屏幕截图。

  5. “创建命名空间”页的“基本信息”选项卡中,执行以下步骤:

    1. 对于“订阅”,请选择要在其中创建命名空间的 Azure 订阅。

    2. 对于资源组,选择现有资源组或创建新的资源组。

    3. 输入符合以下命名约定的 命名空间名称

      • 该名称在 Azure 中必须唯一。 系统会立即检查该名称是否可用。
      • 名称长度最少为 6 个字符,最多为 50 个字符。
      • 名称只能包含字母、数字、连字符 -
      • 名称必须以字母开头,并且必须以字母或数字结尾。
      • 名称不以 -sb-mgmt 结尾。
    4. 对于 “位置”,请选择要托管命名空间的区域。

    5. 对于“定价层”,请选择命名空间的定价层(“基本”、“标准”或“高级”)。 对于本快速入门,请选择“标准”。

      如果选择 “高级 ”层,则可以为命名空间启用 异地复制 。 异地复制功能可确保命名空间的元数据和数据从主要区域持续复制到一个或多个次要区域。

      Importante

      若要使用主题和订阅,请选择“标准”或“高级”。 基本定价层不支持主题和订阅。

      如果选择了“高级”定价层,请指定“消息传送单元”数 。 高级层在 CPU 和内存级别提供资源隔离,使每个工作负荷在隔离的环境中运行。 此资源容器称为 消息传送单元。 高级命名空间至少具有一个消息传送单元。 可为每个服务总线高级命名空间选择 1、2、4、8 或 16 个消息传送单元。 有关详细信息,请参阅 服务总线高级消息传送层

    6. 在页面底部选择“查看 + 创建”。

      显示“创建命名空间”页的屏幕截图

    7. 在“查看 + 创建”页上,查看设置,然后选择“创建” 。

  6. 部署资源成功后,在部署页上选择 “转到资源 ”。

    显示部署成功页的屏幕截图,其中显示了“转到资源”链接。

  7. 将会看到服务总线命名空间的主页。

    显示创建的服务总线命名空间主页的屏幕截图。

在 Azure 门户中创建队列

  1. 在“服务总线命名空间”页上,展开左侧导航菜单上的“实体”,然后在左侧菜单中选择“队列”

  2. 在“ 队列 ”页上的工具栏上,选择“ + 队列”。

  3. 输入队列的名称。 将其他值保留为默认值。

  4. 选择 创建

    显示“创建队列”页面的屏幕截图。

向 Azure 验证应用

本文介绍连接到 Azure 服务总线的两种方法: 无密码连接字符串

第一个选项展示如何使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到服务总线命名空间。 无需担心在代码、配置文件或安全存储(如 Azure Key Vault)中具有硬编码连接字符串。

第二个选项展示如何使用连接字符串连接到服务总线命名空间。 如果不熟悉 Azure,你可能会感觉“连接字符串”选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅 服务总线身份验证和授权。 若要详细了解无密码身份验证,请参阅 “对 .NET 应用进行身份验证”。

将角色分配给 Microsoft Entra 用户

在本地开发时,请确保连接到 Azure 服务总线的用户帐户具有正确的权限。 需要 Azure 服务总线数据所有者 角色才能发送和接收消息。 若要将此角色分配给自己,您需要 "User Access Administrator" 角色或其他包含 Microsoft.Authorization/roleAssignments/write 操作的角色。

可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 若要详细了解角色分配的可用范围,请参阅 了解 Azure RBAC 的范围

以下示例将 Azure Service Bus Data Owner 角色分配给用户帐户,该角色提供对 Azure 服务总线资源的完全访问权限。 在实际方案中,遵循 最低特权原则 ,仅向用户授予更安全的生产环境所需的最低权限。

适用于 Azure 服务总线的 Azure 内置角色

对于 Azure 服务总线,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供了以下 Azure 内置角色,用于授予对服务总线命名空间的访问权限:

如果要创建自定义角色,请参阅执行服务总线操作所需的权限

将 Microsoft Entra 用户添加到“Azure 服务总线所有者”角色

将 Microsoft Entra 用户名添加到服务总线命名空间级别的 Azure 服务总线数据所有者角色。 此配置允许在用户帐户上下文中运行的应用将消息发送到队列或主题。 它可以接收来自队列或主题订阅的消息。

Importante

在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟才能完成。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 如果未在 Azure 门户中打开“服务总线命名空间”页,请使用主搜索栏或左侧导航找到你的服务总线命名空间。

  2. “概述 ”页上,从左侧菜单中选择 “访问控制”(IAM )。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择 “+ 添加” ,然后 添加角色分配

    显示如何分配角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,请搜索 Azure Service Bus Data Owner 并选择匹配的结果。 然后选择“下一步” 。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索你的 Microsoft Entra 用户名(通常是你的 user@domain 电子邮件地址),然后在对话框的底部选择“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

使用 pip 安装包

  1. 若要安装此服务总线快速入门所需的 Python 包,请打开其路径中具有 Python 的命令提示符窗口。

  2. 将目录更改为要在其中保存示例的文件夹。

  3. 安装以下包:

    pip install azure-servicebus
    pip install azure-identity
    pip install aiohttp
    

向队列发送消息

以下示例代码演示如何向队列发送消息。 打开文本编辑器(如 Visual Studio Code)创建文件 send.py,并将以下代码添加到其中。

  1. 添加 Import 语句。

    import asyncio
    from azure.servicebus.aio import ServiceBusClient
    from azure.servicebus import ServiceBusMessage
    from azure.identity.aio import DefaultAzureCredential
    
  2. 添加常量并定义凭据。

    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    QUEUE_NAME = "QUEUE_NAME"
    
    credential = DefaultAzureCredential()
    

    Importante

    • FULLY_QUALIFIED_NAMESPACE 替换为服务总线命名空间的完全限定命名空间。
    • QUEUE_NAME 替换为该队列的名称。
  3. 添加一个方法以发送一条消息。

    async def send_single_message(sender):
        # Create a Service Bus message and send it to the queue
        message = ServiceBusMessage("Single Message")
        await sender.send_messages(message)
        print("Sent a single message")
    

    发送方是一个对象,充当你创建的队列的客户端。 稍后创建它,并将其作为参数发送到此函数。

  4. 添加一个方法以发送一列消息。

    async def send_a_list_of_messages(sender):
        # Create a list of messages and send it to the queue
        messages = [ServiceBusMessage("Message in list") for _ in range(5)]
        await sender.send_messages(messages)
        print("Sent a list of 5 messages")
    
  5. 添加一个方法以发送一批消息。

    async def send_batch_message(sender):
        # Create a batch of messages
        async with sender:
            batch_message = await sender.create_message_batch()
            for _ in range(10):
                try:
                    # Add a message to the batch
                    batch_message.add_message(ServiceBusMessage("Message inside a ServiceBusMessageBatch"))
                except ValueError:
                    # ServiceBusMessageBatch object reaches max_size.
                    # New ServiceBusMessageBatch object can be created here to send more data.
                    break
            # Send the batch of messages to the queue
            await sender.send_messages(batch_message)
        print("Sent a batch of 10 messages")
    
  6. 创建一个服务总线客户端,然后创建一个队列发送方对象来发送消息。

    async def run():
        # create a Service Bus client using the credential
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
            # get a Queue Sender object to send messages to the queue
            sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
            async with sender:
                # send one message
                await send_single_message(sender)
                # send a list of messages
                await send_a_list_of_messages(sender)
                # send a batch of messages
                await send_batch_message(sender)
    
            # Close credential when no longer needed.
            await credential.close()
    
  7. 调用 run 方法并显示消息。

    asyncio.run(run())
    print("Done sending messages")
    print("-----------------------")
    

从队列接收消息

以下示例代码演示如何从队列接收消息。 显示的代码将持续接收新消息,直到在 5 (max_wait_time) 秒内未收到任何新消息。

打开文本编辑器(如 Visual Studio Code)创建文件 recv.py,并将以下代码添加到其中。

  1. 类似于 send.py 示例,添加 import 语句。 将常量替换为自己的值并定义凭据。

    import asyncio
    
    from azure.servicebus.aio import ServiceBusClient
    from azure.identity.aio import DefaultAzureCredential
    
    FULLY_QUALIFIED_NAMESPACE = "FULLY_QUALIFIED_NAMESPACE"
    QUEUE_NAME = "QUEUE_NAME"
    
    credential = DefaultAzureCredential()
    
  2. 创建一个服务总线客户端,然后创建一个队列接收方对象来接收消息。

    async def run():
        # create a Service Bus client using the connection string
        async with ServiceBusClient(
            fully_qualified_namespace=FULLY_QUALIFIED_NAMESPACE,
            credential=credential,
            logging_enable=True) as servicebus_client:
    
            async with servicebus_client:
                # get the Queue Receiver object for the queue
                receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
                async with receiver:
                    received_msgs = await receiver.receive_messages(max_wait_time=5, max_message_count=20)
                    for msg in received_msgs:
                        print("Received: " + str(msg))
                        # complete the message so that the message is removed from the queue
                        await receiver.complete_message(msg)
    
            # Close credential when no longer needed.
            await credential.close()
    
  3. 调用 run 方法。

    asyncio.run(run())
    

运行应用

打开其路径中包含 Python 的命令提示符,然后运行代码以从队列发送和接收消息。

python send.py; python recv.py

应会看到以下输出:

Sent a single message
Sent a list of 5 messages
Sent a batch of 10 messages
Done sending messages
-----------------------
Received: Single Message
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message in list
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch
Received: Message inside a ServiceBusMessageBatch

在 Azure 门户中,导航到你的服务总线命名空间。 在“概述”页上,验证传入和传出消息计数是否为 16 。 如果未看到计数,请等待几分钟,然后刷新页面。

传入和传出消息计数

在此“概述”页上选择队列,导航到“服务总线队列”页面 。 还可在此页上看到传入和传出消息计数 。 还可看到其他信息,如队列的当前大小和活动消息计数 。

队列详细信息

请参阅以下文档和示例: