Compartir a través de

适用于 Azure Functions 的 Apache Kafka 输出绑定

输出绑定使 Azure Functions 应用能够将消息发送到 Kafka 主题。

重要

Kafka 绑定适用于弹性消耗计划弹性高级计划和专用(应用服务)计划的 Functions。 它们仅在 Functions 运行时版本 4.x 上受支持。

示例

绑定的使用方式取决于函数应用中的 C# 形式。 可以使用以下形式之一:

一个已编译的 C# 函数,它使用 独立工作进程类库 ,该库在独立于运行时的进程中运行。

所使用的特性取决于特定的事件提供程序。

以下示例使用名为 <a0/> 的自定义返回类型,该类型由 HTTP 响应和 Kafka 输出组成。

        [Function("KafkaOutput")]

        public static MultipleOutputType Output(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
            FunctionContext executionContext)
        {
            var log = executionContext.GetLogger("HttpFunction");
            log.LogInformation("C# HTTP trigger function processed a request.");

            string message = req.FunctionContext
                                .BindingContext
                                .BindingData["message"]
                                .ToString();

            var response = req.CreateResponse(HttpStatusCode.OK);
            return new MultipleOutputType()
            {
                Kevent = message,
                HttpResponse = response
            };
        }

MultipleOutputType 类中, Kevent 是 Kafka 绑定的输出绑定变量。

    public class MultipleOutputType
    {
        [KafkaOutput("BrokerList",
                    "topic",
                    Username = "ConfluentCloudUserName",
                    Password = "ConfluentCloudPassword",
            Protocol = BrokerProtocol.SaslSsl,
            AuthenticationMode = BrokerAuthenticationMode.Plain
        )]        
        public string Kevent { get; set; }

        public HttpResponseData HttpResponse { get; set; }
    }

若要发送一批事件,请将字符串数组传递给输出类型,如以下示例所示:

        [Function("KafkaOutputMany")]

        public static MultipleOutputTypeForBatch Output(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
            FunctionContext executionContext)
        {
            var log = executionContext.GetLogger("HttpFunction");
            log.LogInformation("C# HTTP trigger function processed a request.");
            var response = req.CreateResponse(HttpStatusCode.OK);

            string[] messages = new string[2];
            messages[0] = "one";
            messages[1] = "two";

            return new MultipleOutputTypeForBatch()
            {
                Kevents = messages,
                HttpResponse = response
            };
        }

字符串数组定义为 Kevents 类上的属性,并且输出绑定在此属性上定义:

    public class MultipleOutputTypeForBatch
    {
        [KafkaOutput("BrokerList",
                     "topic",
                     Username = "ConfluentCloudUserName",
                     Password = "ConfluentCloudPassword",
            Protocol = BrokerProtocol.SaslSsl,
            AuthenticationMode = BrokerAuthenticationMode.Plain
        )]        
        public string[] Kevents { get; set; }

        public HttpResponseData HttpResponse { get; set; }
    }

以下函数将标头添加到 Kafka 输出数据:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

有关完整的一组可用 .NET 示例,请参阅 Kafka 扩展存储库

输出绑定的使用取决于 Node.js 编程模型的版本。

在 Node.js v4 模型中,可以直接在函数代码中定义输出绑定。 有关详细信息,请参阅 Azure Functions Node.js 开发人员指南

在这些示例中,事件提供程序为 Confluent 或 Azure 事件中心。 这些示例演示 HTTP 请求触发并从请求发送到 Kafka 主题的函数的 Kafka 输出绑定。

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

若要批量发送事件,请发送消息数组,如以下示例所示:

const { app, output } = require("@azure/functions");

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

async function kafkaOutputManyWithHttp(request, context) {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

这些示例演示如何将包含标头的事件消息发送到 Kafka 主题:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

有关完整的一组可用 JavaScript 示例,请参阅 Kafka 扩展存储库

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

若要批量发送事件,请发送消息数组,如以下示例所示:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputManyWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

这些示例演示如何将包含标头的事件消息发送到 Kafka 主题:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

有关一组完整的工作 TypeScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例演示 HTTP 请求触发并从请求发送到 Kafka 主题的函数的 Kafka 输出绑定。

以下 function.json 在这些示例中定义特定提供程序的触发器:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

以下代码向主题发送消息:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

以下代码将多个消息作为一个数组发送到同一主题:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

以下示例演示如何将包含标头的事件消息发送到同一 Kafka 主题:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

有关完整的一组可用 PowerShell 示例,请参阅 Kafka 扩展存储库


输出绑定的用法取决于 Python 编程模型的版本。

在 Python v2 模型中,使用修饰器直接在函数代码中定义输出绑定。 有关详细信息,请参阅 Azure Functions Python 开发人员指南

这些示例演示 HTTP 请求触发并从请求发送到 Kafka 主题的函数的 Kafka 输出绑定。

input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'


@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
    outputMessage.set(json.dumps(['one', 'two']))
    return 'OK'

若要批量发送事件,请发送消息数组,如以下示例所示:

@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }]  },

这些示例演示如何将包含标头的事件消息发送到 Kafka 主题:

out.set(json.dumps(kevent))
return 'OK'

有关完整的一组可用 Python 示例,请参阅 Kafka 扩展存储库

用于配置输出绑定的注释取决于特定的事件提供程序。

以下函数向 Kafka 主题发送消息。

    @FunctionName("KafkaOutput")
    public HttpResponseMessage run(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
            @KafkaOutput(
                name = "kafkaOutput",
                topic = "topic",  
                brokerList="%BrokerList%",
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
                protocol = BrokerProtocol.SASLSSL
            )  OutputBinding<String> output,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");

        // Parse query parameter
        String query = request.getQueryParameters().get("message");
        String message = request.getBody().orElse(query);
        context.getLogger().info("Message:" + message);
        output.setValue(message);
        return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

以下示例演示如何将多个消息发送到一个 Kafka 主题。

    @FunctionName("KafkaOutputMany")
    public HttpResponseMessage run(
            @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
            @KafkaOutput(
                name = "kafkaOutput",
                topic = "topic",  
                brokerList="%BrokerList%",
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
                protocol = BrokerProtocol.SASLSSL
            )  OutputBinding<String[]> output,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");
        String[] messages = new String[2];
        messages[0] = "one";
        messages[1] = "two";
        output.setValue(messages);
        return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
    }

在此示例中,输出绑定参数更改为字符串数组。

最后一个示例使用这些 KafkaEntity 类和 KafkaHeader 类:

public class KafkaEntity {
    int Offset;
    int Partition;
    String Timestamp;
    String Topic;
    String Value;
    KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    String Key;
    String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

以下示例函数向 Kafka 主题发送包含标头的消息。

@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
            @KafkaOutput(
                name = "kafkaOutput",
                topic = "topic",  
                brokerList="%BrokerList%",
                username= "$ConnectionString",
                password = "EventHubConnectionString",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
                protocol = BrokerProtocol.SASLSSL
            )  OutputBinding<KafkaEntity> output,
            final ExecutionContext context) {
        context.getLogger().info("Java HTTP trigger processed a request.");

        // Parse query parameter
        String query = request.getQueryParameters().get("message");
        String message = request.getBody().orElse(query);
        KafkaHeaders[] headers = new KafkaHeaders[1];
        headers[0] = new KafkaHeaders("test", "java");
        KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
        output.setValue(kevent);
        return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
    }
}

有关完整的一组适用于 Confluent 的可用 Java 示例,请参阅 Kafka 扩展存储库

属性

进程内独立工作进程 C# 库都使用 Kafka 特性来定义函数触发器。

下表说明了可以使用此属性设置的属性:

参数 说明
BrokerList (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
主题 (必需)将输出发送到的主题。
AvroSchema (可选)使用 Avro 协议时消息值的通用记录的架构。
KeyAvroSchema (可选)使用 Avro 协议时消息密钥的通用记录的架构。
KeyDataType (可选)用于将消息密钥发送到 Kafka 主题的数据类型。 如果 KeyAvroSchema 已设置,则此值为泛型记录。 接受的值是IntLongStringBinary
MaxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
BatchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
EnableIdempotence (可选)设置为 true 时,保证消息按原始生成顺序生成且仅成功生成一次,默认值为 false
MessageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 此值是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
RequestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
MaxRetries (可选)重试发送失败消息的次数,默认值为 2。 重试可能会导致重新排序,除非将 EnableIdempotence 设置为 true
AuthenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NotSet(默认值)、GssapiPlainScramSha256ScramSha512OAuthBearer
用户名 (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
密码 (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
协议 (可选)与代理通信时使用的安全协议。 支持的值是NotSet(默认值)、plaintextsslsasl_plaintextsasl_ssl
SslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
SslCertificateLocation (可选)客户端证书的路径。
SslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
SslKeyPassword (可选)客户端证书的密码。
SslCertificatePEM (可选)以 PEM 格式作为字符串的客户端证书。 有关详细信息,请参阅连接
SslKeyPEM (可选)以 PEM 格式作为字符串的客户端私钥。 有关详细信息,请参阅连接
SslCaPEM (可选)PEM 格式为字符串的 CA 证书。 有关详细信息,请参阅连接
SslCertificateandKeyPEM (可选)以 PEM 格式作为字符串的客户端证书和密钥。 有关详细信息,请参阅连接
SchemaRegistryUrl (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接
SchemaRegistryUsername (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接
SchemaRegistryPassword (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接
OAuthBearerMethod (可选)OAuth Bearer 方法。 接受的值是 oidcdefault
OAuthBearerClientId (可选)如果 OAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端 ID。 有关详细信息,请参阅连接
OAuthBearerClientSecret (可选)如果 OAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端密码。 有关详细信息,请参阅连接
OAuthBearerScope (可选)指定对代理的访问请求的范围。
OAuthBearerTokenEndpointUrl (可选)使用方法时用于检索令牌的 oidc OAuth/OIDC 颁发者令牌终结点 HTTP(S) URI。 有关详细信息,请参阅连接
OAuthBearerExtensions (可选)使用方法时 oidc 要作为中转站的其他信息提供的 key=value 对的逗号分隔列表。 例如: supportFeatureX=true,organizationId=sales-emea

批注

通过 KafkaOutput 批注,可以创建写入特定主题的函数。 支持的选项包括以下元素:

元素 说明
name 表示函数代码中代理数据的变量的名称。
brokerList (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)将输出发送到的主题。
dataType 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当此元素为 binary 时,消息是作为二进制数据接收的,Functions 会尝试将其反序列化为实际参数类型 byte[]。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。 (目前不支持 Java。)
maxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
batchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 时,保证消息在原始生成顺序中成功生成一次,默认值为 < a1/>。
messageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 此值是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
requestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
maxRetries (可选)重试发送失败消息的次数,默认值为 2。 除非设置为 EnableIdempotencetrue否则重试可能会导致重新排序。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NotSet(默认值)、GssapiPlainScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值是NotSet(默认值)、plaintextsslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。
schemaRegistryUrl (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接
schemaRegistryUsername (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接
schemaRegistryPassword (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接

配置

下表解释了在 function.json 文件中设置的绑定配置属性。

“function.json”属性 说明
type 设置为 kafka
direction 设置为 out
name 表示函数代码中代理数据的变量的名称。
brokerList (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)将输出发送到的主题。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
keyAvroSchema (可选)使用 Avro 协议时消息密钥的通用记录的架构。
keyDataType (可选)用于将消息密钥发送到 Kafka 主题的数据类型。 如果 keyAvroSchema 已设置,则此值为泛型记录。 接受的值是IntLongStringBinary
maxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
batchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 时,保证消息在原始生成顺序中成功生成一次,默认值为 < a1/>。
messageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 此值是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
requestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
maxRetries (可选)重试发送失败消息的次数,默认值为 2。 除非设置为 EnableIdempotencetrue否则重试可能会导致重新排序。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NotSet(默认值)、GssapiPlainScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值是NotSet(默认值)、plaintextsslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。
sslCertificatePEM (可选)以 PEM 格式作为字符串的客户端证书。 有关详细信息,请参阅连接
sslKeyPEM (可选)以 PEM 格式作为字符串的客户端私钥。 有关详细信息,请参阅连接
sslCaPEM (可选)PEM 格式为字符串的 CA 证书。 有关详细信息,请参阅连接
sslCertificateandKeyPEM (可选)以 PEM 格式作为字符串的客户端证书和密钥。 有关详细信息,请参阅连接
schemaRegistryUrl (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接
schemaRegistryUsername (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接
schemaRegistryPassword (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接
oAuthBearerMethod (可选)OAuth Bearer 方法。 接受的值是 oidcdefault
oAuthBearerClientId (可选)如果 oAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端 ID。 有关详细信息,请参阅连接
oAuthBearerClientSecret (可选)如果 oAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端密码。 有关详细信息,请参阅连接
oAuthBearerScope (可选)指定对代理的访问请求的范围。
oAuthBearerTokenEndpointUrl (可选)使用方法时用于检索令牌的 oidc OAuth/OIDC 颁发者令牌终结点 HTTP(S) URI。 有关详细信息,请参阅连接

配置

下表解释了在 function.json 文件中设置的绑定配置属性。 Python 对配置属性使用snake_case命名约定。

“function.json”属性 说明
type 设置为 kafka
direction 设置为 out
name 表示函数代码中代理数据的变量的名称。
broker_list (必需)将输出发送到的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)将输出发送到的主题。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
maxMessageBytes (可选)要发送的输出消息的最大大小(以 MB 为单位),默认值为 1
batchSize (可选)单个消息集内批处理的最大消息数,默认值为 10000
enableIdempotence (可选)设置为 时,保证消息在原始生成顺序中成功生成一次,默认值为 < a1/>。
messageTimeoutMs (可选)本地消息超时(以毫秒为单位)。 此值仅在本地强制使用,并限制生成的消息等待成功传递的时间,默认值为 300000。 时间为 0 表示时间无限。 此值是用于传递消息(包括重试)的最长时间。 超过重试计数或消息超时时,会发生传递错误。
requestTimeoutMs (可选)输出请求的确认超时,以毫秒为单位,默认值为 5000
maxRetries (可选)重试发送失败消息的次数,默认值为 2。 除非设置为 EnableIdempotencetrue否则重试可能会导致重新排序。
authentication_mode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NOTSET(默认值)、GssapiPlainScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 authentication_modeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 authentication_modeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值是NOTSET(默认值)、plaintextsslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。
schema_registry_url (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接
schema_registry_username (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接
schema_registry_password (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接
o_auth_bearer_method (可选)OAuth Bearer 方法。 接受的值是 oidcdefault
o_auth_bearer_client_id (可选)如果 o_auth_bearer_method 设置为 oidc,则指定 OAuth 持有者客户端 ID。 有关详细信息,请参阅连接
o_auth_bearer_client_secret (可选)如果 o_auth_bearer_method 设置为 oidc,则指定 OAuth 持有者客户端密码。 有关详细信息,请参阅连接
o_auth_bearer_scope (可选)指定对代理的访问请求的范围。
o_auth_bearer_token_endpoint_url (可选)使用方法时用于检索令牌的 oidc OAuth/OIDC 颁发者令牌终结点 HTTP(S) URI。 有关详细信息,请参阅连接

注意

证书 PEM 相关属性和 Avro 密钥相关属性在 Python 库中尚不可用。

使用情况

键类型和值类型都适用于内置的 AvroProtobuf 序列化。

事件的偏移量、分区和时间戳在运行时生成。 只能设置函数中的值和标头。 在 function.json 文件中设置主题。

请确保你有权访问要写入的 Kafka 主题。 需要为绑定配置对 Kafka 主题的访问权限和连接凭据。

在高级计划中,必须为 Kafka 输出启用运行时缩放监视,以便横向扩展到多个实例。 若要了解详细信息,请参阅启用运行时缩放

有关 Kafka 触发器支持的完整一组 host.json 设置,请参阅 host.json 设置

连接

将触发器和绑定所需的所有连接信息存储在应用程序设置中,而不是存储在代码中的绑定定义中。 本指南适用于不应存储在代码中的凭据。

重要

凭据设置必须引用应用程序设置。 不要在代码或配置文件中对凭据进行硬编码。 在本地运行时,请对凭据使用 local.settings.json 文件,并且不要发布 local.settings.json 文件。

连接到 Azure 中 Confluent 提供的托管 Kafka 群集时,可以使用以下身份验证方法之一。

注意

使用 Flex Consumption 计划时,不支持基于文件位置的证书身份验证属性(SslCaLocationSslCertificateLocationSslKeyLocation)。 请改用基于 PEM 的证书属性(SslCaPEM、、SslCertificatePEMSslKeyPEMSslCertificateandKeyPEM)或将证书存储在 Azure Key Vault 中。

架构注册表

若要在 Kafka 扩展中使用 Confluent 提供的架构注册表,请设置以下凭据:

设置 建议的值 说明
SchemaRegistryUrl SchemaRegistryUrl 用于架构管理的架构注册表服务的 URL。 通常为格式 https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY 架构注册表上基本身份验证的用户名(如果需要)。
SchemaRegistryPassword CONFLUENT_API_SECRET 架构注册表上基本身份验证的密码(如果需要)。

用户名/密码身份验证

使用此形式的身份验证时,请确保设置为ProtocolSaslPlaintextSaslSsl设置为AuthenticationModePlainScramSha256或者ScramSha512,如果使用的 CA 证书不同于默认 ISRG 根 X1 证书,请确保更新或SslCaLocationSslCaPEM

设置 建议的值 说明
BrokerList BootstrapServer 名为 BootstrapServer 的应用设置包含在 Confluent Cloud 设置页面中找到的引导服务器的值。 该值类似于 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092
用户名 ConfluentCloudUsername 名为 ConfluentCloudUsername 的应用设置包含来自 Confluent Cloud 网站的 API 访问密钥。
密码 ConfluentCloudPassword 名为 ConfluentCloudPassword 的应用设置包含从 Confluent Cloud 网站获取的 API 机密。
SslCaPEM SSLCaPemCertificate 以 PEM 格式将 CA 证书作为字符串命名 SSLCaPemCertificate 的应用设置。 该值应遵循标准格式,例如: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----

SSL 身份验证

确保 Protocol 已设置为 SSL.

设置 建议的值 说明
BrokerList BootstrapServer 名为 BootstrapServer 的应用设置包含在 Confluent Cloud 设置页面中找到的引导服务器的值。 该值类似于 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092
SslCaPEM SslCaCertificatePem 名为 SslCaCertificatePem 包含 CA 证书 PEM 值的应用设置作为字符串。 该值应遵循标准格式: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem 名为 SslClientCertificatePem 包含客户端证书 PEM 值的应用设置作为字符串。 该值应遵循标准格式: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem 名为 SslClientKeyPem 包含客户端私钥的 PEM 值作为字符串的应用设置。 该值应遵循标准格式: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem 名为 SslClientCertificateAndKeyPem 包含客户端证书的 PEM 值和以字符串形式串联的客户端私钥的应用设置。 该值应遵循标准格式: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword 名为 SslClientKeyPassword 包含私钥密码的应用设置(如果有)。

OAuth 身份验证

使用 OAuth 身份验证时,请在绑定定义中配置与 OAuth 相关的属性。

在本地开发期间,用于这些设置的字符串值必须作为 Azure 中的应用程序设置存在,或存在于 Values 集合中。

还应设置 Protocol 绑定定义和 AuthenticationMode 绑定定义。

后续步骤