适用于 Azure Functions 的 Azure 事件中心输出绑定

本文介绍如何使用 Azure Functions 的 Azure 事件中心绑定。 Azure Functions 支持事件中心的触发器和输出绑定。

有关设置和配置详细信息,请参阅概述

使用事件中心输出绑定将事件写入到事件流。 必须具有事件中心的发送权限才可将事件写入到其中。

在尝试实现输出绑定之前,请确保所需的包引用已准备就绪。

重要

本文使用选项卡来支持多个版本的 Node.js 编程模型。 v4 模型目前处于预览状态,旨在为 JavaScript 和 TypeScript 开发人员提供更为灵活和直观的体验。 在升级指南中详细了解 v3 和 v4 之间的差异。

Azure Functions 支持两种 Python 编程模型。 定义绑定的方式取决于选择的编程模型。

使用 Python v2 编程模型,可以直接在 Python 函数代码中使用修饰器定义绑定。 有关详细信息,请参阅 Python 开发人员指南

本文同时支持两个编程模型。

示例

以下示例显示了使用方法返回值作为输出将消息字符串写入到事件中心的 C# 函数

    {
        private readonly ILogger<EventHubsFunction> _logger;

        public EventHubsFunction(ILogger<EventHubsFunction> logger)
        {
            _logger = logger;
        }

        [Function(nameof(EventHubFunction))]
        [FixedDelayRetry(5, "00:00:10")]
        [EventHubOutput("dest", Connection = "EventHubConnection")]
        public string EventHubFunction(

以下示例显示计时器触发的 TypeScript 函数,该函数将一条消息发送到事件中心:

import { app, InvocationContext, output, Timer } from '@azure/functions';

export async function timerTrigger1(myTimer: Timer, context: InvocationContext): Promise<string> {
    const timeStamp = new Date().toISOString();
    return `Message created at: ${timeStamp}`;
}

app.timer('timerTrigger1', {
    schedule: '0 */5 * * * *',
    return: output.eventHub({
        eventHubName: 'myeventhub',
        connection: 'MyEventHubSendAppSetting',
    }),
    handler: timerTrigger1,
});

若要输出多条消息,请返回一个数组而不是单个对象。 例如:

import { app, InvocationContext, output, Timer } from '@azure/functions';

export async function timerTrigger1(myTimer: Timer, context: InvocationContext): Promise<string[]> {
    // <displayInDocs>
    const timeStamp = new Date().toISOString();
    const message = `Message created at: ${timeStamp}`;
    return [`1: ${message}`, `2: ${message}`];
    // </displayInDocs>
}

app.timer('timerTrigger1', {
    schedule: '0 */5 * * * *',
    return: output.eventHub({
        eventHubName: 'myeventhub',
        connection: 'MyEventHubSendAppSetting',
    }),
    handler: timerTrigger1,
});

以下示例显示计时器触发的 JavaScript 函数,该函数将一条消息发送到事件中心:

const { app, output } = require('@azure/functions');

const eventHubOutput = output.eventHub({
    eventHubName: 'myeventhub',
    connection: 'MyEventHubSendAppSetting',
});

app.timer('timerTrigger1', {
    schedule: '0 */5 * * * *',
    return: eventHubOutput,
    handler: (myTimer, context) => {
        const timeStamp = new Date().toISOString();
        return `Message created at: ${timeStamp}`;
    },
});

若要输出多条消息,请返回一个数组而不是单个对象。 例如:

const { app, output } = require('@azure/functions');

const eventHubOutput = output.eventHub({
    eventHubName: 'myeventhub',
    connection: 'MyEventHubSendAppSetting',
});

app.timer('timerTrigger1', {
    schedule: '0 */5 * * * *',
    return: eventHubOutput,
    handler: (myTimer, context) => {
        // <displayInDocs>
        const timeStamp = new Date().toISOString();
        const message = `Message created at: ${timeStamp}`;
        return [`1: ${message}`, `2: ${message}`];
        // </displayInDocs>
    },
});

完整的 PowerShell 示例待定。

以下示例演示了事件中心触发器绑定以及使用该绑定的 Python 函数。 该函数将消息写入事件中心。 该示例取决于使用的是 v1 还是 v2 Python 编程模型

import logging
import azure.functions as func

app = func.FunctionApp()

@app.function_name(name="eventhub_output")
@app.route(route="eventhub_output")
@app.event_hub_output(arg_name="event",
                      event_hub_name="<EVENT_HUB_NAME>",
                      connection="<CONNECTION_SETTING>")
def eventhub_output(req: func.HttpRequest, event: func.Out[str]):
    body = req.get_body()
    if body is not None:
        event.set(body.decode('utf-8'))
    else:    
        logging.info('req body is none')
    return 'ok'

下面是可发送多条消息的 Python 代码:

import logging
import azure.functions as func
from typing import List

app = func.FunctionApp()

@app.function_name(name="eventhub_output")
@app.route(route="eventhub_output")
@app.event_hub_output(arg_name="event",
                      event_hub_name="<EVENT_HUB_NAME>",
                      connection="<CONNECTION_SETTING>")

def eventhub_output(req: func.HttpRequest, event: func.Out[List[str]]) -> func.HttpResponse:
    my_messages=["message1", "message2","message3"]
    event.set(my_messages)
    return func.HttpResponse(f"Messages sent")

以下示例演示一个 Java 函数,该函数将包含当前时间的消息写入到事件中心。

@FunctionName("sendTime")
@EventHubOutput(name = "event", eventHubName = "samples-workitems", connection = "AzureEventHubConnection")
public String sendTime(
   @TimerTrigger(name = "sendTimeTrigger", schedule = "0 */5 * * * *") String timerInfo)  {
     return LocalDateTime.now().toString();
 }

Java 函数运行时库中,会对其值被发布到事件中心的参数使用 @EventHubOutput 注释。 此参数应为 OutputBinding<T> 类型,其中 T 是 POJO 或任何本机 Java 类型。

特性

进程内独立工作进程 C# 库使用特性来配置绑定。 C# 脚本改用 function.json 配置文件,如 C# 脚本指南中所述。

使用 [EventHubOutputAttribute] 定义到事件中心的输出绑定,EventHubOutputAttribute 支持以下属性。

parameters 说明
EventHubName 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。
Connection 指定如何连接到事件中心的应用设置或设置集合的名称。 若要了解详细信息,请参阅连接

修饰符

仅适用于 Python v2 编程模型。

对于使用修饰器定义的 Python v2 功能,支持 cosmos_db_trigger 上的以下属性:

properties 说明
arg_name 函数代码中使用的表示事件的变量名称。
event_hub_name 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。
connection 指定如何连接到事件中心的应用设置或设置集合的名称。 若要了解详细信息,请参阅连接

对于使用 function.json 定义的 Python 函数,请参阅“配置”部分。

批注

Java 函数运行时库中,会对其值将被发布到事件中心的参数使用 EventHubOutput 注释。 注释支持以下设置:

配置

仅适用于 Python v1 编程模型

下表说明了可以在传递给 output.eventHub() 方法的 options 对象上设置的属性。

properties 说明
eventHubName 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。
连接 指定如何连接到事件中心的应用设置或设置集合的名称。 若要了解详细信息,请参阅连接

下表说明了在 function.json 文件中设置的绑定配置属性,这些属性因运行时版本而异。

function.json 属性 说明
type 必须设置为 eventHub
direction 必须设置为 out。 在 Azure 门户中创建绑定时,会自动设置该参数。
name 函数代码中使用的表示事件的变量名称。
eventHubName Functions 2.x 及更高版本。 事件中心的名称。 当事件中心名称也出现在连接字符串中时,该值会在运行时覆盖此属性。
连接 指定如何连接到事件中心的应用设置或设置集合的名称。 若要了解详细信息,请参阅连接

在本地开发时,需要将应用程序设置添加到 Values 集合中的 local.settings.json 文件中。

使用情况

事件中心输出绑定支持的参数类型取决于所用的 Functions 运行时版本、扩展包版本以及 C# 模态。

如果希望函数写入单个事件,事件中心输出绑定可以绑定到以下类型:

类型 说明
string 字符串形式的事件。 当事件为简单文本时使用。
byte[] 事件的字节数。
JSON 可序列化类型 表示事件的对象。 函数尝试将普通的旧 CLR 对象 (POCO) 类型序列化为 JSON 数据。

如果希望函数写入多个事件,事件中心输出绑定可以绑定到以下类型:

类型 说明
T[],其中 T 是单事件类型之一 包含多个事件的数组。 每个条目表示一个事件。

对于其他输出方案,请直接从 Microsoft.Azure.EventHubs 创建和使用类型。

有两个选项可通过使用 EventHubOutput 注释从函数输出事件中心消息:

  • 返回值:通过将注释应用于函数本身,函数的返回值将持久保存为事件中心消息。

  • 命令性:若要显式设置消息值,请将注释应用于 OutputBinding<T> 类型的特定参数,其中 T 是 POJO 或任何本机 Java 类型。 使用此配置时,向 setValue 方法传递某值会将该值持久保存为事件中心消息。

完整的 PowerShell 示例待定。

通过直接返回值或使用 context.extraOutputs.set() 来访问输出消息。

有两个选项可用于从函数中输出事件中心消息:

  • 返回值:将 function.json 中的 name 属性 设置为 $return。 使用此配置时,函数的返回值将作为事件中心消息保留。

  • 命令性:将值传递给声明为 Out 类型的参数的 set 方法。 传递给 set 的值将作为事件中心消息保留。

连接

connection 属性是环境配置的引用,它指定应用应如何连接到事件中心。 它可能指定:

如果配置的值既是单个设置的完全匹配,也是其他设置的前缀匹配,则使用完全匹配。

连接字符串

单击命名空间(而不是事件中心本身)的“连接信息”按钮,以获取此连接字符串。 连接字符串必须用于事件中心命名空间,而不是事件中心本身。

当用于触发器时,连接字符串必须至少具有“读取”权限才能激活函数。 当用于输出绑定时,连接字符串必须具有“发送”权限才能将消息发送到事件流。

此连接字符串应存储在应用程序设置中,其名称与绑定配置的 connection 属性指定的值匹配。

基于标识的连接

如果使用 5.x 版或更高版本的扩展,则无需将连接字符串与机密一起使用,而是可以使应用使用 Azure Active Directory 标识。 为此,需要定义公共前缀下的设置,该前缀映射到触发器和绑定配置中的 connection 属性。

在此模式下,扩展需要以下属性:

注意

提供的环境变量当前必须以 AzureWebJobs 为前缀才能在消耗计划中使用。 在高级计划中,不需要此前缀。

属性 环境变量模板 说明 示例值
完全限定的命名空间 AzureWebJobs<CONNECTION_NAME_PREFIX>__fullyQualifiedNamespace 完全限定的事件中心命名空间。 <event_hubs_namespace>.servicebus.chinacloudapi.cn

可以设置其他属性来自定义连接。 请参阅基于标识的连接的通用属性

注意

使用 Azure 应用程序配置Key Vault 为托管标识连接提供设置时,设置名称应使用有效的键分隔符(例如 :/)替代 __,以确保正确解析名称。

例如 <CONNECTION_NAME_PREFIX>:fullyQualifiedNamespace

在 Azure Functions 服务中托管时,基于标识的连接将使用托管标识。 默认情况下使用系统分配的标识,但可以使用 credentialclientID 属性来指定用户分配的标识。 请注意,不支持为用户分配的标识配置资源 ID。 在其他上下文(如本地开发)中运行时,将改用开发人员标识,尽管可以进行自定义。 请参阅使用基于标识的连接进行本地开发

向标识授予权限

无论使用何种标识,都必须具有执行所需操作的权限。 需要使用内置角色或者提供这些权限的自定义角色在 Azure RBAC 中分配角色

重要

某些权限可能由并非所有上下文都需要的目标服务公开。 尽可能遵循最低权限原则,仅授予标识所需的权限。 例如,如果应用只需要从数据源进行读取即可,则使用仅具有读取权限的角色。 分配一个也具有该服务写入权限的角色并不恰当,因为对于读取操作来说,写入是多余的权限。 同样,你也希望确保角色分配的范围仅限于需要读取的资源。

你将需要创建一个角色分配,以便在运行时提供对事件中心的访问权限。 角色分配的范围可以是事件中心命名空间,也可以是事件中心本身。 所有者等管理角色还不够。 下表显示了在正常操作中使用事件中心扩展时建议使用的内置角色。 根据所编写的代码,应用程序可能需要具有其他权限。

绑定类型 内置角色示例
触发器 Azure 事件中心数据接收方Azure 事件中心数据所有者
输出绑定 Azure 事件中心数据发送方

异常和返回代码

绑定 参考
事件中心 操作指南

后续步骤