如何通过 Python 应用程序使用 Azure 事件中心How to use Azure Event Hubs from a Python application

Azure 事件中心是一个大数据流式处理平台和事件引入服务,每秒能够接收和处理数百万个事件。Azure Event Hubs is a Big Data streaming platform and event ingestion service, capable of receiving and processing millions of events per second. 事件中心可以处理和存储分布式软件和设备生成的事件、数据或遥测。Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到数据中心的数据。Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. 有关详细信息,请参阅事件中心简介For more information, see Introduction to Event Hubs.

本文提供了指向文章的链接,这些文章演示如何在使用 Python 编写的应用程序中执行以下任务 :This article provides links to articles that show you how to do the following tasks from an application written in Python:

先决条件Prerequisites

安装 Python 包Install Python package

若要为事件中心安装 Python 包,请打开其路径中包含 Python 的命令提示符,然后运行以下命令:To install the Python package for Event Hubs, open a command prompt that has Python in its path, and then run this command:

pip install azure-eventhub

将事件发送到事件中心Send events to Event Hubs

下列代码演示如何从 Python 应用程序将事件发送到事件中心:The following code shows you how to send events to an event hub from a Python application:

  1. 创建变量以保存事件中心 URL、密钥名称和密钥值。Create variables to hold URL of the event hub, key name, and key value.

    # Import classes from Event Hubs python package
    from azure.eventhub import EventHubClient, Receiver, Offset
    
    # Address can be in either of these formats:
    # "amqps://<URL-encoded-SAS-policy>:<URL-encoded-SAS-key>@<mynamespace>.servicebus.chinacloudapi.cn/myeventhub"
    # "amqps://<mynamespace>.servicebus.chinacloudapi.cn/myeventhub"
    # For example:
    ADDRESS = "amqps://<MyEventHubNamspaceName>.servicebus.chinacloudapi.cn/<MyEventHubName>"
    
    # SAS policy and key are not required if they are encoded in the URL
    USER = "<Name of the access key. Default name: RootManageSharedAccessKey>"
    KEY = "<The access key>"
    
  2. 使用发件人创建事件中心客户端、添加发件人、运行客户端以及发送事件,完成后关闭客户端。Create an Event Hubs client, add a sender, run the client, send the event using sender, and then stop the client when you are done.

    # Create an Event Hubs client
    client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
    
    # Add a sender to the client
    sender = client.add_sender(partition="0")
    
    # Run the Event Hub client
    client.run()
    
    # Send event to the event hub
    sender.send(EventData("<MyEventData>"))
    
    # Stop the Event Hubs client
    client.stop()
    
    

有关如何从使用 Python 编写的应用程序将事件发送到事件中心的完整教程,请参阅本文For a complete tutorial on how to send events to an event hub from an application written in Python, see this article.

从事件中心接收事件Receive events from Event Hubs

下列代码演示如何接收来自 Python 应用程序事件中心的事件:The following code shows you how to receive events from an event hub from a Python application:


# Create an Event Hubs client
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)

# Add a receiver to the client
receiver = client.add_receiver(
    CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)

# Run the Event Hubs client
client.run()

# Receive event data from the event hub
for event_data in receiver.receive(timeout=100):
    last_offset = event_data.offset
    last_sn = event_data.sequence_number
    print("Received: {}, {}".format(last_offset, last_sn))

# Stop the Event Hubs client
client.stop()

有关如何接收来自 Python 编写的应用程序事件中心的事件的完整教程,请参阅本文For a complete tutorial on how to receive events from an event hub from an application written in Python, see this article

从 Azure 存储读取捕获的事件数据Read capture event data from Azure Storage

下面的代码演示如何使用 Python 应用程序从 Azure Blob 存储读取捕获的事件数据 :按照以下说明为事件中心启用捕获功能 :使用 Azure 门户启用事件中心捕获The following code shows you how to read captured event data that's stored in an Azure blob storage from a Python application: Enable Capture feature for the event hub by following instructions from: Enable Event Hubs Capture using the Azure portal. 然后,在测试代码前将一些事件发送到事件中心。Then, send some events to the event hub before testing the code.

import os
import string
import json
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
from azure.storage.blob import BlockBlobService

def processBlob(filename):
    reader = DataFileReader(open(filename, 'rb'), DatumReader())
    dict = {}
    for reading in reader:
        parsed_json = json.loads(reading["Body"])
        if not 'id' in parsed_json:
            return
        if not dict.has_key(parsed_json['id']):
            list = []
            dict[parsed_json['id']] = list
        else:
            list = dict[parsed_json['id']]
            list.append(parsed_json)
    reader.close()
    for device in dict.keys():
        deviceFile = open(device + '.csv', "a")
        for r in dict[device]:
            deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')

def startProcessing(accountName, key, container):
    print 'Processor started using path: ' + os.getcwd()
    block_blob_service = BlockBlobService(account_name=accountName, account_key=key)
    generator = block_blob_service.list_blobs(container)
    for blob in generator:
        #content_length == 508 is an empty file, so only process content_length > 508 (skip empty files)
        if blob.properties.content_length > 508:
            print('Downloaded a non empty blob: ' + blob.name)
            cleanName = string.replace(blob.name, '/', '_')
            block_blob_service.get_blob_to_path(container, blob.name, cleanName)
            processBlob(cleanName)
            os.remove(cleanName)
        block_blob_service.delete_blob(container, blob.name)
startProcessing('YOUR STORAGE ACCOUNT NAME', 'YOUR KEY', 'capture')

有关如何从使用 Python 编写的应用程序中读取 Azure Blob 存储中捕获的事件中心数据的完成教程,请参阅本文For a complete tutorial on how to read captured Event Hubs data in an Azure blob storage from an application written in Python, see this article

GitHub 示例GitHub samples

可在 azure-event-hubs-python Git 存储库 中找到更多 Python 示例。You can find more Python samples in the azure-event-hubs-python Git repository.

后续步骤Next steps

阅读“概念”部分中的文章,从事件中心功能概述开始。Read through articles in the Concepts section starting from Event Hubs features overview.