Event Hubs .NET Standard API overview

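This article summarizes some of the key Azure Event Hubs .NET Standard client APIs. There are currently two .NET Standard client libraries for Event Hubs: the core client library, which provides the basic runtime operations through EventHubClient, and the Event Processor Host library, which adds automatic tracking of offsets and partitions on top of it.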

Event Hubs client

EventHubClient is the primary object you use to send events, create receivers, and get runtime information. The client is linked to a particular event hub and creates a new connection to the Event Hubs endpoint.

Create an Event Hubs client

An EventHubClient object is created from a connection string. The simplest way to instantiate a new client is shown in the following example:

var eventHubClient = EventHubClient.CreateFromConnectionString("Event Hubs connection string");

To edit the connection string programmatically, use the EventHubsConnectionStringBuilder class, then pass the resulting connection string to EventHubClient.CreateFromConnectionString.

// EhEntityPath is assumed to hold the name of the event hub (defined elsewhere)
var connectionStringBuilder = new EventHubsConnectionStringBuilder("Event Hubs connection string")
{
    EntityPath = EhEntityPath
};

var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());

Send events

To send events to an event hub, use the EventData class. The body must be a byte array or a byte array segment.

// Create a new EventData object by encoding a string as a byte array
var data = new EventData(Encoding.UTF8.GetBytes("This is my message..."));
// Set user properties if needed
data.Properties.Add("Type", "Informational");
// Send single message async
await eventHubClient.SendAsync(data);
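
When several events are ready at once, they can be sent in a single call rather than one at a time. The following is a minimal sketch, assuming the Microsoft.Azure.EventHubs package and the SendAsync overload that accepts a sequence of EventData; BatchSender, BuildEvents, and SendBatchAsync are hypothetical names introduced here for illustration. The combined batch is still subject to the service's size limit, so very large batches may need to be split.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;

// Hypothetical helper, not part of the Event Hubs library.
public static class BatchSender
{
    // Wraps each string in an EventData whose body is the UTF-8 encoding of the string.
    public static List<EventData> BuildEvents(IEnumerable<string> messages) =>
        messages.Select(m => new EventData(Encoding.UTF8.GetBytes(m))).ToList();

    // Sends all events in one call to the event hub the client is linked to.
    public static Task SendBatchAsync(EventHubClient client, IEnumerable<string> messages) =>
        client.SendAsync(BuildEvents(messages));
}
```

With the client created earlier, a call might look like `await BatchSender.SendBatchAsync(eventHubClient, new[] { "first", "second" });`.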

Receive events

The recommended way to receive events from Event Hubs is to use the Event Processor Host, which automatically keeps track of the event hub offset and partition information. However, in certain situations you may want the flexibility of the core Event Hubs library to receive events.

Create a receiver

Receivers are tied to specific partitions, so to receive all events in an event hub you must create one receiver per partition. It is good practice to get the partition information programmatically rather than hard-coding the partition IDs. To do so, use the GetRuntimeInformationAsync method.

// Create a list to keep track of the receivers
var receivers = new List<PartitionReceiver>();
// Use the eventHubClient created above to get the runtime information
var runTimeInformation = await eventHubClient.GetRuntimeInformationAsync();
// Loop over the resulting partition IDs
foreach (var partitionId in runTimeInformation.PartitionIds)
{
    // Create the receiver
    var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, PartitionReceiver.EndOfStream);
    // Add the receiver to the list
    receivers.Add(receiver);
}
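
Once there is one receiver per partition, the partitions are typically polled concurrently rather than one after another. The following sketch shows only that fan-out pattern and deliberately avoids the Event Hubs types so it can run on its own: PartitionFanOut and ReceiveFromAllAsync are hypothetical names, and the receive call is passed in as a delegate.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Event Hubs library.
public static class PartitionFanOut
{
    // Calls receiveFromPartition once per partition ID, concurrently,
    // and flattens the non-null batches into a single list.
    public static async Task<List<T>> ReceiveFromAllAsync<T>(
        IEnumerable<string> partitionIds,
        Func<string, Task<IEnumerable<T>>> receiveFromPartition)
    {
        var batches = await Task.WhenAll(partitionIds.Select(receiveFromPartition));
        return batches.Where(b => b != null).SelectMany(b => b).ToList();
    }

    public static async Task Main()
    {
        // Stand-in for a real receive call: partition "1" has nothing to return.
        Func<string, Task<IEnumerable<string>>> fakeReceive = id =>
            Task.FromResult<IEnumerable<string>>(id == "1" ? null : new[] { $"event-from-{id}" });

        var events = await ReceiveFromAllAsync(new[] { "0", "1", "2" }, fakeReceive);
        Console.WriteLine(string.Join(", ", events)); // event-from-0, event-from-2
    }
}
```

With real receivers, the delegate would look up the receiver for the given partition ID and return the result of its ReceiveAsync call; the null check matters because ReceiveAsync can return null when no messages are available.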

Because events are not removed from an event hub when they are read (they only expire), you must specify the proper starting point. The following example shows possible combinations:

// partitionId is assumed to come from GetRuntimeInformationAsync().
// The three receivers below are alternatives; create only the one you need.

// Using the constant PartitionReceiver.EndOfStream receives only messages that arrive from this point forward.
var receiver = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, PartitionReceiver.EndOfStream);

// All messages available
var receiverFromStart = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, "-1");

// From one day ago
var receiverFromYesterday = eventHubClient.CreateReceiver(PartitionReceiver.DefaultConsumerGroupName, partitionId, DateTime.Now.AddDays(-1));

Consume an event

// Receive a maximum of 100 messages in this call to ReceiveAsync
var ehEvents = await receiver.ReceiveAsync(100);
// ReceiveAsync can return null if there are no messages
if (ehEvents != null)
{
    // Since ReceiveAsync can return more than a single event you will need a loop to process
    foreach (var ehEvent in ehEvents)
    {
        // Decode the byte array segment
        var message = Encoding.UTF8.GetString(ehEvent.Body.Array, ehEvent.Body.Offset, ehEvent.Body.Count);
        // Load the custom property that we set in the send example
        var customType = ehEvent.Properties["Type"];
        // Implement processing logic here
    }
}       
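
The event body is a byte array segment, so it may cover only part of its underlying array. Decoding the whole array can therefore pull in bytes that do not belong to the event. The following self-contained sketch illustrates the difference using only the .NET base class library; SegmentDecoding and the sample buffer are hypothetical and stand in for a real event body.

```csharp
using System;
using System.Text;

// Hypothetical demo class, not part of the Event Hubs library.
public static class SegmentDecoding
{
    // Decodes only the bytes that belong to the segment.
    public static string Decode(ArraySegment<byte> body) =>
        Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);

    public static void Main()
    {
        // A shared buffer holding two payloads back to back.
        byte[] buffer = Encoding.UTF8.GetBytes("first|second");
        var body = new ArraySegment<byte>(buffer, 6, 6); // covers "second"

        Console.WriteLine(Decode(body));                        // second
        Console.WriteLine(Encoding.UTF8.GetString(body.Array)); // first|second
    }
}
```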

Event Processor Host APIs

These APIs distribute partitions across the available workers, which provides resiliency when a worker process becomes unavailable:

// Checkpointing is done within SimpleEventProcessor on a per-consumer-group, per-partition basis, so workers resume from where they last left off.

// Read these connection strings from a secure location
var ehConnectionString = "{Event Hubs connection string}";
var ehEntityPath = "{event hub path/name}";
var storageConnectionString = "{Storage connection string}";
var storageContainerName = "{Storage account container name}";

var eventProcessorHost = new EventProcessorHost(
    ehEntityPath,
    PartitionReceiver.DefaultConsumerGroupName,
    ehConnectionString,
    storageConnectionString,
    storageContainerName);

// Start/register an EventProcessorHost
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

// When processing should stop, unregister the processor; this shuts down the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();

The following is a sample implementation of the IEventProcessor interface:

public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }

        return context.CheckpointAsync();
    }
}
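
The sample above checkpoints after every batch. Because each checkpoint is written to the storage account, a processor handling many small batches may prefer to checkpoint at intervals instead. The following is a minimal sketch of such a throttle, using only the base class library; CheckpointThrottle is a hypothetical helper, and the interval is an assumption to tune for your workload. A longer interval means more events may be reprocessed after a restart.

```csharp
using System;

// Hypothetical helper, not part of the Event Processor Host library.
public sealed class CheckpointThrottle
{
    private readonly TimeSpan _interval;
    private DateTime _last;

    public CheckpointThrottle(TimeSpan interval, DateTime start)
    {
        _interval = interval;
        _last = start;
    }

    // Returns true, and restarts the interval, once at least one full
    // interval has elapsed since the last checkpoint (or since start).
    public bool ShouldCheckpoint(DateTime now)
    {
        if (now - _last < _interval)
        {
            return false;
        }

        _last = now;
        return true;
    }
}
```

Inside ProcessEventsAsync, a processor holding its own throttle instance could then return `throttle.ShouldCheckpoint(DateTime.UtcNow) ? context.CheckpointAsync() : Task.CompletedTask`.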

Next steps

To learn more about Event Hubs scenarios, visit these links:

The .NET API references are here: