Event Hubs .NET Framework API overview

This article summarizes some of the key Azure Event Hubs .NET Framework client APIs. They fall into two categories: management APIs and run-time APIs. Run-time APIs cover all operations needed to send and receive a message. Management operations let you manage the state of an Event Hubs entity by creating, updating, and deleting entities.

Monitoring scenarios span both management and run-time. For detailed reference documentation on the .NET APIs, see the .NET Framework, .NET Standard, and EventProcessorHost API references.

Management APIs

To perform the following management operations, you must have Manage permissions on the Event Hubs namespace:

Create

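// Create a NamespaceManager (connectionString is assumed to hold a namespace connection string with Manage rights)
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);

// Create the event hub
var ehd = new EventHubDescription(eventHubName);
ehd.PartitionCount = SampleManager.numPartitions;
await namespaceManager.CreateEventHubAsync(ehd);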

Update

var ehd = await namespaceManager.GetEventHubAsync(eventHubName);

// Update the user metadata
ehd.UserMetadata = "Some updated info";

// Create a custom SAS rule with Manage permissions
var ruleName = "myeventhubmanagerule";
var ruleKey = SharedAccessAuthorizationRule.GenerateRandomKey();
ehd.Authorization.Add(new SharedAccessAuthorizationRule(ruleName, ruleKey, new AccessRights[] { AccessRights.Manage, AccessRights.Listen, AccessRights.Send }));
await namespaceManager.UpdateEventHubAsync(ehd);

Delete

await namespaceManager.DeleteEventHubAsync("event hub name");

Run-time APIs

Create publisher

// EventHubClient model (uses implicit factory instance, so all links on same connection)
var eventHubClient = EventHubClient.Create("event hub name");

Publish message

// Create the device/temperature metric
var info = new MetricEvent() { DeviceId = random.Next(SampleManager.NumDevices), Temperature = random.Next(100) };

// An EventData body can be built in several ways; pick one:
// var data = new EventData(new byte[10]);  // From a byte array
// var data = new EventData(stream);        // From a Stream
var data = new EventData(info, serializer)  // From an object and a serializer
{
    PartitionKey = info.DeviceId.ToString()
};

// Set user properties if needed
data.Properties.Add("Type", "Telemetry_" + DateTime.Now.ToLongTimeString());

// Send a single message asynchronously
await eventHubClient.SendAsync(data);

Create consumer

// Create the Event Hubs client
var eventHubClient = EventHubClient.Create(EventHubName);

// Get the default consumer group
var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup();

// Create a receiver for one partition; pick one of the following starting points.

// All messages
var consumer = await defaultConsumerGroup.CreateReceiverAsync(partitionId: index);

// From one day ago (the parameter expects a UTC time)
// var consumer = await defaultConsumerGroup.CreateReceiverAsync(partitionId: index, startingDateTimeUtc: DateTime.UtcNow.AddDays(-1));

// From a specific offset; "-1" means the oldest
// var consumer = await defaultConsumerGroup.CreateReceiverAsync(partitionId: index, startingOffset: "-1");

Consume message

var message = await consumer.ReceiveAsync();

// Deserialize the body by providing a serializer
var info = message.GetBody<Type>(Serializer);

// Or get the body as a byte[] and decode it
var bytes = message.GetBytes();
var msg = Encoding.UTF8.GetString(bytes);

Event Processor Host APIs

These APIs provide resiliency to worker processes that may become unavailable, by distributing partitions across available workers.

// Checkpointing is done within the SimpleEventProcessor on a per-consumer-group, per-partition basis, so workers resume from where they last left off.
// To checkpoint yourself, use the EventData.Offset value, which is unique per partition.

var eventHubConnectionString = System.Configuration.ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"];
var blobConnectionString = System.Configuration.ConfigurationManager.AppSettings["AzureStorageConnectionString"]; // Required for checkpoint/state

var eventHubDescription = new EventHubDescription(EventHubName);
var host = new EventProcessorHost(WorkerName, EventHubName, defaultConsumerGroup.GroupName, eventHubConnectionString, blobConnectionString);
await host.RegisterEventProcessorAsync<SimpleEventProcessor>();

// To close
await host.UnregisterEventProcessorAsync();
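
The idea of distributing partitions across available workers can be pictured with a small, self-contained sketch. This is a conceptual illustration only and does not show how EventProcessorHost balances partitions internally; the PartitionBalancingSketch class, its Assign method, and the worker names are all hypothetical and are not part of any Event Hubs library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Conceptual illustration only: EventProcessorHost balances partitions itself.
// This class is hypothetical and is NOT the algorithm the host uses.
public static class PartitionBalancingSketch
{
    // Spread partition IDs across the available workers in round-robin order.
    public static Dictionary<string, List<string>> Assign(
        IReadOnlyList<string> partitionIds, IReadOnlyList<string> workers)
    {
        if (workers.Count == 0)
            throw new ArgumentException("At least one worker is required.", nameof(workers));

        var assignment = workers.ToDictionary(w => w, w => new List<string>());
        for (int i = 0; i < partitionIds.Count; i++)
        {
            assignment[workers[i % workers.Count]].Add(partitionIds[i]);
        }
        return assignment;
    }

    public static void Main()
    {
        var partitions = new[] { "0", "1", "2", "3" };

        // Two workers share the four partitions.
        Print(Assign(partitions, new[] { "workerA", "workerB" }));

        // workerB becomes unavailable; the remaining worker takes over every partition.
        Print(Assign(partitions, new[] { "workerA" }));
    }

    private static void Print(Dictionary<string, List<string>> assignment)
    {
        foreach (var pair in assignment)
            Console.WriteLine(pair.Key + ": " + string.Join(", ", pair.Value));
    }
}
```

With a real EventProcessorHost you do not write this logic yourself: you register a processor on each worker, and the host decides which partitions each instance reads.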

The following SimpleEventProcessor class is a sample implementation of the IEventProcessor interface:

public class SimpleEventProcessor : IEventProcessor
{
    IDictionary<string, string> map;
    PartitionContext partitionContext;

    public SimpleEventProcessor()
    {
        this.map = new Dictionary<string, string>();
    }

    public Task OpenAsync(PartitionContext context)
    {
        this.partitionContext = context;

        return Task.FromResult<object>(null);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData message in messages)
        {
            // Process messages here
        }

        // Checkpoint when appropriate
        await context.CheckpointAsync();

    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}

Next steps

To learn more about Event Hubs scenarios, visit these links:

The .NET API references are here: