使用 Python 在 Azure 存储中捕获事件中心数据并读取该数据(azure-eventhub 版本 5)Capture Event Hubs data in Azure Storage and read it by using Python (azure-eventhub version 5)

可以配置事件中心,以便在 Azure 存储帐户或 Azure Data Lake Storage Gen 2 中捕获发送到事件中心的数据。You can configure an event hub so that the data that's sent to an event hub is captured in an Azure storage account or Azure Data Lake Storage Gen 2. 本文介绍如何编写 Python 代码,以将事件发送到事件中心,并从 Azure Blob 存储读取捕获的数据。This article shows you how to write Python code to send events to an event hub and read the captured data from Azure Blob storage. 有关此功能的详细信息,请参阅事件中心捕获功能概述For more information about this feature, see Event Hubs Capture feature overview.

本快速入门使用 Azure Python SDK 来演示捕获功能。This quickstart uses the Azure Python SDK to demonstrate the Capture feature. sender.py 以 JSON 格式将模拟的环境遥测数据发送到事件中心。The sender.py app sends simulated environmental telemetry to event hubs in JSON format. 事件中心配置为使用捕获功能将此数据成批地写入到 Blob 存储。The event hub is configured to use the Capture feature to write this data to Blob storage in batches. capturereader.py 应用读取这些 Blob,并为每个设备创建一个追加文件。The capturereader.py app reads these blobs and creates an append file for each device. 然后该应用将数据写入 CSV 文件。The app then writes the data into CSV files.

重要

本快速入门使用 Azure 事件中心 Python SDK 版本 5。This quickstart uses version 5 of the Azure Event Hubs Python SDK. 有关使用 Python SDK 版本 1 的快速入门,请参阅此文For a quickstart that uses version 1 of the Python SDK, see this article.

在本快速入门中,请执行以下操作:In this quickstart, you:

  • 在 Azure 门户中创建 Azure Blob 存储帐户和容器。Create an Azure Blob storage account and container in the Azure portal.
  • 使用 Azure 门户创建事件中心命名空间。Create an Event Hubs namespace by using the Azure portal.
  • 创建已启用捕获功能的事件中心,并将其连接到存储帐户。Create an event hub with the Capture feature enabled and connect it to your storage account.
  • 使用 Python 脚本将数据发送到事件中心。Send data to your event hub by using a Python script.
  • 使用另一个 Python 脚本通过事件中心捕获功能读取并处理文件。Read and process files from Event Hubs Capture by using another Python script.

先决条件Prerequisites

创建用于将事件发送到事件中心的 Python 脚本Create a Python script to send events to your event hub

在本部分,你将创建一个向事件中心发送 200 个事件(10 个设备 * 20 个事件)的 Python 脚本。In this section, you create a Python script that sends 200 events (10 devices * 20 events) to an event hub. 这些事件是以 JSON 格式发送的示例环境读数。These events are a sample environmental reading that's sent in JSON format.

  1. 打开常用的 Python 编辑器,如 Visual Studio CodeOpen your favorite Python editor, such as Visual Studio Code.

  2. 创建名为 sender.py的脚本。Create a script called sender.py.

  3. 将以下代码粘贴到 sender.py 中。Paste the following code into sender.py.

    import time
    import os
    import uuid
    import datetime
    import random
    import json
    
    from azure.eventhub import EventHubProducerClient, EventData
    
    # This script simulates the production of events for 10 devices.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Create a producer client to produce and publish events to the event hub.
    producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESAPCE CONNECTION STRING", eventhub_name="EVENT HUB NAME")
    
    for y in range(0,20):    # For each device, produce 20 events. 
        event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. 
        for dev in devices:
            # Create a dummy reading.
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading) # Convert the reading into a JSON string.
            event_data_batch.add(EventData(s)) # Add event data to the batch.
        producer.send_batch(event_data_batch) # Send the batch of events to the event hub.
    
    # Close the producer.    
    producer.close()
    
  4. 替换脚本中的以下值:Replace the following values in the scripts:

    • EVENT HUBS NAMESPACE CONNECTION STRING 替换为事件中心命名空间的连接字符串。Replace EVENT HUBS NAMESPACE CONNECTION STRING with the connection string for your Event Hubs namespace.
    • EVENT HUB NAME 替换为事件中心的名称。Replace EVENT HUB NAME with the name of your event hub.
  5. 运行脚本以将事件发送到事件中心。Run the script to send events to the event hub.

  6. 在 Azure 门户中,可以验证事件中心是否已收到消息。In the Azure portal, you can verify that the event hub has received the messages. 在“指标”部分切换到“消息”视图。 Switch to Messages view in the Metrics section. 刷新页面以更新图表。Refresh the page to update the chart. 可能需要等待几秒,页面才会显示已收到消息。It might take a few seconds for the page to display that the messages have been received.

    验证事件中心是否已收到消息Verify that the event hub received the messages

创建用于读取捕获文件的 Python 脚本Create a Python script to read your Capture files

在此示例中,捕获的数据存储在 Azure Blob 存储中。In this example, the captured data is stored in Azure Blob storage. 本部分中的脚本从 Azure 存储帐户读取捕获的数据文件,并生成可让你轻松打开和查看的 CSV 文件。The script in this section reads the captured data files from your Azure storage account and generates CSV files for you to easily open and view. 将在应用程序的当前工作目录中看到 10 个文件。You will see 10 files in the current working directory of the application. 这些文件包含 10 台设备的环境读数。These files will contain the environmental readings for the 10 devices.

  1. 在 Python 编辑器中,创建名为 capturereader.py 的脚本。In your Python editor, create a script called capturereader.py. 此脚本读取捕获的文件,并为每个设备创建一个文件,用于仅写入该设备的数据。This script reads the captured files and creates a file for each device to write the data only for that device.

  2. 将以下代码粘贴到 capturereader.py 中。Paste the following code into capturereader.py.

    import os
    import string
    import json
    import uuid
    import avro.schema
    
    from azure.storage.blob import ContainerClient, BlobClient
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    
    
    def processBlob2(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            filename = os.getcwd() + '\\' + str(device) + '.csv'
            deviceFile = open(filename, "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing():
        print('Processor started using path: ' + os.getcwd())
        # Create a blob container client.
        container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
        blob_list = container.list_blobs() # List all the blobs in the container.
        for blob in blob_list:
            # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).        
            if blob.size > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                # Create a blob client for the blob.
                blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
                # Construct a file name based on the blob name.
                cleanName = str.replace(blob.name, '/', '_')
                cleanName = os.getcwd() + '\\' + cleanName 
                with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                    my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
                processBlob2(cleanName) # Convert the file into a CSV file.
                os.remove(cleanName) # Remove the original downloaded file.
                # Delete the blob from the container after it's read.
                container.delete_blob(blob.name)
    
    startProcessing()    
    
  3. 请将 AZURE STORAGE CONNECTION STRING 替换为 Azure 存储帐户的连接字符串。Replace AZURE STORAGE CONNECTION STRING with the connection string for your Azure storage account. 在本快速入门中创建的容器的名称为 captureThe name of the container you created in this quickstart is capture. 如果为该容器使用了不同的名称,请将 capture 替换为存储帐户中容器的名称。If you used a different name for the container, replace capture with the name of the container in the storage account.

运行脚本Run the scripts

  1. 打开其路径中包含 Python 的命令提示符,并运行以下命令,安装 Python 必备组件包:Open a command prompt that has Python in its path, and then run these commands to install Python prerequisite packages:

    pip install azure-storage-blob
    pip install azure-eventhub
    pip install avro-python3
    
  2. 将目录更改为保存 sender.pycapturereader.py 的目录,然后运行以下命令:Change your directory to the directory where you saved sender.py and capturereader.py, and run this command:

    python sender.py
    

    此命令将启动一个新的 Python 进程,用于运行发送程序。This command starts a new Python process to run the sender.

  3. 等待几分钟让捕获运行,然后在原始命令窗口中输入以下命令:Wait a few minutes for the capture to run, and then enter the following command in your original command window:

    python capturereader.py
    

    此捕获处理器使用本地目录下载存储帐户和容器中的所有 Blob。This capture processor uses the local directory to download all the blobs from the storage account and container. 它将处理任何不为空的内容,并将结果以 CSV 文件的形式写入到本地目录。It processes any that are not empty, and it writes the results as CSV files into the local directory.

后续步骤Next steps

查看 GitHub 上的 Python 示例Check out Python samples on GitHub.