Balance partition load across multiple instances of your application

To scale your event processing application, you can run multiple instances of the application and have them balance the load among themselves. In older versions, EventProcessorHost allowed you to balance the load between multiple instances of your program and checkpoint events as they were received. In the newer versions (5.0 onwards), EventProcessorClient (.NET and Java) or EventHubConsumerClient (Python and JavaScript) allows you to do the same. The development model is made simpler by using events: you subscribe to the events you're interested in by registering an event handler.

This article describes a sample scenario for using multiple instances to read events from an event hub, and then gives you details about the features of the event processor client, which allows you to receive events from multiple partitions at once and load balance with other consumers that use the same event hub and consumer group.

Note

The key to scaling Event Hubs is the idea of partitioned consumers. In contrast to the competing consumers pattern, the partitioned consumer pattern enables high scale by removing the contention bottleneck and facilitating end-to-end parallelism.

Example scenario

As an example scenario, consider a home security company that monitors 100,000 homes. Every minute, it gets data from various sensors installed in each home, such as a motion detector, door/window open sensor, glass break detector, and so on. The company provides a website for residents to monitor the activity of their home in near real time.

Each sensor pushes data to an event hub. The event hub is configured with 16 partitions. On the consuming end, you need a mechanism that can read these events, consolidate them (filter, aggregate, and so on), and dump the aggregate to a storage blob, which is then projected onto a user-friendly web page.
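The ingestion side of this scenario can be sketched as follows. This is a hypothetical illustration in plain Python, not the Event Hubs SDK or the service's actual hashing: events are routed to one of the 16 partitions by hashing a partition key (here, a made-up home ID), which is how partition keys keep all events from one home in order on the same partition.

```python
import hashlib

NUM_PARTITIONS = 16

def partition_for(partition_key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a partition key to a partition index with a stable hash.
    (Illustrative only; the real Event Hubs service uses its own hash.)"""
    digest = hashlib.sha256(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Events from the same home always land on the same partition,
# so they are consumed in order relative to each other.
events = [{"home_id": f"home-{i}", "sensor": "motion"} for i in range(5)]
routed = {e["home_id"]: partition_for(e["home_id"]) for e in events}
```

Because the mapping is deterministic, repeated events for the same home never scatter across partitions.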

Write the consumer application

When designing the consumer in a distributed environment, the scenario must handle the following requirements:

  1. Scale: Create multiple consumers, with each consumer taking ownership of reading from a few Event Hubs partitions.
  2. Load balance: Increase or reduce the consumers dynamically. For example, when a new sensor type (for example, a carbon monoxide detector) is added to each home, the number of events increases. In that case, the operator (a human) increases the number of consumer instances. Then, the pool of consumers can rebalance the number of partitions they own, to share the load with the newly added consumers.
  3. Seamless resume on failures: If a consumer (consumer A) fails (for example, the virtual machine hosting the consumer suddenly crashes), then other consumers can pick up the partitions owned by consumer A and continue. Also, the continuation point, called a checkpoint or offset, should be at the exact point at which consumer A failed, or slightly before that.
  4. Consume events: While the previous three points deal with the management of the consumer, there must be code to consume the events and do something useful with it. For example, aggregate them and upload them to blob storage.
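The arithmetic behind requirement 2 can be made concrete with a minimal sketch (plain Python, not the SDK): 16 partitions spread as evenly as possible over a changing number of consumer instances, so that adding a consumer shrinks every other consumer's share.

```python
def assign_partitions(partition_ids, consumers):
    """Round-robin partitions over consumers so per-consumer counts
    differ by at most one (an even split)."""
    assignment = {c: [] for c in consumers}
    for i, pid in enumerate(partition_ids):
        assignment[consumers[i % len(consumers)]].append(pid)
    return assignment

partitions = list(range(16))
# Two consumers: 8 partitions each.
before = assign_partitions(partitions, ["consumer-a", "consumer-b"])
# Operator adds a third consumer: the pool rebalances to 6 + 5 + 5.
after = assign_partitions(partitions, ["consumer-a", "consumer-b", "consumer-c"])
```

The SDK's own balancing converges to the same property: every active instance owns roughly `partition_count / instance_count` partitions.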

Event processor or consumer client

You don't need to build your own solution to meet these requirements. The Azure Event Hubs SDKs provide this functionality. In the .NET and Java SDKs, you use an event processor client (EventProcessorClient); in the Python and JavaScript SDKs, you use EventHubConsumerClient. In the old version of the SDK, it was the event processor host (EventProcessorHost) that supported these features.

For the majority of production scenarios, we recommend that you use the event processor client for reading and processing events. The processor client is intended to provide a robust experience for processing events across all partitions of an event hub in a performant and fault-tolerant manner, while providing a means to checkpoint its progress. Event processor clients are also capable of working cooperatively within the context of a consumer group for a given event hub. Clients automatically manage distribution and balancing of work as instances become available or unavailable for the group.

Partition ownership tracking

An event processor instance typically owns and processes events from one or more partitions. Ownership of partitions is evenly distributed among all the active event processor instances associated with an event hub and consumer group combination.

Each event processor is given a unique identifier and claims ownership of partitions by adding or updating an entry in a checkpoint store. All event processor instances communicate with this store periodically to update their own processing state and to learn about other active instances. This data is then used to balance the load among the active processors. New instances can join the processing pool to scale up. When instances go down, either due to failures or to scale down, partition ownership is gracefully transferred to other active processors.

Partition ownership records in the checkpoint store keep track of the Event Hubs namespace, event hub name, consumer group, event processor identifier (also known as the owner), partition ID, and the last modified time.

| Event Hubs namespace | Event hub name | Consumer group | Owner | Partition ID | Last modified time |
| --- | --- | --- | --- | --- | --- |
| mynamespace.servicebus.chinacloudapi.cn | myeventhub | myconsumergroup | 3be3f9d3-9d9e-4c50-9491-85ece8334ff6 | 0 | 2020-01-15T01:22:15 |
| mynamespace.servicebus.chinacloudapi.cn | myeventhub | myconsumergroup | f5cc5176-ce96-4bb4-bbaa-a0e3a9054ecf | 1 | 2020-01-15T01:22:17 |
| mynamespace.servicebus.chinacloudapi.cn | myeventhub | myconsumergroup | 72b980e9-2efc-4ca7-ab1b-ffd7bece8472 | 2 | 2020-01-15T01:22:10 |
| ... | ... | ... | ... | ... | ... |
| mynamespace.servicebus.chinacloudapi.cn | myeventhub | myconsumergroup | 844bd8fb-1f3a-4580-984d-6324f9e208af | 15 | 2020-01-15T01:22:00 |
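The shape of one ownership record can be modeled as a small data class. This is a hypothetical illustration of the fields listed above, not a type from the SDK:

```python
from dataclasses import dataclass

@dataclass
class OwnershipRecord:
    """One row of the partition ownership data in the checkpoint store."""
    fully_qualified_namespace: str
    eventhub_name: str
    consumer_group: str
    owner_id: str            # unique identifier of the claiming event processor
    partition_id: str
    last_modified_time: str  # timestamp used by peers for staleness detection

record = OwnershipRecord(
    fully_qualified_namespace="mynamespace.servicebus.chinacloudapi.cn",
    eventhub_name="myeventhub",
    consumer_group="myconsumergroup",
    owner_id="3be3f9d3-9d9e-4c50-9491-85ece8334ff6",
    partition_id="0",
    last_modified_time="2020-01-15T01:22:15",
)
```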

Each event processor instance acquires ownership of a partition and starts processing the partition from the last known [checkpoint](#checkpointing). If a processor fails (for example, the VM hosting it shuts down), other instances detect this by looking at the last modified time. Other instances try to get ownership of the partitions previously owned by the inactive instance, and the checkpoint store guarantees that only one of the instances succeeds in claiming ownership of a partition. So, at any given point in time, there is at most one processor receiving events from a partition.
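The failover mechanics above can be sketched with an in-memory stand-in for the checkpoint store. This illustrates the protocol, not SDK code: records carry a last-modified time, a processor treats records older than a timeout as stale, and a compare-and-set style update (an etag check) ensures exactly one claimant wins.

```python
import time

OWNERSHIP_TIMEOUT = 30.0  # seconds without an update before a record counts as stale

class ToyCheckpointStore:
    """In-memory stand-in for a real checkpoint store (such as blob storage)."""
    def __init__(self):
        self._records = {}  # partition_id -> {"owner", "etag", "modified"}

    def owner_of(self, partition_id):
        record = self._records.get(partition_id)
        return record["owner"] if record else None

    def try_claim(self, partition_id, owner, expected_etag=None):
        """Claim a partition if it is unowned or stale, or if the caller's etag
        still matches; the etag check makes this a compare-and-set, so only
        one of several simultaneous claimants can succeed."""
        record = self._records.get(partition_id)
        now = time.monotonic()
        stale = record is None or now - record["modified"] > OWNERSHIP_TIMEOUT
        if not stale and record["etag"] != expected_etag:
            return False  # another processor updated the record first
        new_etag = record["etag"] + 1 if record else 0
        self._records[partition_id] = {"owner": owner, "etag": new_etag, "modified": now}
        return True

store = ToyCheckpointStore()
claimed_a = store.try_claim(0, "processor-a")  # unowned: processor A wins
claimed_b = store.try_claim(0, "processor-b")  # record fresh, etag mismatch: B loses
```

If processor A stops renewing its record, the timeout eventually makes the record stale and processor B's next claim attempt succeeds.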

Receive messages

When you create an event processor, you specify the functions that process events and errors. Each call to the function that processes events delivers a single event from a specific partition. It's your responsibility to handle this event. If you want to make sure the consumer processes every message at least once, you need to write your own code with retry logic. But be cautious about poisoned messages.
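A minimal sketch of such retry logic with a guard against poisoned messages (plain Python; `process` is a hypothetical per-event callback, not part of the SDK):

```python
MAX_ATTEMPTS = 3

def handle_with_retry(event, process, dead_letters):
    """Try to process an event up to MAX_ATTEMPTS times; divert poisoned
    events to a dead-letter list instead of retrying forever."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            process(event)
            return True
        except Exception:
            if attempt == MAX_ATTEMPTS:
                # Poisoned message: give up on it so the stream keeps moving.
                dead_letters.append(event)
                return False

dead = []
ok = handle_with_retry({"id": 1}, lambda e: None, dead)    # succeeds on first try
bad = handle_with_retry({"id": 2}, lambda e: 1 / 0, dead)  # always fails: dead-lettered
```

Bounding the attempts is what keeps one bad event from blocking the rest of its partition.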

We recommend that you do things relatively fast. That is, do as little processing as possible. If you need to write to storage and do some routing, it's better to use two consumer groups and have two event processors.

Checkpointing

Checkpointing is a process by which an event processor marks or commits the position of the last successfully processed event within a partition. Marking a checkpoint is typically done within the function that processes the events, and occurs on a per-partition basis within a consumer group.

If an event processor disconnects from a partition, another instance can resume processing the partition at the checkpoint previously committed by the last processor of that partition in that consumer group. When the processor connects, it passes the offset to the event hub to specify the location at which to start reading. In this way, you can use checkpointing both to mark events as "complete" by downstream applications and to provide resiliency when an event processor goes down. It's possible to return to older data by specifying a lower offset in this checkpointing process.

When a checkpoint is performed to mark an event as processed, an entry in the checkpoint store is added or updated with the event's offset and sequence number. Users should decide the frequency of updating the checkpoint. Updating after each successfully processed event can have performance and cost implications, as it triggers a write operation to the underlying checkpoint store. Also, checkpointing every single event is indicative of a queued messaging pattern for which a Service Bus queue might be a better option than an event hub. The idea behind Event Hubs is that you get "at least once" delivery at great scale. By making your downstream systems idempotent, it's easy to recover from failures or restarts that result in the same events being received multiple times.
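One common compromise between checkpointing every event and never checkpointing is to checkpoint every N events per partition. A minimal sketch of that policy (illustrative plain Python, not the SDK's API):

```python
class PartitionProgress:
    """Track per-partition progress and checkpoint only every N events."""
    def __init__(self, checkpoint_every: int = 100):
        self.checkpoint_every = checkpoint_every
        self.processed_since_checkpoint = 0
        self.checkpoints = []  # (offset, sequence_number) entries written to the store

    def on_event_processed(self, offset: int, sequence_number: int):
        self.processed_since_checkpoint += 1
        if self.processed_since_checkpoint >= self.checkpoint_every:
            # One write to the checkpoint store per N events, not per event.
            self.checkpoints.append((offset, sequence_number))
            self.processed_since_checkpoint = 0

progress = PartitionProgress(checkpoint_every=10)
for seq in range(25):
    progress.on_event_processed(offset=seq * 64, sequence_number=seq)
```

On restart, at most N - 1 events per partition are redelivered and reprocessed, which is why the downstream work should be idempotent.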

Note

If you're using Azure Blob Storage as the checkpoint store in an environment that supports a different version of the Storage Blob SDK than the ones typically available on Azure, you need to use code to change the Storage service API version to the specific version supported by that environment. For example, if you're running Event Hubs on Azure Stack Hub version 2002, the highest available version for the Storage service is 2017-11-09. In this case, you need to use code to target Storage service API version 2017-11-09. For an example of how to target a specific Storage API version, see these samples on GitHub:

Thread safety and processor instances

By default, the function that processes the events is called sequentially for a given partition. Subsequent events and calls to this function from the same partition queue up behind the scenes as the event pump continues to run in the background on other threads. Events from different partitions can be processed concurrently, however, and any shared state that is accessed across partitions must be synchronized.
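That cross-partition synchronization point can be illustrated with plain Python threads: each thread plays the role of one partition's event pump (sequential within itself), and a lock protects the shared aggregate they all update. This is an illustration only; the SDK itself serializes the calls within each partition for you.

```python
import threading

shared_counts = {}       # shared state updated from multiple partition threads
lock = threading.Lock()  # required because partitions are processed concurrently

def pump(partition_id: int, events: list):
    """Simulate one partition's event pump: sequential within the partition."""
    for event in events:
        with lock:  # synchronize access to state shared across partitions
            shared_counts[event] = shared_counts.get(event, 0) + 1

threads = [
    threading.Thread(target=pump, args=(pid, ["motion", "door"] * 50))
    for pid in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock, the read-modify-write on `shared_counts` could interleave across partition threads and drop increments.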

Next steps

See the following quickstarts: