适用于 Azure Functions 的 Azure 事件中心触发器
本文介绍如何使用适用于 Azure Functions 的 Azure 事件中心触发器。 Azure Functions 支持事件中心的触发器和输出绑定。
有关设置和配置详细信息,请参阅概述。
使用函数触发器来响应发送到事件中心事件流的事件。 若要设置触发器,必须具有基础事件中心的读取访问权限。 触发函数时,传递给函数的消息充当字符串类型。
面向消耗计划和高级计划的事件中心缩放决策是通过基于目标的缩放完成的。 有关详细信息,请参阅基于目标的缩放。
有关 Azure Functions 如何使用触发器响应发送到事件中心事件流的事件的信息,请参阅将事件中心与 Azure 上的无服务器函数集成。
Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。
使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南。
本文同时支持两个编程模型。
重要
Python v2 编程模型目前以预览版提供。
示例
以下示例显示了用于记录事件中心触发器消息正文的 C# 函数。
[FunctionName("EventHubTriggerCSharp")]
public void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] string myEventHubMessage, ILogger log)
{
log.LogInformation($"C# function triggered to process a message: {myEventHubMessage}");
}
若要在函数代码中访问事件元数据,请绑定到 EventData 对象。 此外,还可以通过在方法签名中使用绑定表达式来访问相同的属性。 以下示例演示了获取相同数据的两种方法:
[FunctionName("EventHubTriggerCSharp")]
public void Run(
[EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData myEventHubMessage,
DateTime enqueuedTimeUtc,
Int64 sequenceNumber,
string offset,
ILogger log)
{
log.LogInformation($"Event: {Encoding.UTF8.GetString(myEventHubMessage.Body)}");
// Metadata accessed by binding to EventData
log.LogInformation($"EnqueuedTimeUtc={myEventHubMessage.SystemProperties.EnqueuedTimeUtc}");
log.LogInformation($"SequenceNumber={myEventHubMessage.SystemProperties.SequenceNumber}");
log.LogInformation($"Offset={myEventHubMessage.SystemProperties.Offset}");
// Metadata accessed by using binding expressions in method parameters
log.LogInformation($"EnqueuedTimeUtc={enqueuedTimeUtc}");
log.LogInformation($"SequenceNumber={sequenceNumber}");
log.LogInformation($"Offset={offset}");
}
若要成批接收事件,请将 string
或 EventData
设为数组。
注意
成批接收事件时,不能像上述示例那样使用 DateTime enqueuedTimeUtc
绑定到方法参数,且必须从每个 EventData
对象中接收这些事件
[FunctionName("EventHubTriggerCSharp")]
public void Run([EventHubTrigger("samples-workitems", Connection = "EventHubConnectionAppSetting")] EventData[] eventHubMessages, ILogger log)
{
foreach (var message in eventHubMessages)
{
log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(message.Body)}");
log.LogInformation($"EnqueuedTimeUtc={message.SystemProperties.EnqueuedTimeUtc}");
}
}
以下示例演示了 function.json 文件中的一个事件中心触发器绑定以及使用该绑定的 JavaScript 函数。 此函数将读取事件元数据并记录消息。
以下示例显示了 function.json 文件中的事件中心绑定数据,与更高版本相比,这对于 Functions 运行时版本 1.x 而言是不同的。
{
"type": "eventHubTrigger",
"name": "myEventHubMessage",
"direction": "in",
"eventHubName": "MyEventHub",
"connection": "myEventHubReadConnectionAppSetting"
}
JavaScript 代码如下所示:
module.exports = function (context, myEventHubMessage) {
context.log('Function triggered to process a message: ', myEventHubMessage);
context.log('EnqueuedTimeUtc =', context.bindingData.enqueuedTimeUtc);
context.log('SequenceNumber =', context.bindingData.sequenceNumber);
context.log('Offset =', context.bindingData.offset);
context.done();
};
若要批量接收事件,请将 function.json 文件中的 cardinality
设为 many
,如以下示例所示。
{
"type": "eventHubTrigger",
"name": "eventHubMessages",
"direction": "in",
"eventHubName": "MyEventHub",
"cardinality": "many",
"connection": "myEventHubReadConnectionAppSetting"
}
JavaScript 代码如下所示:
module.exports = function (context, eventHubMessages) {
context.log(`JavaScript eventhub trigger function called for message array ${eventHubMessages}`);
eventHubMessages.forEach((message, index) => {
context.log(`Processed message ${message}`);
context.log(`EnqueuedTimeUtc = ${context.bindingData.enqueuedTimeUtcArray[index]}`);
context.log(`SequenceNumber = ${context.bindingData.sequenceNumberArray[index]}`);
context.log(`Offset = ${context.bindingData.offsetArray[index]}`);
});
context.done();
};
下面是 PowerShell 代码:
param($eventHubMessages, $TriggerMetadata)
Write-Host "PowerShell eventhub trigger function called for message array: $eventHubMessages"
$eventHubMessages | ForEach-Object { Write-Host "Processed message: $_" }
以下示例演示了一个事件中心触发器绑定以及使用该绑定的 Python 函数。 此函数将读取事件元数据并记录消息。 该示例取决于是使用 v1 还是 v2 Python 编程模型。
import logging
import azure.functions as func
app = func.FunctionApp()
@app.function_name(name="EventHubTrigger1")
@app.event_hub_message_trigger(arg_name="myhub",
event_hub_name="<EVENT_HUB_NAME>",
connection="<CONNECTION_SETTING>")
def test_function(myhub: func.EventHubEvent):
logging.info('Python EventHub trigger processed an event: %s',
myhub.get_body().decode('utf-8'))
以下示例演示了一个记录事件中心触发器消息正文的事件中心触发器绑定。
@FunctionName("ehprocessor")
public void eventHubProcessor(
@EventHubTrigger(name = "msg",
eventHubName = "myeventhubname",
connection = "myconnvarname") String message,
final ExecutionContext context )
{
context.getLogger().info(message);
}
在 Java 函数运行时库中,对其值来自事件中心的参数使用 EventHubTrigger
注释。 带有这些注释的参数会导致函数在事件到达时运行。 可以将此注释与本机 Java 类型、POJO 或使用了 Optional<T>
的可为 null 的值一起使用。
以下示例演示了广泛使用 SystemProperties
和其他绑定选项以进一步自检事件,并提供格式标准的 BlobOutput
路径(日期分层)。
package com.example;
import java.util.Map;
import java.time.ZonedDateTime;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;
/**
* Azure Functions with Event Hub trigger.
* and Blob Output using date in path along with message partition ID
* and message sequence number from EventHub Trigger Properties
*/
public class EventHubReceiver {
@FunctionName("EventHubReceiver")
@StorageAccount("bloboutput")
public void run(
@EventHubTrigger(name = "message",
eventHubName = "%eventhub%",
consumerGroup = "%consumergroup%",
connection = "eventhubconnection",
cardinality = Cardinality.ONE)
String message,
final ExecutionContext context,
@BindingName("Properties") Map<String, Object> properties,
@BindingName("SystemProperties") Map<String, Object> systemProperties,
@BindingName("PartitionContext") Map<String, Object> partitionContext,
@BindingName("EnqueuedTimeUtc") Object enqueuedTimeUtc,
@BlobOutput(
name = "outputItem",
path = "iotevents/{datetime:yy}/{datetime:MM}/{datetime:dd}/{datetime:HH}/" +
"{datetime:mm}/{PartitionContext.PartitionId}/{SystemProperties.SequenceNumber}.json")
OutputBinding<String> outputItem) {
var et = ZonedDateTime.parse(enqueuedTimeUtc + "Z"); // needed as the UTC time presented does not have a TZ
// indicator
context.getLogger().info("Event hub message received: " + message + ", properties: " + properties);
context.getLogger().info("Properties: " + properties);
context.getLogger().info("System Properties: " + systemProperties);
context.getLogger().info("partitionContext: " + partitionContext);
context.getLogger().info("EnqueuedTimeUtc: " + et);
outputItem.setValue(message);
}
}
特性
修饰符
仅适用于 Python v2 编程模型。
对于使用修饰器定义的 Python v2 功能,支持 cosmos_db_trigger
上的以下属性:
properties | 说明 |
---|---|
arg_name |
在函数代码中表示事件项的变量的名称。 |
event_hub_name |
事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。 |
connection |
指定如何连接到事件中心的应用设置或设置集合的名称。 请参阅连接。 |
对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。
配置
仅适用于 Python v1 编程模型。
下表说明了在 function.json 文件中设置的触发器配置属性,这些属性因运行时版本而异。
function.json 属性 | 说明 |
---|---|
type | 必须设置为 eventHubTrigger 。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
direction | 必须设置为 in 。 在 Azure 门户中创建触发器时,会自动设置此属性。 |
name | 在函数代码中表示事件项的变量的名称。 |
eventHubName | 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。 可以通过应用设置 %eventHubName% 进行引用 |
consumerGroup | 一个可选属性,用于设置使用者组,该组用于订阅事件中心中的事件。 如果将其省略,则会使用 $Default 使用者组。 |
基数 | 设为 many 以启用批处理。 如果省略或设为 one ,将向函数传递一条消息。 |
连接 | 指定如何连接到事件中心的应用设置或设置集合的名称。 请参阅连接。 |
在本地开发时,需要将应用程序设置添加到 Values
集合中的 local.settings.json 文件中。
使用情况
要详细了解事件中心触发器和 IoT 中心触发器的缩放方式,请参阅通过 Azure Functions 使用事件。
事件中心输出绑定支持的参数类型取决于所用的 Functions 运行时版本、扩展包版本以及 C# 模态。
进程内 C# 类库函数支持以下类型:
- Azure.Messaging.EventHubs.EventData
- 字符串
- Byte Array
- 普通旧 CLR 对象 (POCO)
参数类型可以为以下类型之一:
- 任意的本机 Java 类型,例如 int、String、byte[]。
- 使用可选的可为 null 的值。
- 任意 POJO 类型。
若要了解详细信息,请参阅 EventHubTrigger 参考。
事件元数据
事件中心触发器提供了几个元数据属性。 元数据属性可在其他绑定中用作绑定表达式的一部分,或者用作代码中的参数。 属性来自 EventData 类。
属性 | 类型 | 说明 |
---|---|---|
PartitionContext |
PartitionContext | PartitionContext 实例。 |
EnqueuedTimeUtc |
DateTime |
排队时间 (UTC)。 |
Offset |
string |
数据相对于事件中心分区流的偏移量。 偏移量是事件中心流中的事件的标记或标识符。 该标识符在事件中心流的分区中是惟一的。 |
PartitionKey |
string |
事件数据应该发送到的分区。 |
Properties |
IDictionary<String,Object> |
事件数据的用户属性。 |
SequenceNumber |
Int64 |
事件的逻辑序列号。 |
SystemProperties |
IDictionary<String,Object> |
系统属性,包括事件数据。 |
请参阅在本文的前面部分使用这些属性的代码示例。
连接
connection
属性是环境配置的引用,它指定应用应如何连接到事件中心。 它可能指定:
如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。
连接字符串
单击命名空间(而不是事件中心本身)的“连接信息”按钮,以获取此连接字符串。 连接字符串必须用于事件中心命名空间,而不是事件中心本身。
当用于触发器时,连接字符串必须至少具有“读取”权限才能激活函数。 当用于输出绑定时,连接字符串必须具有“发送”权限才能将消息发送到事件流。
此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection
属性指定的值匹配。
基于标识的连接
如果使用 5.x 版或更高版本的扩展,则无需将连接字符串与机密一起使用,而是可以使应用使用 Azure Active Directory 标识。 为此,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection
属性。
在此模式下,扩展需要以下属性:
属性 | 环境变量模板 | 说明 | 示例值 |
---|---|---|---|
完全限定的命名空间 | <CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace |
完全限定的事件中心命名空间。 | <event_hubs_namespace>.servicebus.chinacloudapi.cn |
可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性。
注意
使用 Azure 应用程序配置或 Key Vault 为托管标识连接提供设置时,设置名称应使用有效的键分隔符(例如 :
或 /
)替代 __
,以确保正确解析名称。
例如 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace
。
在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credential
和 clientID
属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发。
向标识授予权限
无论使用何种标识,都必须具有执行所需操作的权限。 对于大多数 Azure 服务,这意味着你需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色。
重要
某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。
你将需要创建一个角色分配,以便在运行时提供对事件中心的访问权限。 角色分配的范围可以是事件中心命名空间,也可以是事件中心本身。 所有者等管理角色还不够。 下表显示了在正常操作中使用事件中心扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。
绑定类型 | 内置角色示例 |
---|---|
触发器 | Azure 事件中心数据接收方、Azure 事件中心数据所有者 |
输出绑定 | Azure 事件中心数据发送方 |
host.json 设置
host.json 文件包含控制事件中心触发器行为的设置。 有关可用设置的详细信息,请参阅 host.json 设置部分。