使用 Python 向/从事件中心发送/接收事件

本快速入门介绍如何使用 azure-eventhub Python 包向事件中心发送事件以及从事件中心接收事件。

先决条件

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述

若要完成本快速入门,需要具备以下先决条件:

  • Azure 订阅。 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。 如果你没有 Azure 帐户,可以注册试用订阅
  • Python 3.8 或更高版本,装有 pip 且已更新。
  • Visual Studio Code(推荐)或任何其他集成开发环境 (IDE)。
  • 创建事件中心命名空间和事件中心。 第一步是使用 Azure 门户创建事件中心命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。 要创建命名空间和事件中心,请按照此文中的步骤操作。

安装包以发送事件

若要为事件中心安装 Python 包,请打开其路径中包含 Python 的命令提示符。 将目录更改为要在其中保存示例的文件夹。

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

向 Azure 验证应用

本快速入门介绍连接到 Azure 事件中心的两种方法:无密码方法和连接字符串方法。 第一个选项展示了如何使用 Microsoft Entra ID 中的安全主体和基于角色的访问控制 (RBAC) 连接到事件中心命名空间。 无需担心代码、配置文件或安全存储(如 Azure 密钥保管库)中存在硬编码的连接字符串。 第二个选项说明如何使用连接字符串连接到事件中心命名空间。 如果不熟悉 Azure,你可能会感觉连接字符串选项更易于使用。 建议在实际应用程序和生产环境中使用无密码选项。 有关详细信息,请参阅身份验证和授权。 还可以在概述页上阅读有关无密码身份验证的详细信息。

将角色分配到 Microsoft Entra 用户帐户

在本地开发时,请确保连接到 Azure 事件中心的用户帐户具有正确的权限。 你需要拥有 Azure 事件中心数据所有者角色才能发送和接收消息。 若要为自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write 操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可在范围概述页上详细了解角色分配的可用范围。

以下示例将 Azure Event Hubs Data Owner 角色分配给用户帐户,该角色提供对 Azure 事件中心资源的完全访问权限。 在实际方案中,遵循最小特权原则,仅向用户提供更安全的生产环境所需的最小权限。

Azure 事件中心的内置 Azure 角色

对于 Azure 事件中心,通过 Azure 门户和 Azure 资源管理 API 对命名空间和所有相关资源进行的管理已使用 Azure RBAC 模型进行了保护。 Azure 提供以下 Azure 内置角色,用于授予对事件中心命名空间的访问权限:

如果要创建自定义角色,请参阅执行事件中心操作所需的权限

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟。 在极少数情况下,最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到你的事件中心命名空间。

  2. 在概述页面上,从左侧菜单中选择“访问控制(IAM)”。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。

    显示如何分配角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 对于此示例,请搜索 Azure Event Hubs Data Owner 并选择匹配的结果。 然后选择“下一步” 。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

发送事件

在本部分,创建一个 Python 脚本,用于将事件发送到前面创建的事件中心。

  1. 打开常用的 Python 编辑器,如 Visual Studio Code

  2. 创建名为 send.py 的脚本。 此脚本将一批事件发送到前面创建的事件中心。

  3. 将以下代码粘贴到 send.py 中:

    在代码中,使用实际值替换以下占位符:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    注意

    有关使用连接字符串以异步方式将事件发送到事件中心的其他选项的示例,请参阅 GitHub send_async.py 页。 显示的模式也适用于以无密码方式发送事件。

接收事件

本快速入门使用 Azure Blob 存储作为检查点存储。 检查点存储用于保存检查点(即,上次读取位置)。

使用 Azure Blob 存储作为检查点存储时,请遵循以下建议:

  • 对每个使用者组使用单独的容器。 可以使用同一存储帐户,但每个组使用一个容器。
  • 请勿将容器用于任何其他用途,也不要将存储帐户用于任何其他用途。
  • 存储帐户应位于部署的应用程序所在的同一区域中。 如果应用程序位于本地,请尝试选择最近的区域。

在 Azure 门户的“存储帐户”页上的“Blob 服务”部分,确保禁用以下设置。

  • 分层命名空间
  • Blob 软删除
  • 版本控制

创建 Azure 存储帐户和 Blob 容器

执行以下步骤,创建 Azure 存储帐户并在其中创建 Blob 容器:

  1. 创建 Azure 存储帐户
  2. 创建 Blob 容器
  3. 对 Blob 容器进行身份验证。

请务必记下连接字符串和容器名称,供稍后在接收代码中使用。

在本地开发时,请确保访问 Blob 数据的用户帐户具有正确的权限。 需有“存储 Blob 数据参与者”角色才能读取和写入 Blob 数据。 若要为你自己分配此角色,需要具有“用户访问管理员”角色,或者具有包含 Microsoft.Authorization/roleAssignments/write 操作的其他角色。 可使用 Azure 门户、Azure CLI 或 Azure PowerShell 向用户分配 Azure RBAC 角色。 可以在范围概述页上详细了解角色分配的可用范围。

在此方案中,你将为用户帐户分配权限(范围为存储帐户)以遵循最低权限原则。 这种做法仅为用户提供所需的最低权限,并创建更安全的生产环境。

以下示例将“存储 Blob 数据参与者”角色分配给用户帐户,该角色提供对存储帐户中 Blob 数据的读取和写入访问权限。

重要

在大多数情况下,角色分配在 Azure 中传播需要一两分钟的时间,但极少数情况下最多可能需要 8 分钟。 如果在首次运行代码时收到身份验证错误,请稍等片刻再试。

  1. 在 Azure 门户中,使用主搜索栏或左侧导航找到存储帐户。

  2. 在存储帐户概述页的左侧菜单中选择“访问控制 (IAM)”。

  3. 在“访问控制 (IAM)”页上,选择“角色分配”选项卡。

  4. 从顶部菜单中选择“+ 添加”,然后从出现的下拉菜单中选择“添加角色分配”。

    显示如何分配存储帐户角色的屏幕截图。

  5. 使用搜索框将结果筛选为所需角色。 在此示例中,搜索“存储 Blob 数据参与者”并选择匹配的结果,然后选择“下一步”。

  6. 在“访问权限分配对象”下,选择“用户、组或服务主体”,然后选择“+ 选择成员”。

  7. 在对话框中,搜索 Microsoft Entra ID 用户名(通常是 user@domain 电子邮件地址),然后选中对话框底部的“选择”。

  8. 选择“查看 + 分配”转到最后一页,然后再次选择“查看 + 分配”完成该过程。

安装包以接收事件

对于接收端,需要安装一个或多个包。 本快速入门将使用 Azure Blob 存储来保存检查点,使程序不会读取已读取的事件。 它在 Blob 中按固定的时间间隔对收到的消息执行元数据检查点。 使用此方式可以很容易地在以后的某个时间从退出的位置继续接收消息。

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

创建用于接收事件的 Python 脚本

在本部分,你将创建一个 Python 脚本用于从事件中心接收事件:

  1. 打开常用的 Python 编辑器,如 Visual Studio Code

  2. 创建名为 recv.py 的脚本 。

  3. 将以下代码粘贴到 recv.py 中:

    在代码中,使用实际值替换以下占位符:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    注意

    有关使用连接字符串以异步方式从事件中心接收事件的其他选项的示例,请参阅 GitHub recv_with_checkpoint_store_async.py 页。 显示的模式也适用于以无密码方式接收事件。

运行接收器应用

若要运行此脚本,请打开其路径中包含 Python 的命令提示符,然后运行以下命令:

python recv.py

运行发送器应用

若要运行此脚本,请打开其路径中包含 Python 的命令提示符,然后运行以下命令:

python send.py

接收器窗口应会显示已发送到事件中心的消息。

疑难解答

如果在接收端窗口中未看到事件或者代码报告错误,请尝试按照以下故障排除提示操作:

  • 如果未看到 recy.py 的结果,请多次运行 send.py。

  • 如果在使用无密码代码(结合凭据)时看到有关“协同例程”的错误,请确保使用从 azure.identity.aio 导入功能。

  • 如果无密码代码(使用凭据)出现“未关闭的客户端会话”错误,请确保在完成后关闭凭据。 有关详细信息,请参阅异步凭据

  • 如果在访问存储时看到 recv.py 授权错误,请确保按照创建 Azure 存储帐户和 Blob 容器中的步骤操作,并将“存储 Blob 数据参与者”角色分配给服务主体。

  • 如果收到具有不同分区 ID 的事件,则预期会出现这种结果。 分区是一种数据组织机制,与使用方应用程序中所需的下游并行度相关。 事件中心的分区数与预期会有的并发读取者数直接相关。 有关详细信息,请参阅详细了解分区

后续步骤

在本快速入门中,你以异步方式发送和接收了事件。 若要了解如何以同步方式发送和接收事件,请参阅 GitHub sync_samples 页

有关 GitHub 上的所有示例(包括同步和异步),请参阅适用于 Python 的 Azure 事件中心客户端库示例