基于 EventProcessorHost 消费 Azure 事件中心消息
Azure 事件中心支持多种消费消息的方式,而且也提供了不同语言的 SDK 以方便开发者调用,这里做一个汇总的介绍。
总的来说,有三种方式:
- 直接接收
- 使用事件处理程序主机(EventProcessorHost)
- 基于 Apache Storm
在直接接收的基础上,微软提供了一个更高级别的抽象方式,也就是使用 EventProcessorHost,它为事件处理器实现提供线程安全,多进程安全的运行时环境,该环境还能提供检查点和分区租用管理。使用方式如下:
实现 IEventProcessor 接口(假设 MyEventProcessor):
-
Task IEventProcessor.OpenAsync(PartitionContext context) { Logger.Info("Initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset); this.checkpointStopWatch = new Stopwatch(); this.checkpointStopWatch.Start(); return Task.FromResult<object>(null); }
-
async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason) { Logger.Info("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason); if (reason == CloseReason.Shutdown) { await context.CheckpointAsync(); } }
-
async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { // process events based on your own logic foreach (EventData eventData in messages) { string data = Encoding.UTF8.GetString(eventData.GetBytes()); Logger.Info(string.Format("Message received. Partition: '{0}', Data: '{1}'", context.Lease.PartitionId, data)); } //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts. if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5)) { await context.CheckpointAsync(); this.checkpointStopWatch.Restart(); } }
-
实例化 EventProcessorHost,并提供事件中心相应的参数:
var eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, eventHubConsumerGroupName, eventHubConnectionString, storageConnectionString);
调用 RegisterEventProcessorAsync 以将 IEventProcessor 的实现注册到运行时:
eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>().Wait();
比起前面直接接收方式,EventProcessorHost 方式不需要开发者写循环去接收消息,也不需要考虑连接哪个分区,只需要专注于对接收消息的处理,方便快捷很多。另外 EventProcessorHost 采用租约的方式来管理分区消息的读取,而获取租约采用的是“贪婪”算法,租约本身是有时间期限的,因此当有新主机加入或者某个主机挂掉的情况下,负载能被转移,建立平衡,最终达到负载均衡。同时,EventProcessorHost 还实现了基于 Azure 存储空间的检查点机制,它将按分区存储偏移量,使每一个消费者都能确定前一个消费者的最后读取位置,进一步的保证负载转移和故障转移。这种转移机制也可以保证消费端的自动缩放。