适用于 Azure Functions 的 Apache Kafka 输出绑定


输出绑定允许 Azure Functions 应用将消息写入到 Kafka 主题。


Kafka 绑定仅可用于弹性高级计划专用(应用服务)计划中的 Functions。 它们仅在 3.x 版及更高版本的 Functions 运行时上受支持。


绑定的用法取决于函数应用中使用的 C# 模态,后者可以是以下模态之一:

独立工作进程类库的已编译 C# 函数在独立于运行时的进程中运行。


以下示例具有一个自定义返回类型 MultipleOutputType,该类型由 HTTP 响应和 Kafka 输出组成。


        public static MultipleOutputType Output(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
            FunctionContext executionContext)
            var log = executionContext.GetLogger("HttpFunction");
            log.LogInformation("C# HTTP trigger function processed a request.");

            string message = req.FunctionContext

            var response = req.CreateResponse(HttpStatusCode.OK);
            return new MultipleOutputType()
                Kevent = message,
                HttpResponse = response

MultipleOutputType 类中,Kevent 是 Kafka 绑定的输出绑定变量。

    public class MultipleOutputType
                    Username = "ConfluentCloudUserName",
                    Password = "ConfluentCloudPassword",
            Protocol = BrokerProtocol.SaslSsl,
            AuthenticationMode = BrokerAuthenticationMode.Plain
        public string Kevent { get; set; }

        public HttpResponseData HttpResponse { get; set; }



        public static MultipleOutputTypeForBatch Output(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
            FunctionContext executionContext)
            var log = executionContext.GetLogger("HttpFunction");
            log.LogInformation("C# HTTP trigger function processed a request.");
            var response = req.CreateResponse(HttpStatusCode.OK);

            string[] messages = new string[2];
            messages[0] = "one";
            messages[1] = "two";

            return new MultipleOutputTypeForBatch()
                Kevents = messages,
                HttpResponse = response

该字符串数组被定义为类的 Kevents 属性,在其上定义了输出绑定:

    public class MultipleOutputTypeForBatch
                     Username = "ConfluentCloudUserName",
                     Password = "ConfluentCloudPassword",
            Protocol = BrokerProtocol.SaslSsl,
            AuthenticationMode = BrokerAuthenticationMode.Plain
        public string[] Kevents { get; set; }

        public HttpResponseData HttpResponse { get; set; }

以下函数将标头添加到 Kafka 输出数据:


public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
        Kevent = kevent,
        HttpResponse = response

有关完整的一组可用 .NET 示例,请参阅 Kafka 扩展存储库


有关等效的一组 TypeScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例显示了由 HTTP 请求触发并将来自请求的数据发送到 Kafka 主题的函数的 Kafka 输出绑定。

以下 function.json 在这些示例中定义特定提供程序的触发器:

  "bindings": [
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
      "type": "http",
      "direction": "out",
      "name": "res"


// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'


// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage

有关完整的一组可用 JavaScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例显示了由 HTTP 请求触发并将来自请求的数据发送到 Kafka 主题的函数的 Kafka 输出绑定。

以下 function.json 在这些示例中定义特定提供程序的触发器:

  "bindings": [
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
      "type": "http",
      "direction": "out",
      "name": "Response"


using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message


Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK


using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'

有关完整的一组可用 PowerShell 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例显示了由 HTTP 请求触发并将来自请求的数据发送到 Kafka 主题的函数的 Kafka 输出绑定。

以下 function.json 在这些示例中定义特定提供程序的触发器:

  "scriptFile": "main.py",
  "bindings": [
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
      "type": "http",
      "direction": "out",
      "name": "$return"


import logging

import azure.functions as func

def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    return 'OK'


import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    return 'OK'

有关完整的一组可用 Python 示例,请参阅 Kafka 扩展存储库


以下函数向 Kafka 主题发送消息。

    public HttpResponseMessage run(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
                name = "kafkaOutput",
                topic = "topic",  
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
                protocol = BrokerProtocol.SASLSSL
            )  OutputBinding<String> output,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");

        // Parse query parameter
        String query = request.getQueryParameters().get("message");
        String message = request.getBody().orElse(query);
        context.getLogger().info("Message:" + message);
        return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

以下示例演示如何将多个消息发送到一个 Kafka 主题。

    public HttpResponseMessage run(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
                name = "kafkaOutput",
                topic = "topic",  
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
                protocol = BrokerProtocol.SASLSSL
            )  OutputBinding<String[]> output,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");
        String[] messages = new String[2];
        messages[0] = "one";
        messages[1] = "two";
        return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();


上一个示例用于这些 KafkaEntityKafkaHeader 类:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;

以下示例函数向 Kafka 主题发送包含标头的消息。

    public HttpResponseMessage run(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
                name = "kafkaOutput",
                topic = "topic",  
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
                protocol = BrokerProtocol.SASLSSL
            )  OutputBinding<KafkaEntity> output,
            final ExecutionContext context) {
                context.getLogger().info("Java HTTP trigger processed a request.");

                // Parse query parameter
                String query = request.getQueryParameters().get("message");
                String message = request.getBody().orElse(query);
                KafkaHeaders[] headers = new KafkaHeaders[1];
                headers[0] = new KafkaHeaders("test", "java");
                KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
                return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

有关完整的一组适用于 Confluent 的可用 Java 示例,请参阅 Kafka 扩展存储库


进程内独立工作进程 C# 库都使用 Kafka 特性来定义函数触发器。


参数 说明
BrokerList (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
主题 (必需)将输出发送到的主题。
AvroSchema (可选)使用 Avro 协议时的通用记录的架构。
MaxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
BatchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
EnableIdempotence (可选)设置为 true 时,保证消息按原始生成顺序生成且仅成功生成一次,默认值为 false
MessageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 此值是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
RequestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
MaxRetries (可选)重试发送失败消息的次数,默认值为 2。 重试可能会导致重新排序,除非将 EnableIdempotence 设置为 true
AuthenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
用户名 (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
密码 (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
协议 (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
SslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
SslCertificateLocation (可选)客户端证书的路径。
SslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
SslKeyPassword (可选)客户端证书的密码。


通过 KafkaOutput 注释,可以创建用于写入到特定主题的函数。 支持的选项包括以下元素:

元素 说明
name 表示函数代码中代理数据的变量的名称。
brokerList (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)将输出发送到的主题。
dataType 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当此元素为 binary 时,消息是作为二进制数据接收的,Functions 会尝试将其反序列化为实际参数类型 byte[]。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。 (目前不支持 Java。)
maxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
batchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 true 时,保证消息按原始生成顺序生成且仅成功生成一次,默认值为 false
messageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 这是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
requestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
maxRetries (可选)重试发送失败消息的次数,默认值为 2。 重试可能会导致重新排序,除非将 EnableIdempotence 设置为 true
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。


下表解释了在 function.json 文件中设置的绑定配置属性。

“function.json”属性 说明
type 必须设置为 kafka
direction 必须设置为 out
name 表示函数代码中代理数据的变量的名称。
brokerList (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)将输出发送到的主题。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
maxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
batchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 true 时,保证消息按原始生成顺序生成且仅成功生成一次,默认值为 false
messageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 这是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
requestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
maxRetries (可选)重试发送失败消息的次数,默认值为 2。 重试可能会导致重新排序,除非将 EnableIdempotence 设置为 true
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。


内置 AvroProtobuf 序列化支持键和值类型。

事件的偏移量、分区和时间戳在运行时生成。 只能在函数内设置值和标头。 主题是在 function.json 中设置的。

请确保有权访问你尝试写入的 Kafka 主题。 需要为绑定配置对 Kafka 主题的访问权限和连接凭据。

在高级计划中,必须为 Kafka 输出启用运行时规模监视,以便能够横向扩展到多个实例。 若要了解详细信息,请参阅启用运行时缩放

有关 Kafka 触发器支持的完整一组 host.json 设置,请参阅 host.json 设置


触发器和绑定所需的所有连接信息都应在应用程序设置中(而不是在代码内的绑定定义中)维护。 这适用于凭据,它们永远不应存储在代码中。


凭据设置必须引用应用程序设置。 不要在代码或配置文件中对凭据进行硬编码。 在本地运行时,请对凭据使用 local.settings.json 文件,并且不要发布 local.settings.json 文件。

Azure 中的 Confluent 提供的托管 Kafka 群集进行连接时,请确保在触发器或绑定中设置了适用于 Confluent Cloud 环境的以下身份验证凭据:

设置 建议的值 说明
BrokerList BootstrapServer 名为 BootstrapServer 的应用设置包含在 Confluent Cloud 设置页面中找到的引导服务器的值。 该值类似于 xyz-xyzxzy.chinanorth.azure.confluent.cloud:9092
用户名 ConfluentCloudUsername 名为 ConfluentCloudUsername 的应用设置包含来自 Confluent Cloud 网站的 API 访问密钥。
密码 ConfluentCloudPassword 名为 ConfluentCloudPassword 的应用设置包含从 Confluent Cloud 网站获取的 API 机密。

在本地开发期间,用于这些设置的字符串值必须作为 Azure 中的应用程序设置存在,或存在于 local.settings.json 文件Values 集合中。

还应在绑定定义中设置 ProtocolAuthenticationModeSslCaLocation
