Quickstart: Event Hubs Capture walkthrough: Python (azure-eventhub version 1)

Capture is a feature of Azure Event Hubs. You can use Capture to automatically deliver the streaming data in your event hub to an Azure Blob storage account of your choice. This capability makes it easy to do batch processing on real-time streaming data. This article describes how to use Event Hubs Capture with Python. For more information about Event Hubs Capture, see Capture events through Azure Event Hubs.

This walkthrough uses the Azure Python SDK to demonstrate the Capture feature. The sender.py program sends simulated environmental telemetry to Event Hubs in JSON format. The event hub uses the Capture feature to write this data to Blob storage in batches. The capturereader.py app reads these blobs, creates an append file for each of your devices, and writes the data to a .csv file for each device.

Warning

This quickstart is for version 1 of the Azure Event Hubs Python SDK. We recommend that you migrate your code to version 5 of the Python SDK.

In this walkthrough, you:

  • Create an Azure Blob storage account and container in the Azure portal.
  • Enable Event Hubs Capture and direct it to your storage account.
  • Send data to your event hub by using a Python script.
  • Read and process files from Event Hubs Capture by using another Python script.

Prerequisites

  • Python 3.4 or later, with pip installed and updated.

  • An Azure subscription. If you don't have one, create a trial account before you begin.

  • An active Event Hubs namespace and event hub, created by following the instructions at Quickstart: Create an event hub using the Azure portal. Make a note of your namespace and event hub names to use later in this walkthrough.

    Note

    If you already have a storage container to use, you can enable Capture and select that container when you create the event hub.

  • Your Event Hubs shared access key name and primary key value. Find or create these values under Shared access policies on your Event Hubs page. The default access key name is RootManageSharedAccessKey. Copy the access key name and the primary key value to use later in this walkthrough.

Create an Azure Blob storage account and container

Create a storage account and container to use for the capture.

  1. Sign in to the Azure portal.

  2. In the left navigation, select Storage accounts, and on the Storage accounts screen, select Add.

  3. On the storage account creation screen, select a subscription and resource group, and give the storage account a name. You can leave the other selections at their defaults. Select Review + create, review the settings, and then select Create.

    Create a storage account

  4. When the deployment completes, select Go to resource, and on the storage account Overview screen, select Containers.

  5. On the Containers screen, select + Container.

  6. On the New container screen, give the container a name, and then select OK. Make a note of the container name to use later in the walkthrough.

  7. In the left navigation of the Containers screen, select Access keys. Copy the Storage account name and the Key value under key1 to use later in the walkthrough.
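If you prefer the command line, the same access keys can be retrieved with the Azure CLI. This is only a sketch: the account and resource group names below are placeholders, not values from this walkthrough.

```shell
# Sketch only: list the access keys for a storage account.
# "mystorageaccount" and "myresourcegroup" are placeholder names.
az storage account keys list \
    --account-name mystorageaccount \
    --resource-group myresourcegroup \
    --output table
```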

Enable Event Hubs Capture

  1. In the Azure portal, navigate to your event hub by selecting its Event Hubs namespace from All resources, selecting Event hubs in the left navigation, and then selecting your event hub.
  2. On the event hub Overview screen, select Capture events.
  3. On the Capture screen, select On. Then, under Azure Storage Container, select Select Container.
  4. On the Containers screen, select the storage container you want to use, and then select Select.
  5. On the Capture screen, select Save changes.
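The portal steps above can also be scripted. As a hedged sketch (the resource names are placeholders, and the flags assume a current Azure CLI), Capture can be enabled with az eventhubs eventhub update:

```shell
# Sketch only: enable Capture on an existing event hub via the Azure CLI.
# Resource names are placeholders; verify the flags against your CLI version.
az eventhubs eventhub update \
    --resource-group myresourcegroup \
    --namespace-name mynamespace \
    --name myeventhub \
    --enable-capture true \
    --destination-name EventHubArchive.AzureBlockBlob \
    --storage-account mystorageaccount \
    --blob-container mycontainer
```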

Create a Python script to send events to your event hub

This script sends 200 events to your event hub. The events are simple environmental readings sent in JSON format.

  1. Open your favorite Python editor, such as Visual Studio Code.

  2. Create a new file called sender.py.

  3. Paste the following code into sender.py. Substitute your own values for the Event Hubs <namespace>, <AccessKeyName>, <primary key value>, and <eventhub>.

    import uuid
    import datetime
    import random
    import json
    from azure.servicebus.control_client import ServiceBusService
    
    # Connect by using the namespace, access key name, and primary key value you saved earlier.
    sbs = ServiceBusService(service_namespace='<namespace>', shared_access_key_name='<AccessKeyName>', shared_access_key_value='<primary key value>')
    
    # Create 10 simulated device IDs.
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    # Send 20 readings per device, 200 events in total.
    for y in range(0, 20):
        for dev in devices:
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading)
            sbs.send_event('<eventhub>', s)
        print(y)
    
  4. Save the file.
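For reference, each event body the script sends is a small JSON document. The following standalone sketch builds one such payload locally with the same fields sender.py uses (the values are simulated, so the exact output varies):

```python
import uuid
import datetime
import random
import json

# Build one simulated reading with the same fields sender.py sends.
reading = {
    'id': str(uuid.uuid4()),
    'timestamp': str(datetime.datetime.utcnow()),
    'uv': random.random(),
    'temperature': random.randint(70, 100),
    'humidity': random.randint(70, 100),
}

# This JSON string is what sender.py passes as the event body.
payload = json.dumps(reading)
print(payload)
```

When Capture writes these events to Blob storage, each one lands in the Body field of an Avro record, which is why capturereader.py later calls json.loads on reading["Body"].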

Create a Python script to read Capture files

This script reads the captured files and creates a file for each of your devices that contains the data for only that device.

  1. In your Python editor, create a new file called capturereader.py.

  2. Paste the following code into capturereader.py. Substitute your saved values for <storageaccount>, <storage account access key>, and <storagecontainer>.

    import os
    import json
    from avro.datafile import DataFileReader
    from avro.io import DatumReader
    from azure.storage.blob import BlockBlobService
    
    def processBlob(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        readings = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if 'id' not in parsed_json:
                return
            # Group each reading under its device ID.
            if parsed_json['id'] not in readings:
                readings[parsed_json['id']] = []
            readings[parsed_json['id']].append(parsed_json)
        reader.close()
        # Append each device's readings to that device's .csv file.
        for device in readings.keys():
            with open(device + '.csv', "a") as deviceFile:
                for r in readings[device]:
                    deviceFile.write(", ".join([str(r[x]) for x in r.keys()]) + '\n')
    
    def startProcessing(accountName, key, container):
        print('Processor started using path: ' + os.getcwd())
        block_blob_service = BlockBlobService(account_name=accountName, account_key=key)
        generator = block_blob_service.list_blobs(container)
        for blob in generator:
            # A content length of 508 bytes is an empty Avro file, so process only larger blobs.
            if blob.properties.content_length > 508:
                print('Downloaded a non-empty blob: ' + blob.name)
                cleanName = blob.name.replace('/', '_')
                block_blob_service.get_blob_to_path(container, blob.name, cleanName)
                processBlob(cleanName)
                os.remove(cleanName)
            block_blob_service.delete_blob(container, blob.name)
    
    startProcessing('<storageaccount>', '<storage account access key>', '<storagecontainer>')
    

Run the Python scripts

  1. Open a command prompt that has Python in its path, and run these commands to install the Python prerequisite packages:

    pip install azure-storage
    pip install azure-servicebus
    pip install avro-python3
    

    If you have an earlier version of azure-storage or azure, you might need to use the --upgrade option.

    You might also need to run the following command, although it isn't necessary on most systems.

    pip install cryptography
    
  2. From the directory where you saved sender.py and capturereader.py, run this command:

    start python sender.py
    

    The command starts a new Python process to run the sender.

  3. When the capture finishes running, run this command:

    python capturereader.py
    

    The capture processor downloads all the non-empty blobs from the storage account container and writes the results as .csv files into the local directory.
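Capture names its blobs with a path-like convention (by default along the lines of {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}), and capturereader.py flattens those paths into local file names by replacing the slashes. A minimal sketch of that flattening step, using a hypothetical blob name:

```python
# Hypothetical blob name following Capture's default path-style naming convention.
blob_name = "mynamespace/myhub/0/2023/01/15/10/30/00.avro"

# capturereader.py replaces the path separators so the blob can be saved
# as a single flat file in the local working directory.
clean_name = blob_name.replace('/', '_')
print(clean_name)  # mynamespace_myhub_0_2023_01_15_10_30_00.avro
```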

Next steps

To learn more about Event Hubs, see: