Compartir a través de

适用于 Azure Functions 的 Apache Kafka 绑定概述

使用适用于 Azure Functions 的 Kafka 扩展,你可以通过输出绑定将值写入到 Apache Kafka 主题。 还可以使用触发器来调用函数,以响应 Kafka 主题中的消息。

重要

Kafka 绑定仅可用于弹性高级计划专用(应用服务)计划中的 Functions。 它们仅在 3.x 版及更高版本的 Functions 运行时上受支持。

行动 类型
基于新的 Kafka 事件来运行函数。 触发器
写入到 Kafka 事件流。 输出绑定

安装扩展

你安装的扩展 NuGet 包取决于你在函数应用中使用的 C# 模式:

函数在独立的 C# 工作进程中执行。 若要了解详细信息,请参阅有关在独立工作进程中运行 C# Azure Functions 的指南

通过安装此 NuGet 包将该扩展添加到你的项目。

安装捆绑包

若要能够在应用中使用此绑定扩展,请确保项目的根目录中 host.json 文件包含以下 extensionBundle 引用:

{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[4.0.0, 5.0.0)"
    }
}

在此示例中,version[4.0.0, 5.0.0)值指示 Functions 主机使用至少4.0.0小于但小于5.0.0的捆绑包版本,其中包括所有可能的 4.x 版本。 此表示法有效地在 v4.x 扩展捆绑包的最新可用次要版本上维护应用。

如果可能,应使用最新的扩展捆绑包主版本,并允许运行时自动维护最新的次要版本。 可以在 扩展捆绑包发布页上查看最新捆绑包的内容。

如果应用要求使用以前的扩展版本,则可能需要改为指定以前的捆绑包版本。 可以查看 捆绑包版本 ,找到包含此扩展版本的捆绑包,该版本可供应用使用。 有关详细信息,请参阅 Azure Functions 扩展捆绑包。 结束区域

启用运行时缩放

若要允许函数在使用 Kafka 触发器和绑定时在高级计划上正确进行缩放,需要启用运行时缩放监视。

在 Azure 门户的函数应用中,选择“配置”,然后在“函数运行时设置”选项卡上将“运行时缩放监视”切换到“打开”。

用于启用运行时缩放的 Azure 门户面板的屏幕截图。

host.json 设置

本部分介绍 3.x 及更高版本中可用于此绑定的配置设置。 host.json 文件中的设置将应用于函数应用实例中的所有函数。 若要详细了解 3.x 及更高版本中的函数应用配置设置,请参阅 Azure Functions 的 host.json 参考

{
    "version": "2.0",
    "extensions": {
        "kafka": {
            "maxBatchSize": 64,
            "SubscriberIntervalInSeconds": 1,
            "ExecutorChannelCapacity": 1,
            "ChannelFullRetryIntervalInMs": 50
        }
    }
}

资产 违约 类型 DESCRIPTION
ChannelFullRetryIntervalInMs 50 触发器 定义尝试将项添加到满负荷通道时使用的订阅服务器重试间隔(以毫秒为单位)。
ExecutorChannelCapacity 1 两者 定义通道消息容量。 达到容量后,Kafka 订阅服务器会暂停,直到函数赶上。
MaxBatchSize 64 触发器 调用 Kafka 触发的函数时的最大批大小。
SubscriberIntervalInSeconds 1 触发器 定义每个函数执行传入消息的最小频率,按秒计算。 前提条件是消息量小于 MaxBatchSize / SubscriberIntervalInSeconds

host.json 的 节也支持继承自 kafka的以下属性,不管是对于触发器,还是对于输出绑定和触发器:

资产 适用于 librdkafka 等效项
AutoCommitIntervalMs 触发器 auto.commit.interval.ms
AutoOffsetReset 触发器 auto.offset.reset
FetchMaxBytes 触发器 fetch.max.bytes
LibkafkaDebug 两者 debug
MaxPartitionFetchBytes 触发器 max.partition.fetch.bytes
MaxPollIntervalMs 触发器 max.poll.interval.ms
MetadataMaxAgeMs 两者 metadata.max.age.ms
QueuedMinMessages 触发器 queued.min.messages
QueuedMaxMessagesKbytes 触发器 queued.max.messages.kbytes
ReconnectBackoffMs 触发器 reconnect.backoff.max.ms
ReconnectBackoffMaxMs 触发器 reconnect.backoff.max.ms
SessionTimeoutMs 触发器 session.timeout.ms
SocketKeepaliveEnable 两者 socket.keepalive.enable
StatisticsIntervalMs 触发器 statistics.interval.ms

后续步骤