Capture Event Hubs data in Azure Storage and read it by using Python (azure-eventhub)
You can configure an event hub so that the data that's sent to it is captured in an Azure storage account or in Azure Data Lake Storage Gen 1 or Gen 2. This article shows you how to write Python code to send events to an event hub and read the captured data from Azure Blob storage. For more information about this feature, see the Event Hubs Capture feature overview.

This quickstart uses the Azure Python SDK to demonstrate the Capture feature. The sender.py app sends simulated environmental telemetry to an event hub in JSON format. The event hub is configured to use the Capture feature to write this data to Blob storage in batches. The capturereader.py app reads these blobs, creates an append file for each device, and then writes the data into CSV files.
In this quickstart, you:
- Create an Azure Blob storage account and container in the Azure portal.
- Create an Event Hubs namespace by using the Azure portal.
- Create an event hub with the Capture feature enabled and connect it to your storage account.
- Send data to your event hub by using a Python script.
- Read and process files from Event Hubs Capture by using another Python script.
Prerequisites
Python 2.7 or 3.5 and later, with pip installed and updated.

An Azure subscription. If you don't have one, create a trial subscription before you begin.

An active Event Hubs namespace and event hub. Create an Event Hubs namespace and an event hub in the namespace. Record the name of the Event Hubs namespace, the name of the event hub, and the primary access key for the namespace. To get the access key, see Get an Event Hubs connection string. The default key name is RootManageSharedAccessKey. For this quickstart, you need only the primary key; you don't need the connection string.
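An Event Hubs connection string is a semicolon-separated list of Key=Value pairs, so the primary key can be read out of one programmatically. A minimal sketch, assuming the usual connection-string shape (the endpoint and key values below are placeholders):

```python
# A sample Event Hubs namespace connection string (placeholder values).
conn_str = (
    "Endpoint=sb://my-namespace.servicebus.windows.net/;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=abc123="
)

# Split the semicolon-separated "Key=Value" pairs into a dict.
# Split on the first "=" only, because the key value itself may contain "=".
parts = dict(p.split("=", 1) for p in conn_str.split(";") if p)

print(parts["SharedAccessKeyName"])  # RootManageSharedAccessKey
print(parts["SharedAccessKey"])      # abc123=
```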
An Azure storage account, a blob container in the storage account, and a connection string for the storage account. If you don't have these items, do the following:

- Create an Azure storage account
- Create a blob container in the storage account
- Get the connection string for the storage account

Be sure to record the connection string and container name for later use in this quickstart.
Enable the Capture feature for the event hub. To do so, follow the instructions in Enable Event Hubs Capture using the Azure portal. Select the storage account and the blob container you created in the preceding step. You can also enable the feature when you create an event hub.
Create a Python script to send events to your event hub
In this section, you create a Python script that sends 200 events (10 devices * 20 events) to an event hub. These events are sample environmental readings sent in JSON format.
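Each event body is a single JSON document. A minimal sketch of one reading in the shape the sender script produces (the field values are illustrative):

```python
import datetime
import json

# One simulated reading, in the shape sender.py produces (values are illustrative).
reading = {
    'id': 'device-0',
    'timestamp': str(datetime.datetime(2024, 1, 15, 10, 30, 0)),
    'uv': 0.42,
    'temperature': 85,
    'humidity': 72,
}

payload = json.dumps(reading)  # This JSON string becomes the event body.
print(payload)
```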
Open your favorite Python editor, such as Visual Studio Code.

Create a script called sender.py.

Paste the following code into sender.py.
```python
import os
import uuid
import datetime
import random
import json

from azure.eventhub import EventHubProducerClient, EventData

# This script simulates the production of events for 10 devices.
devices = []
for x in range(0, 10):
    devices.append(str(uuid.uuid4()))

# Create a producer client to produce and publish events to the event hub.
producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE CONNECTION STRING", eventhub_name="EVENT HUB NAME")

for y in range(0, 20):    # For each device, produce 20 events.
    event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later.
    for dev in devices:
        # Create a dummy reading.
        reading = {
            'id': dev,
            'timestamp': str(datetime.datetime.utcnow()),
            'uv': random.random(),
            'temperature': random.randint(70, 100),
            'humidity': random.randint(70, 100),
        }
        s = json.dumps(reading) # Convert the reading into a JSON string.
        event_data_batch.add(EventData(s)) # Add event data to the batch.
    producer.send_batch(event_data_batch) # Send the batch of events to the event hub.

# Close the producer.
producer.close()
```
Replace the following values in the script:

- Replace EVENT HUBS NAMESPACE CONNECTION STRING with the connection string for your Event Hubs namespace.
- Replace EVENT HUB NAME with the name of your event hub.
Run the script to send events to the event hub.

In the Azure portal, you can verify that the event hub has received the messages. Switch to the Messages view in the Metrics section. Refresh the page to update the chart. It might take a few seconds for the page to show that the messages have been received.
Create a Python script to read your Capture files
In this example, the captured data is stored in Azure Blob storage. The script in this section reads the captured data files from your Azure storage account and generates CSV files that are easy to open and view. You will see 10 files in the current working directory of the application, containing the environmental readings for the 10 devices.

In your Python editor, create a script called capturereader.py. This script reads the captured files and creates a file for each device, writing only that device's data to it.

Paste the following code into capturereader.py.
```python
import os
import json

from azure.storage.blob import ContainerClient
from avro.datafile import DataFileReader
from avro.io import DatumReader

def processBlob2(filename):
    # Read the Avro capture file and group the JSON payloads by device id.
    reader = DataFileReader(open(filename, 'rb'), DatumReader())
    readings_by_device = {}
    for reading in reader:
        parsed_json = json.loads(reading["Body"])
        if 'id' not in parsed_json:
            return
        readings_by_device.setdefault(parsed_json['id'], []).append(parsed_json)
    reader.close()
    # Append each device's readings to a per-device CSV file.
    for device, readings in readings_by_device.items():
        filename = os.path.join(os.getcwd(), str(device) + '.csv')
        with open(filename, "a") as device_file:
            for r in readings:
                device_file.write(", ".join([str(r[x]) for x in r.keys()]) + '\n')

def startProcessing():
    print('Processor started using path: ' + os.getcwd())
    # Create a blob container client.
    container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME")
    blob_list = container.list_blobs() # List all the blobs in the container.
    for blob in blob_list:
        # A blob with content_length == 508 is an empty capture file, so process only larger blobs.
        if blob.size > 508:
            print('Downloaded a non empty blob: ' + blob.name)
            # Create a blob client for the blob.
            blob_client = container.get_blob_client(blob.name)
            # Construct a local file name based on the blob name.
            cleanName = blob.name.replace('/', '_')
            cleanName = os.path.join(os.getcwd(), cleanName)
            with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist.
                my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file.
            processBlob2(cleanName) # Convert the file into per-device CSV files.
            os.remove(cleanName) # Remove the original downloaded file.
            # Delete the blob from the container after it's read.
            container.delete_blob(blob.name)

startProcessing()
```
Replace the following values in the script:

- Replace AZURE STORAGE CONNECTION STRING with the connection string for your Azure storage account.
- Replace BLOB CONTAINER NAME with the name of your blob container. The container you created in this quickstart is named capture; if you used a different name, use that name instead.
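By default, Event Hubs Capture encodes the namespace, event hub, partition, and timestamp into the blob path (this pattern is the documented default naming convention, stated here as an assumption). The snippet below sketches how capturereader.py flattens such a path into a single local file name; the sample path is illustrative:

```python
# Default Event Hubs Capture blob-name pattern (assumed documented default):
# {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}
blob_name = "my-namespace/my-hub/0/2024/01/15/10/30/00"

# capturereader.py replaces the path separators so each blob maps to one flat file.
local_name = blob_name.replace("/", "_")
print(local_name)  # my-namespace_my-hub_0_2024_01_15_10_30_00
```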
Run the scripts
Open a command prompt that has Python in its path, and then run these commands to install the prerequisite Python packages:
```shell
pip install azure-storage-blob
pip install azure-eventhub
pip install avro-python3
```
Change to the directory where you saved sender.py and capturereader.py, and run this command:
```shell
python sender.py
```
This command starts a new Python process to run the sender.
Wait a few minutes for the capture to run, and then enter the following command in your original command window:
```shell
python capturereader.py
```
This capture processor uses the local directory to download all the blobs from the storage account and container. It processes any that aren't empty and writes the results as CSV files into the local directory.
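The per-device grouping that processBlob2 performs can be sketched in isolation with in-memory data; the sample payloads below are illustrative, standing in for the "Body" fields extracted from the Avro records:

```python
import json
from collections import defaultdict

# Simulated "Body" payloads, like those capturereader.py pulls out of Avro records.
bodies = [
    '{"id": "dev-1", "temperature": 72, "humidity": 81}',
    '{"id": "dev-2", "temperature": 90, "humidity": 75}',
    '{"id": "dev-1", "temperature": 74, "humidity": 79}',
]

# Group the parsed readings by device id, as processBlob2 does.
per_device = defaultdict(list)
for body in bodies:
    reading = json.loads(body)
    per_device[reading["id"]].append(reading)

# Render each device's readings as CSV rows (one row per reading).
csv_rows = {
    device: [", ".join(str(v) for v in r.values()) for r in readings]
    for device, readings in per_device.items()
}
print(csv_rows["dev-1"])  # ['dev-1, 72, 81', 'dev-1, 74, 79']
```

In the real script, each device's rows are appended to a file named after the device id, which is why rerunning the processor grows the existing CSV files rather than replacing them.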
Next steps

Check out the Python samples on GitHub.