使用 Python(azure-eventhub 版本 5)向/从事件中心发送/接收事件Send events to or receive events from event hubs by using Python (azure-eventhub version 5)

本快速入门介绍如何使用 azure-eventhub 版本 5 Python 包向事件中心发送事件以及从事件中心接收事件。This quickstart shows how to send events to and receive events from an event hub using the azure-eventhub version 5 Python package.

先决条件Prerequisites

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

若要完成本快速入门,需要具备以下先决条件:To complete this quickstart, you need the following prerequisites:

  • Azure 订阅Azure subscription. 若要使用 Azure 服务(包括 Azure 事件中心),需要一个订阅。To use Azure services, including Azure Event Hubs, you need a subscription. 如果没有现有 Azure 帐户,可以注册 1 元试用版创建帐户If you don't have an existing Azure account, you can sign up for a 1mb trial or create an account.

  • Python 2.7、3.5 或更高版本,装有 PIP 且已更新。Python 2.7 or 3.5 or later, with PIP installed and updated.

  • 事件中心的 Python 包。The Python package for Event Hubs.

    若要安装该包,请在包路径中包含 Python 的命令提示符中运行以下命令:To install the package, run this command in a command prompt that has Python in its path:

    pip install azure-eventhub
    

    安装以下包,以使用 Azure Blob 存储作为检查点存储来接收事件:Install the following package for receiving the events by using Azure Blob storage as the checkpoint store:

    pip install azure-eventhub-checkpointstoreblob-aio
    
  • 创建事件中心命名空间和事件中心Create an Event Hubs namespace and an event hub. 第一步是使用 Azure 门户创建事件中心类型的命名空间,并获取应用程序与事件中心进行通信所需的管理凭据。The first step is to use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. 要创建命名空间和事件中心,请按照此文中的步骤操作。To create a namespace and an event hub, follow the procedure in this article. 然后,按照以下文章中的说明获取事件中心命名空间的连接字符串:获取连接字符串Then, get the connection string for the Event Hubs namespace by following instructions from the article: Get connection string. 稍后将在本快速入门中使用连接字符串。You use the connection string later in this quickstart.

发送事件Send events

在本部分,你将创建一个 Python 脚本,用于将事件发送到前面创建的事件中心。In this section, you create a Python script to send events to the event hub that you created earlier.

  1. 打开常用的 Python 编辑器,如 Visual Studio CodeOpen your favorite Python editor, such as Visual Studio Code.

  2. 创建名为 send.py 的脚本。Create a script called send.py. 此脚本将一批事件发送到前面创建的事件中心。This script sends a batch of events to the event hub that you created earlier.

  3. 将以下代码粘贴到 send.py 中:Paste the following code into send.py:

    import asyncio
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
            # the event hub name.
        producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData('First event '))
            event_data_batch.add(EventData('Second event'))
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    
    

    备注

    有关完整源代码(包括参考性注释),请参阅 GitHub send_async.py 页For the complete source code, including informational comments, go to the GitHub send_async.py page.

接收事件Receive events

本快速入门使用 Azure Blob 存储作为检查点存储。This quickstart uses Azure Blob storage as a checkpoint store. 检查点存储用于保存检查点(即,上次读取位置)。The checkpoint store is used to persist checkpoints (that is, the last read positions).

备注

如果在 Azure Stack Hub 上运行,该平台支持的存储 Blob SDK 版本可能不同于通常在 Azure 上提供的版本。If you are running on Azure Stack Hub, that platform may support a different version of Storage Blob SDK than those typically available on Azure. 例如,如果在 Azure Stack Hub 版本 2002 上运行,则存储服务的最高可用版本为版本 2017-11-09。For example, if you are running on Azure Stack Hub version 2002, the highest available version for the Storage service is version 2017-11-09. 在这种情况下,除了执行本部分中的步骤以外,还需要添加相关代码,将存储服务 API 版本 2017-11-09 作为目标。In this case, besides following steps in this section, you will also need to add code to target the Storage service API version 2017-11-09. 如需通过示例来了解如何以特定的存储 API 版本为目标,请参阅 GitHub 上的同步异步示例。For an example on how to target a specific Storage API version, see the synchronous and asynchronous samples on GitHub. 有关 Azure Stack Hub 上支持的 Azure 存储服务版本的详细信息,请参阅 Azure Stack Hub 存储:差异和注意事项For more information on the Azure Storage service versions supported on Azure Stack Hub, please refer to Azure Stack Hub storage: Differences and considerations.

创建 Azure 存储帐户和 Blob 容器Create an Azure storage account and a blob container

执行以下步骤,创建 Azure 存储帐户并在其中创建 Blob 容器:Create an Azure storage account and a blob container in it by doing the following steps:

  1. 创建 Azure 存储帐户Create an Azure Storage account
  2. 创建一个 blob 容器Create a blob container
  3. 获取存储帐户的连接字符串Get the connection string to the storage account

请务必记下连接字符串和容器名称,供稍后在接收代码中使用。Be sure to record the connection string and container name for later use in the receive code.

创建用于接收事件的 Python 脚本Create a Python script to receive events

在本部分,你将创建一个 Python 脚本用于从事件中心接收事件:In this section, you create a Python script to receive events from your event hub:

  1. 打开常用的 Python 编辑器,如 Visual Studio CodeOpen your favorite Python editor, such as Visual Studio Code.

  2. 创建名为 recv.py 的脚本 。Create a script called recv.py.

  3. 将以下代码粘贴到 recv.py 中:Paste the following code into recv.py:

    import asyncio
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
    
    
    async def on_event(partition_context, event):
        # Print the event data.
        print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
        async with client:
            # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
            await client.receive(on_event=on_event,  starting_position="-1")
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # Run the main method.
        loop.run_until_complete(main())    
    

    备注

    有关完整源代码(包括附加的参考注释),请转到 GitHub recv_with_checkpoint_store_async.py 页For the complete source code, including additional informational comments, go to the GitHub recv_with_checkpoint_store_async.py page.

运行接收器应用Run the receiver app

若要运行此脚本,请打开其路径中包含 Python 的命令提示符,然后运行以下命令:To run the script, open a command prompt that has Python in its path, and then run this command:

python recv.py

运行发送器应用Run the sender app

若要运行此脚本,请打开其路径中包含 Python 的命令提示符,然后运行以下命令:To run the script, open a command prompt that has Python in its path, and then run this command:

python send.py

接收器窗口应会显示已发送到事件中心的消息。The receiver window should display the messages that were sent to the event hub.

后续步骤Next steps

在本快速入门中,你以异步方式发送和接收了事件。In this quickstart, you've sent and received events asynchronously. 若要了解如何以同步方式发送和接收事件,请参阅 GitHub sync_samples 页To learn how to send and receive events synchronously, go to the GitHub sync_samples page.

有关 GitHub 上的所有示例(包括同步和异步),请参阅适用于 Python 的 Azure 事件中心客户端库示例For all the samples (both synchronous and asynchronous) on GitHub, go to Azure Event Hubs client library for Python samples.