Azure Functions 的 Azure 事件中心绑定Azure Event Hubs bindings for Azure Functions

本文介绍如何使用适用于 Azure Functions 的 Azure 事件中心触发器。This article explains how to work with Azure Event Hubs trigger for Azure Functions. Azure Functions 支持事件中心的触发器和输出绑定Azure Functions supports trigger and output bindings for Event Hubs.

有关设置和配置详细信息,请参阅概述For information on setup and configuration details, see the overview.

使用函数触发器来响应发送到事件中心事件流的事件。Use the function trigger to respond to an event sent to an event hub event stream. 若要设置触发器,必须具有基础事件中心的读取访问权限。You must have read access to the underlying event hub to set up the trigger. 触发函数时,传递给函数的消息充当字符串类型。When the function is triggered, the message passed to the function is typed as a string.

扩展Scaling

事件触发的函数的每个实例由单个 EventProcessorHost 实例提供支持。Each instance of an event triggered function is backed by a single EventProcessorHost instance. 触发器(由事件中心提供支持)确保只有一个 EventProcessorHost 实例能够在给定分区上获得租约。The trigger (powered by Event Hubs) ensures that only one EventProcessorHost instance can get a lease on a given partition.

例如,考虑如下所述的一个事件中心:For example, consider an Event Hub as follows:

  • 10 个分区10 partitions
  • 在所有分区之间平均分配 1000 个事件,每个分区中有 100 条消息1,000 events distributed evenly across all partitions, with 100 messages in each partition

首次启用函数时,只有一个函数实例。When your function is first enabled, there is only one instance of the function. 我们将第一个函数实例命名为 Function_0Let's call the first function instance Function_0. Function_0 函数具有单个 EventProcessorHost 实例,此实例在所有十个分区上都有租约。The Function_0 function has a single instance of EventProcessorHost that holds a lease on all ten partitions. 此实例从分区 0-9 读取事件。This instance is reading events from partitions 0-9. 从此时开始,将发生下列情况之一:From this point forward, one of the following happens:

  • 不需要新的函数实例:在 Functions 的缩放逻辑生效之前,Function_0 能够处理所有 1,000 个事件。New function instances are not needed: Function_0 is able to process all 1,000 events before the Functions scaling logic take effect. 在这种情况下,所有 1,000 条消息都由 Function_0 处理。In this case, all 1,000 messages are processed by Function_0.

  • 添加其他函数实例:如果 Functions 缩放逻辑确定 Function_0 的消息数超出了它的处理能力,则会创建新的函数应用实例 (Function_1)。An additional function instance is added: If the Functions scaling logic determines that Function_0 has more messages than it can process, a new function app instance (Function_1) is created. 此新函数也有一个关联的 EventProcessorHost 实例。This new function also has an associated instance of EventProcessorHost. 基础事件中心检测到新主机实例在尝试读取消息时,会在主机实例之间对分区进行负载均衡。As the underlying Event Hubs detect that a new host instance is trying read messages, it load balances the partitions across the host instances. 例如,可以将分区 0-4 分配给 Function_0,将分区 5-9 分配给 Function_1For example, partitions 0-4 may be assigned to Function_0 and partitions 5-9 to Function_1.

  • 额外添加 N 个函数实例:如果 Functions 缩放逻辑确定 Function_0Function_1 的消息数超出了它们的处理能力,则会创建新的 Functions_N 函数应用实例。N more function instances are added: If the Functions scaling logic determines that both Function_0 and Function_1 have more messages than they can process, new Functions_N function app instances are created. 会一直创建应用,直到 N 大于事件中心分区数为止。Apps are created to the point where N is greater than the number of event hub partitions. 在我们的示例中,事件中心再次对分区进行负载均衡,在本例中是在实例 Function_0...Functions_9 之间进行的。In our example, Event Hubs again load balances the partitions, in this case across the instances Function_0...Functions_9.

缩放时,N 实例是一个大于事件中心分区数的数字。As scaling occurs, N instances is a number greater than the number of event hub partitions. 此模式用于确保 EventProcessorHost 实例可供用来在其他实例释放锁时在分区上获取锁。This pattern is used to ensure EventProcessorHost instances are available to obtain locks on partitions as they become available from other instances. 你只需为执行函数实例时使用的资源付费。You are only charged for the resources used when the function instance executes. 换句话说,不需要为过度预配的资源付费。In other words, you are not charged for this over-provisioning.

当所有函数执行都完成时(不管是否有错误),则会将检查点添加到关联的存储帐户。When all function execution completes (with or without errors), checkpoints are added to the associated storage account. 检查点设置成功后,永远不会再次检索所有 1,000 条消息。When check-pointing succeeds, all 1,000 messages are never retrieved again.

以下示例演示用于记录事件中心触发器消息正文的 C# 函数The following example shows a C# function that logs the message body of the event hub trigger.

[FunctionName("EventHubTriggerCSharp")]
public static void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage, ILogger log)
{
    log.LogInformation($"C# function triggered to process a message: {myEventHubMessage}");
}

若要在函数代码中访问事件元数据,请绑定到 EventData 对象(需要对 Microsoft.Azure.EventHubs 使用 using 语句)。To get access to event metadata in function code, bind to an EventData object (requires a using statement for Microsoft.Azure.EventHubs). 此外,还可以通过在方法签名中使用绑定表达式来访问相同的属性。You can also access the same properties by using binding expressions in the method signature. 以下示例演示了获取相同数据的两种方法:The following example shows both ways to get the same data:

[FunctionName("EventHubTriggerCSharp")]
public static void Run(
    [EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData myEventHubMessage,
    DateTime enqueuedTimeUtc,
    Int64 sequenceNumber,
    string offset,
    ILogger log)
{
    log.LogInformation($"Event: {Encoding.UTF8.GetString(myEventHubMessage.Body)}");
    // Metadata accessed by binding to EventData
    log.LogInformation($"EnqueuedTimeUtc={myEventHubMessage.SystemProperties.EnqueuedTimeUtc}");
    log.LogInformation($"SequenceNumber={myEventHubMessage.SystemProperties.SequenceNumber}");
    log.LogInformation($"Offset={myEventHubMessage.SystemProperties.Offset}");
    // Metadata accessed by using binding expressions in method parameters
    log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
    log.LogInformation($"SequenceNumber={sequenceNumber}");
    log.LogInformation($"Offset={offset}");
}

若要成批接收事件,请将 stringEventData 设为数组。To receive events in a batch, make string or EventData an array.

备注

成批接收事件时,不能像上述示例那样使用 DateTime enqueuedTimeUtc 绑定到方法参数,且必须从每个 EventData 对象中接收这些事件When receiving in a batch you cannot bind to method parameters like in the above example with DateTime enqueuedTimeUtc and must receive these from each EventData object

[FunctionName("EventHubTriggerCSharp")]
public static void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
    foreach (var message in eventHubMessages)
    {
        log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
        log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
    }
}

特性和注释Attributes and annotations

C# 类库中,使用 EventHubTriggerAttribute 特性。In C# class libraries, use the EventHubTriggerAttribute attribute.

该特性的构造函数使用事件中心的名称、使用者组的名称和包含连接字符串的应用设置的名称。The attribute's constructor takes the name of the event hub, the name of the consumer group, and the name of an app setting that contains the connection string. 有关这些设置的详细信息,请参阅触发器配置部分For more information about these settings, see the trigger configuration section. 下面是 EventHubTriggerAttribute 特性的示例:Here's an EventHubTriggerAttribute attribute example:

[FunctionName("EventHubTriggerCSharp")]
public static void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage, ILogger log)
{
    ...
}

有关完整示例,请参阅触发器 - C# 示例For a complete example, see Trigger - C# example.

配置Configuration

下表解释了在 function.json 文件和 EventHubTrigger 特性中设置的绑定配置属性。The following table explains the binding configuration properties that you set in the function.json file and the EventHubTrigger attribute.

function.json 属性function.json property Attribute 属性Attribute property 说明Description
typetype 不适用n/a 必须设置为 eventHubTriggerMust be set to eventHubTrigger. 在 Azure 门户中创建触发器时,会自动设置此属性。This property is set automatically when you create the trigger in the Azure portal.
directiondirection 不适用n/a 必须设置为 inMust be set to in. 在 Azure 门户中创建触发器时,会自动设置此属性。This property is set automatically when you create the trigger in the Azure portal.
namename 不适用n/a 在函数代码中表示事件项的变量的名称。The name of the variable that represents the event item in function code.
路径path EventHubNameEventHubName 仅适用于 Functions 1.x。Functions 1.x only. 事件中心的名称。The name of the event hub. 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。When the event hub name is also present in the connection string, that value overrides this property at runtime.
eventHubNameeventHubName EventHubNameEventHubName Functions 2.x 及更高版本。Functions 2.x and higher. 事件中心的名称。The name of the event hub. 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。When the event hub name is also present in the connection string, that value overrides this property at runtime. 可以通过应用设置 %eventHubName% 进行引用Can be referenced via app settings %eventHubName%
consumerGroupconsumerGroup ConsumerGroupConsumerGroup 一个可选属性,用于设置使用者组,该组用于订阅事件中心中的事件。An optional property that sets the consumer group used to subscribe to events in the hub. 如果将其省略,则会使用 $Default 使用者组。If omitted, the $Default consumer group is used.
基数cardinality 不适用n/a 用于所有非 C# 语言。Used for all non-C# languages. 设为 many 以启用批处理。Set to many in order to enable batching. 如果省略或设为 one,将向函数传递一条消息。If omitted or set to one, a single message is passed to the function.

在 C# 中,只要触发器具有该类型的数组,就会自动分配此属性。In C#, this property is automatically assigned whenever the trigger has an array for the type.
连接connection ConnectionConnection 应用设置的名称,该名称中包含事件中心命名空间的连接字符串。The name of an app setting that contains the connection string to the event hub's namespace. 单击 命名空间 (而不是事件中心本身)的“连接信息”按钮,以复制此连接字符串。Copy this connection string by clicking the Connection Information button for the namespace, not the event hub itself. 此连接字符串必须至少具有读取权限才可激活触发器。This connection string must have at least read permissions to activate the trigger.

在本地进行开发时,应用设置将取 local.settings.json 文件的值。When you're developing locally, app settings go into the local.settings.json file.

事件元数据Event metadata

事件中心触发器提供了几个元数据属性The Event Hubs trigger provides several metadata properties. 元数据属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。Metadata properties can be used as part of binding expressions in other bindings or as parameters in your code. 属性来自 EventData 类。The properties come from the EventData class.

属性Property 类型Type 说明Description
PartitionContext PartitionContextPartitionContext PartitionContext 实例。The PartitionContext instance.
EnqueuedTimeUtc DateTime 排队时间 (UTC)。The enqueued time in UTC.
Offset string 数据相对于事件中心分区流的偏移量。The offset of the data relative to the Event Hub partition stream. 偏移量是事件中心流中的事件的标记或标识符。The offset is a marker or identifier for an event within the Event Hubs stream. 该标识符在事件中心流的分区中是惟一的。The identifier is unique within a partition of the Event Hubs stream.
PartitionKey string 事件数据应该发送到的分区。The partition to which event data should be sent.
Properties IDictionary<String,Object> 事件数据的用户属性。The user properties of the event data.
SequenceNumber Int64 事件的逻辑序列号。The logical sequence number of the event.
SystemProperties IDictionary<String,Object> 系统属性,包括事件数据。The system properties, including the event data.

请参阅在本文的前面部分使用这些属性的代码示例See code examples that use these properties earlier in this article.

host.json 属性host.json properties

host.json 文件包含控制事件中心触发器行为的设置。The host.json file contains settings that control Event Hubs trigger behavior. 配置因 Azure Functions 版本而异。The configuration is different depending on the Azure Functions version.

Functions 2.x 及更高版本Functions 2.x and higher

{
    "version": "2.0",
    "extensions": {
        "eventHubs": {
            "batchCheckpointFrequency": 5,
            "eventProcessorOptions": {
                "maxBatchSize": 256,
                "prefetchCount": 512
            }
        }
    }
}  
属性Property 默认Default 说明Description
maxBatchSizemaxBatchSize 10 个10 每个接收循环收到的最大事件计数。The maximum event count received per receive loop.
prefetchCountprefetchCount 300300 基础 EventProcessorHost 使用的默认预提取计数。The default pre-fetch count used by the underlying EventProcessorHost.
batchCheckpointFrequencybatchCheckpointFrequency 11 创建 EventHub 游标检查点之前要处理的事件批数。The number of event batches to process before creating an EventHub cursor checkpoint.

备注

有关 Azure Functions 2.x 及更高版本中的 host.json 参考,请参阅 Azure Functions 的 host.json 参考For a reference of host.json in Azure Functions 2.x and beyond, see host.json reference for Azure Functions.

Functions 1.xFunctions 1.x

{
    "eventHub": {
      "maxBatchSize": 64,
      "prefetchCount": 256,
      "batchCheckpointFrequency": 1
    }
}
属性Property 默认Default 说明Description
maxBatchSizemaxBatchSize 6464 每个接收循环收到的最大事件计数。The maximum event count received per receive loop.
prefetchCountprefetchCount 不适用n/a 基础 EventProcessorHost 将使用的默认预提取。The default pre-fetch that will be used by the underlying EventProcessorHost.
batchCheckpointFrequencybatchCheckpointFrequency 11 创建 EventHub 游标检查点之前要处理的事件批数。The number of event batches to process before creating an EventHub cursor checkpoint.

备注

有关 Azure Functions 1.x 中 host.json 的参考,请参阅 Azure Functions 1.x 的 host.json 参考For a reference of host.json in Azure Functions 1.x, see host.json reference for Azure Functions 1.x.

后续步骤Next steps