Event processor host

Note

This article applies to the old version of the Azure Event Hubs SDK. To learn how to migrate your code to the newer version of the SDK, see these migration guides.

Also, see Balance partition load across multiple instances of your application.

Azure Event Hubs is a powerful telemetry ingestion service that can be used to stream millions of events at low cost. This article describes how to consume ingested events using the Event Processor Host (EPH), an intelligent consumer agent that simplifies the management of checkpointing, leasing, and parallel event readers.

The key to scale for Event Hubs is the idea of partitioned consumers. In contrast to the competing consumers pattern, the partitioned consumer pattern enables high scale by removing the contention bottleneck and facilitating end-to-end parallelism.

Home security scenario

As an example scenario, consider a home security company that monitors 100,000 homes. Every minute, it gets data from various sensors, such as a motion detector, door/window open sensor, glass break detector, and so on, installed in each home. The company provides a web site for residents to monitor the activity of their home in near real time.

Each sensor pushes data to an event hub. The event hub is configured with 16 partitions. On the consuming end, you need a mechanism that can read these events, consolidate them (filter, aggregate, and so on), and dump the aggregate to a storage blob, which is then projected to a user-friendly web page.

Write the consumer application

When designing the consumer in a distributed environment, the scenario must handle the following requirements:

  1. Scale: Create multiple consumers, with each consumer taking ownership of reading from a few Event Hubs partitions.
  2. Load balance: Increase or reduce the consumers dynamically. For example, when a new sensor type (such as a carbon monoxide detector) is added to each home, the number of events increases. In that case, the operator (a human) increases the number of consumer instances. Then, the pool of consumers can rebalance the number of partitions they own, to share the load with the newly added consumers.
  3. Seamless resume on failures: If a consumer (consumer A) fails (for example, the virtual machine hosting the consumer suddenly crashes), then other consumers must be able to pick up the partitions owned by consumer A and continue. Also, the continuation point, called a checkpoint or offset, should be at the exact point at which consumer A failed, or slightly before that.
  4. Consume events: While the previous three points deal with the management of the consumer, there must be code to consume the events and do something useful with them; for example, aggregate them and upload them to blob storage.

Instead of building your own solution for this, Event Hubs provides this functionality through the IEventProcessor interface and the EventProcessorHost class.

IEventProcessor interface

First, consuming applications implement the IEventProcessor interface, which has four methods: OpenAsync, CloseAsync, ProcessErrorAsync, and ProcessEventsAsync. This interface contains the actual code to consume the events that Event Hubs sends. The following code shows a simple implementation:

public class SimpleEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
        return Task.CompletedTask;
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
            Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
        }
        return context.CheckpointAsync();
    }
}

Next, instantiate an EventProcessorHost instance. Depending on the overload, when creating the EventProcessorHost instance in the constructor, the following parameters are used:

  • hostName: The name of each consumer instance. Each instance of EventProcessorHost must have a unique value for this variable within a consumer group, so don't hard-code this value.
  • eventHubPath: The name of the event hub.
  • consumerGroupName: Event Hubs uses $Default as the name of the default consumer group, but it is a good practice to create a consumer group for your specific aspect of processing.
  • eventHubConnectionString: The connection string to the event hub, which can be retrieved from the Azure portal. This connection string should have Listen permissions on the event hub.
  • storageConnectionString: The storage account used for internal resource management.

Finally, consumers register the EventProcessorHost instance with the Event Hubs service. Registering an event processor class with an instance of EventProcessorHost starts event processing. Registering instructs the Event Hubs service to expect that the consumer app consumes events from some of its partitions, and to invoke the IEventProcessor implementation code whenever it pushes events to consume.
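Putting these pieces together, a minimal sketch of creating and registering a host might look like the following. This assumes the legacy Microsoft.Azure.EventHubs.Processor package; the event hub name, connection strings, and lease container name are placeholders you would replace with your own values:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs.Processor;

class Program
{
    static async Task Main()
    {
        // Placeholder values -- substitute your own event hub and storage details.
        var eventProcessorHost = new EventProcessorHost(
            hostName: Guid.NewGuid().ToString(),       // unique per instance; don't hard-code
            eventHubPath: "myeventhub",
            consumerGroupName: "$Default",
            eventHubConnectionString: "<event hub connection string with Listen rights>",
            storageConnectionString: "<storage connection string>",
            leaseContainerName: "eph-leases");         // blob container used for lease/checkpoint tracking

        // Start processing: one SimpleEventProcessor is created per owned partition.
        await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();

        Console.WriteLine("Receiving. Press ENTER to stop.");
        Console.ReadLine();

        // Cleanly release leases and close partition readers.
        await eventProcessorHost.UnregisterEventProcessorAsync();
    }
}
```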

Example

As an example, imagine that there are 5 virtual machines (VMs) dedicated to consuming events, and a simple console application in each VM, which does the actual consumption work. Each console application then creates one EventProcessorHost instance and registers it with the Event Hubs service.

In this example scenario, let's say that 16 partitions are allocated to the 5 EventProcessorHost instances. Some EventProcessorHost instances might own a few more partitions than others. For each partition that an EventProcessorHost instance owns, it creates an instance of the SimpleEventProcessor class. Therefore, there are 16 instances of SimpleEventProcessor overall, with one assigned to each partition.

The following list summarizes this example:

  • 16 Event Hubs partitions.
  • 5 VMs, 1 consumer app (for example, Consumer.exe) in each VM.
  • 5 EPH instances registered, 1 in each VM by Consumer.exe.
  • 16 SimpleEventProcessor objects created by the 5 EPH instances.
  • 1 Consumer.exe app might contain 4 SimpleEventProcessor objects, because its EPH instance may own 4 partitions.

Partition ownership tracking

Ownership of a partition by an EPH instance (or a consumer) is tracked through the Azure Storage account that is provided for tracking. You can visualize the tracking as a simple table, as follows. You can see the actual implementation by examining the blobs under the storage account provided:

Consumer group name | Partition ID | Host name (owner) | Lease (ownership) acquired time | Offset in partition (checkpoint)
$Default            | 0            | Consumer_VM3      | 2018-04-15T01:23:45             | 156
$Default            | 1            | Consumer_VM4      | 2018-04-15T01:22:13             | 734
$Default            | 2            | Consumer_VM0      | 2018-04-15T01:22:56             | 122
...                 | ...          | ...               | ...                             | ...
$Default            | 15           | Consumer_VM3      | 2018-04-15T01:22:56             | 976

Here, each host acquires ownership of a partition for a certain duration (the lease duration). If a host fails (its VM shuts down), then the lease expires. Other hosts try to get ownership of the partition, and one of them succeeds. This process resets the lease on the partition with a new owner. This way, only a single reader at a time can read from any given partition within a consumer group.

Receive messages

Each call to ProcessEventsAsync delivers a collection of events. It is your responsibility to handle these events. If you want to make sure the processor host processes every message at least once, you need to write your own retry logic. But be cautious about poison messages.

It is recommended that you do things relatively fast; that is, do as little processing as possible in the processor. Instead, use consumer groups. If you need to write to storage and do some routing, it is better to use two consumer groups and have two IEventProcessor implementations that run separately.

At some point during your processing, you might want to keep track of what you have read and completed. Keeping track is critical if you must restart reading, so you don't return to the beginning of the stream. EventProcessorHost simplifies this tracking by using checkpoints. A checkpoint is a location, or offset, for a given partition, within a given consumer group, at which point you are satisfied that you have processed the messages. Marking a checkpoint in EventProcessorHost is accomplished by calling the CheckpointAsync method on the PartitionContext object. This operation is typically done within the ProcessEventsAsync method, but it can also be done in CloseAsync.

Checkpointing

The CheckpointAsync method has two overloads. The first, with no parameters, checkpoints to the highest event offset within the collection returned by ProcessEventsAsync. This offset is a "high-water" mark; it assumes you have processed all recent events when you call it. If you use this method in this way, be aware that you are expected to call it after your other event processing code has returned. The second overload lets you specify an EventData instance to checkpoint. This method enables you to use a different type of watermark to checkpoint. With this watermark, you can implement a "low-water" mark: the lowest sequenced event you are certain has been processed. This overload is provided to enable flexibility in offset management.
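As an illustration (not part of the original sample), a ProcessEventsAsync implementation could batch its checkpoints rather than checkpointing on every call; the threshold of 100 events here is an arbitrary placeholder:

```csharp
// Sketch: checkpoint roughly every 100 events instead of on every batch.
private int eventsSinceLastCheckpoint;

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    EventData lastProcessed = null;
    foreach (var eventData in messages)
    {
        // ... handle the event here ...
        lastProcessed = eventData;
        eventsSinceLastCheckpoint++;
    }

    if (lastProcessed != null && eventsSinceLastCheckpoint >= 100)
    {
        // The parameterless CheckpointAsync() overload would checkpoint the
        // high-water mark; passing an EventData lets you pick the exact event.
        await context.CheckpointAsync(lastProcessed);
        eventsSinceLastCheckpoint = 0;
    }
}
```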

When the checkpoint is performed, a JSON file with partition-specific information (specifically, the offset) is written to the storage account supplied in the constructor to EventProcessorHost. This file is continually updated. It is critical to consider checkpointing in context: it would be unwise to checkpoint every message. The storage account used for checkpointing probably would not handle this load; more importantly, checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus queue might be a better option than an event hub. The idea behind Event Hubs is that you get "at least once" delivery at great scale. By making your downstream systems idempotent, it is easy to recover from failures or restarts that result in the same events being received multiple times.

Thread safety and processor instances

By default, EventProcessorHost is thread safe and behaves in a synchronous manner with respect to the instance of IEventProcessor. When events arrive for a partition, ProcessEventsAsync is called on the IEventProcessor instance for that partition and blocks further calls to ProcessEventsAsync for that partition. Subsequent messages and calls to ProcessEventsAsync queue up behind the scenes as the message pump continues to run in the background on other threads. This thread safety removes the need for thread-safe collections and dramatically increases performance.

Shut down gracefully

Finally, EventProcessorHost.UnregisterEventProcessorAsync enables a clean shutdown of all partition readers and should always be called when shutting down an instance of EventProcessorHost. Failure to do so can cause delays when starting other instances of EventProcessorHost, due to lease expiration and epoch conflicts. Epoch management is covered in detail in the Epoch section of this article.

Lease management

Registering an event processor class with an instance of EventProcessorHost starts event processing. The host instance obtains leases on some partitions of the event hub, possibly grabbing some from other host instances, in a way that converges on an even distribution of partitions across all host instances. For each leased partition, the host instance creates an instance of the provided event processor class, then receives events from that partition and passes them to the event processor instance. As more instances get added and more leases are grabbed, EventProcessorHost eventually balances the load among all consumers.

As explained previously, the tracking table greatly simplifies the autoscale nature of EventProcessorHost. As an instance of EventProcessorHost starts, it acquires as many leases as possible and begins reading events. As the leases near expiration, EventProcessorHost attempts to renew them by placing a reservation. If the lease is available for renewal, the processor continues reading; if it is not, the reader is closed and CloseAsync is called. CloseAsync is a good time to perform any final cleanup for that partition.

EventProcessorHost includes a PartitionManagerOptions property. This property enables control over lease management. Set these options before registering your IEventProcessor implementation.
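For example, the lease duration and renewal interval can be tuned through PartitionManagerOptions before registration; the values below are illustrative placeholders, not recommendations:

```csharp
eventProcessorHost.PartitionManagerOptions = new PartitionManagerOptions
{
    // How long a lease is held before it expires if not renewed.
    LeaseDuration = TimeSpan.FromSeconds(30),
    // How often the host attempts to renew the leases it holds.
    RenewInterval = TimeSpan.FromSeconds(10)
};

// Set the options *before* registering the processor:
await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>();
```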

Control Event Processor Host options

Additionally, one overload of RegisterEventProcessorAsync takes an EventProcessorOptions object as a parameter. Use this parameter to control the behavior of EventProcessorHost itself. EventProcessorOptions defines four properties and one event:

  • MaxBatchSize: The maximum size of the collection you want to receive in an invocation of ProcessEventsAsync. This size is not the minimum, only the maximum size. If there are fewer messages to be received, ProcessEventsAsync executes with as many as were available.
  • PrefetchCount: A value used by the underlying AMQP channel to determine the upper limit of how many messages the client should receive. This value should be greater than or equal to MaxBatchSize.
  • InvokeProcessorAfterReceiveTimeout: If this parameter is true, ProcessEventsAsync is called when the underlying call to receive events on a partition times out. This method is useful for taking time-based actions during periods of inactivity on the partition.
  • InitialOffsetProvider: Enables a function pointer or lambda expression to be set, which is called to provide the initial offset when a reader begins reading a partition. Without specifying this offset, the reader starts at the oldest event, unless a JSON file with an offset has already been saved in the storage account supplied to the EventProcessorHost constructor. This method is useful when you want to change the behavior of the reader startup. When this method is invoked, the object parameter contains the partition ID for which the reader is being started.
  • ExceptionReceivedEventArgs: Enables you to receive notification of any underlying exceptions that occur in EventProcessorHost. If things are not working as you expect, this event is a good place to start looking.
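A sketch of wiring these options up, assuming the Microsoft.Azure.EventHubs.Processor API (where InitialOffsetProvider returns an EventPosition and exceptions are surfaced via SetExceptionHandler); all values are placeholders:

```csharp
var options = new EventProcessorOptions
{
    MaxBatchSize = 100,                        // at most 100 events per ProcessEventsAsync call
    PrefetchCount = 300,                       // AMQP prefetch; should be >= MaxBatchSize
    InvokeProcessorAfterReceiveTimeout = true, // call ProcessEventsAsync (empty batch) on receive timeout
    // Start at the end of the stream when no checkpoint exists yet for a partition.
    InitialOffsetProvider = partitionId => EventPosition.FromEnd()
};

// Surface underlying exceptions from the host.
options.SetExceptionHandler(args =>
    Console.WriteLine($"Host error on partition {args.PartitionId}: {args.Exception.Message}"));

await eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>(options);
```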

Epoch

Here is how the receive epoch works:

With Epoch

Epoch is a unique identifier (epoch value) that the service uses to enforce partition/lease ownership. You create an epoch-based receiver using the CreateEpochReceiver method. The receiver is created for a specific event hub partition from the specified consumer group.

The epoch feature gives users the ability to ensure that there is only one receiver on a consumer group at any point in time, with the following rules:

  • If there is no existing receiver on a consumer group, the user can create a receiver with any epoch value.
  • If there is a receiver with an epoch value e1 and a new receiver is created with an epoch value e2 where e1 <= e2, the receiver with e1 is disconnected automatically, and the receiver with e2 is created successfully.
  • If there is a receiver with an epoch value e1 and a new receiver is created with an epoch value e2 where e1 > e2, then the creation of e2 fails with the error: A receiver with epoch e1 already exists.
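A sketch of creating an epoch receiver directly with EventHubClient, assuming the legacy Microsoft.Azure.EventHubs package; the connection string, partition ID, and epoch value are placeholders:

```csharp
using Microsoft.Azure.EventHubs;

// Connection string that includes EntityPath=<event hub name>.
var client = EventHubClient.CreateFromConnectionString("<connection string with EntityPath>");

// An epoch receiver on partition "0" of the $Default consumer group.
// Per the rules above, a later receiver with a higher epoch on the same
// partition and consumer group would disconnect this one.
PartitionReceiver receiver = client.CreateEpochReceiver(
    consumerGroupName: "$Default",
    partitionId: "0",
    eventPosition: EventPosition.FromStart(),
    epoch: 1);
```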

No Epoch

You create a non-epoch-based receiver using the CreateReceiver method.

There are some scenarios in stream processing where users would like to create multiple receivers on a single consumer group. To support such scenarios, you can create a receiver without an epoch; in this case, up to 5 concurrent receivers are allowed on the consumer group.
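The non-epoch counterpart, sketched with the same assumed EventHubClient as above:

```csharp
// Up to 5 of these non-epoch receivers can coexist on the same consumer group.
PartitionReceiver receiver = client.CreateReceiver(
    consumerGroupName: "$Default",
    partitionId: "0",
    eventPosition: EventPosition.FromEnd());
```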

Mixed mode

We don't recommend application usage where you create a receiver with an epoch and then switch to no-epoch, or vice versa, on the same consumer group. However, when this behavior occurs, the service handles it using the following rules:

  • If there is a receiver already created with epoch e1 that is actively receiving events, and a new receiver is created with no epoch, the creation of the new receiver fails. Epoch receivers always take precedence in the system.
  • If there was a receiver already created with epoch e1 that got disconnected, and a new receiver is created with no epoch on a new MessagingFactory, the creation of the new receiver succeeds. There is a caveat here: the system detects the "receiver disconnection" only after about 10 minutes.
  • If there are one or more receivers created with no epoch, and a new receiver is created with epoch e1, all the old receivers get disconnected.

Note

To avoid errors, we recommend using different consumer groups for applications that use epochs and for those that do not.

Next steps

Now that you're familiar with the Event Processor Host, see the following articles to learn more about Event Hubs: