Azure Functions 的 Apache Kafka 触发器

可以使用 Azure Functions 中的 Apache Kafka 触发器来运行函数代码,以响应 Kafka 主题中的消息。 还可以使用 Kafka 输出绑定从函数写入主题。 若要了解设置和配置详细信息,请参阅适用于 Azure Functions 的 Apache Kafka 绑定概述

重要

Kafka 绑定仅可用于弹性高级计划专用(应用服务)计划中的 Functions。 它们仅在 3.x 版及更高版本的 Functions 运行时上受支持。

示例

触发器的用法取决于函数应用中使用的 C# 模态,后者可以是以下模式之一:

独立工作进程类库的已编译 C# 函数在独立于运行时的进程中运行。

所使用的特性取决于特定的事件提供程序。

以下示例显示了一个 C# 函数,该函数将 Kafka 消息作为 Kafka 事件进行读取和记录:

        [Function("KafkaTrigger")]
        public static void Run(
            [KafkaTrigger("BrokerList",
                          "topic",
                          Username = "ConfluentCloudUserName",
                          Password = "ConfluentCloudPassword",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default")] string eventData, FunctionContext context)
        {
            var logger = context.GetLogger("KafkaFunction");
            logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
        }

若要批量接收事件,请使用字符串数组作为输入,如以下示例所示:

        [Function("KafkaTriggerMany")]
        public static void Run(
            [KafkaTrigger("BrokerList",
                          "topic",
                          Username = "ConfluentCloudUserName",
                          Password = "ConfluentCloudPassword",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default",
                          IsBatched = true)] string[] events, FunctionContext context)
        {
            foreach (var kevent in events)
            {
                var logger = context.GetLogger("KafkaFunction");
                logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
            }

以下函数记录 Kafka 事件的消息和标头:

        [Function("KafkaTriggerWithHeaders")]
        public static void Run(
            [KafkaTrigger("BrokerList",
                          "topic",
                          Username = "ConfluentCloudUserName",
                          Password = "ConfluentCloudPassword",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default")] string eventData, FunctionContext context)
        {
            var eventJsonObject = JObject.Parse(eventData);
            var logger = context.GetLogger("KafkaFunction");
            logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
            var headersJArr = eventJsonObject["Headers"] as JArray;
            logger.LogInformation("Headers for this event: ");
            foreach (JObject header in headersJArr)
            {
                logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

            }
        }

有关完整的一组可用 .NET 示例,请参阅 Kafka 扩展存储库

注意

有关等效的一组 TypeScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例演示用于读取和记录 Kafka 消息的函数的 Kafka 触发器。

以下 function.json 定义特定提供程序的触发器:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

然后,在触发函数时会运行以下代码:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

若要批量接收事件,请将 function.json 文件中的 cardinality 值设为 many,如以下示例所示:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

然后,以下代码会分析事件数组并记录事件数据:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

以下代码还记录标头数据:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

可以为传递给触发器的事件定义一个通用的 Avro 架构。 以下 function.json 使用通用 Avro 架构定义了特定提供程序的触发器:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然后,在触发函数时会运行以下代码:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

有关完整的一组可用 JavaScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例演示用于读取和记录 Kafka 消息的函数的 Kafka 触发器。

以下 function.json 定义特定提供程序的触发器:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

然后,在触发函数时会运行以下代码:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

若要批量接收事件,请将 function.json 文件中的 cardinality 值设为 many,如以下示例所示:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

然后,以下代码会分析事件数组并记录事件数据:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

以下代码还记录标头数据:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

可以为传递给触发器的事件定义一个通用的 Avro 架构。 以下 function.json 使用通用 Avro 架构定义了特定提供程序的触发器:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然后,在触发函数时会运行以下代码:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

有关完整的一组可用 PowerShell 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于你的事件提供程序(在这些示例中是 Confluent 或 Azure 事件中心)。 以下示例演示用于读取和记录 Kafka 消息的函数的 Kafka 触发器。

以下 function.json 定义特定提供程序的触发器:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

然后,在触发函数时会运行以下代码:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

若要批量接收事件,请将 function.json 文件中的 cardinality 值设为 many,如以下示例所示:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

然后,以下代码会分析事件数组并记录事件数据:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

以下代码还记录标头数据:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

可以为传递给触发器的事件定义一个通用的 Avro 架构。 以下 function.json 使用通用 Avro 架构定义了特定提供程序的触发器:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然后,在触发函数时会运行以下代码:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

有关完整的一组可用 Python 示例,请参阅 Kafka 扩展存储库

用于配置触发器的注释取决于特定的事件提供程序。

以下示例显示了一个 Java 函数,该函数读取和记录 Kafka 事件的内容:

    @FunctionName("KafkaTrigger")
    public void runSingle(
            @KafkaTrigger(
                name = "KafkaTrigger",
                topic = "topic",  
                brokerList="%BrokerList%",
                consumerGroup="$Default", 
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
                dataType = "string"
             ) String kafkaEventData,
            final ExecutionContext context) {
            context.getLogger().info(kafkaEventData);
    }

若要批量接收事件,请使用输入字符串作为数组,如以下示例所示:

    @FunctionName("KafkaTriggerMany")
    public void runMany(
            @KafkaTrigger(
                name = "kafkaTriggerMany",
                topic = "topic",  
                brokerList="%BrokerList%",
                consumerGroup="$Default", 
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
                cardinality = Cardinality.MANY,
                dataType = "string"
             ) String[] kafkaEvents,
            final ExecutionContext context) {
            for (String kevent: kafkaEvents) {
                context.getLogger().info(kevent);
            }    
    }

以下函数记录 Kafka 事件的消息和标头:

    @FunctionName("KafkaTriggerManyWithHeaders")
    public void runSingle(
            @KafkaTrigger(
                name = "KafkaTrigger",
                topic = "topic",  
                brokerList="%BrokerList%",
                consumerGroup="$Default", 
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
                dataType = "string",
                cardinality = Cardinality.MANY
             ) List<String> kafkaEvents,
            final ExecutionContext context) {
                Gson gson = new Gson(); 
                for (String keventstr: kafkaEvents) {
                    KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                    context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                    context.getLogger().info("Headers for the message:");
                    for (KafkaHeaders header : kevent.Headers) {
                        String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                        context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                    }                
                }
            }     

可以为传递给触发器的事件定义一个通用的 Avro 架构。 以下函数使用通用 Avro 架构定义了特定提供程序的触发器:

    private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

    @FunctionName("KafkaAvroGenericTrigger")
    public void runOne(
            @KafkaTrigger(
                    name = "kafkaAvroGenericSingle",
                    topic = "topic",
                    brokerList="%BrokerList%",
                    consumerGroup="$Default",
                    username = "ConfluentCloudUsername",
                    password = "ConfluentCloudPassword",
                    avroSchema = schema,
                    authenticationMode = BrokerAuthenticationMode.PLAIN,
                    protocol = BrokerProtocol.SASLSSL) Payment payment,
            final ExecutionContext context) {
        context.getLogger().info(payment.toString());
    }

有关完整的一组适用于 Confluent 的可用 Java 示例,请参阅 Kafka 扩展存储库

属性

进程内独立工作进程 C# 库都使用 KafkaTriggerAttribute 来定义函数触发器。

下表说明了可使用此触发器特性设置的属性:

参数 说明
BrokerList (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接
主题 (必需)触发器监视的主题。
ConsumerGroup (可选)触发器使用的 Kafka 使用者组。
AvroSchema (可选)使用 Avro 协议时的通用记录的架构。
AuthenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
用户名 (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
密码 (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
协议 (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
SslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
SslCertificateLocation (可选)客户端证书的路径。
SslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
SslKeyPassword (可选)客户端证书的密码。

批注

使用 KafkaTrigger 注释,可以创建在收到主题时要运行的函数。 支持的选项包括以下元素:

元素 说明
name (必需)变量的名称,表示函数代码中的队列或主题消息。
brokerList (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)触发器监视的主题。
基数 (可选)指示触发器输入的基数。 支持的值为 ONE(默认值)和 MANY。 当输入是单个消息时使用 ONE,当输入是消息数组时使用 MANY。 使用 MANY 时,还必须设置一个 dataType
dataType 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当此元素为 binary 时,消息是作为二进制数据接收的,Functions 会尝试将其反序列化为实际参数类型 byte[]。
consumerGroup (可选)触发器使用的 Kafka 使用者组。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。

配置

下表解释了在 function.json 文件中设置的绑定配置属性。

“function.json”属性 说明
type (必需)必须设置为 kafkaTrigger
direction (必需)必须设置为 in
name (必需)表示函数代码中代理数据的变量的名称。
brokerList (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接
topic (必需)触发器监视的主题。
基数 (可选)指示触发器输入的基数。 支持的值为 ONE(默认值)和 MANY。 当输入是单个消息时使用 ONE,当输入是消息数组时使用 MANY。 使用 MANY 时,还必须设置一个 dataType
dataType 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当此元素为 binary 时,消息是作为二进制数据接收的,Functions 会尝试将其反序列化为实际参数类型 byte[]。
consumerGroup (可选)触发器使用的 Kafka 使用者组。
avroSchema (可选)使用 Avro 协议时的通用记录的架构。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值为 GssapiPlain(默认值)、ScramSha256ScramSha512
username (可选)SASL 身份验证的用户名。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
password (可选)SASL 身份验证的密码。 当 AuthenticationModeGssapi 时不受支持。 有关详细信息,请参阅连接
protocol (可选)与代理通信时使用的安全协议。 支持的值为 plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥 (PEM) 的路径。
sslKeyPassword (可选)客户端证书的密码。

使用情况

目前,采用作为 JSON 有效负载的字符串和字符串数组的形式的 Kafka 事件受支持。

Kafka 消息以作为 JSON 有效负载的字符串和字符串数组的形式传递给函数。

在高级计划中,必须为 Kafka 输出启用运行时规模监视,以便能够横向扩展到多个实例。 若要了解详细信息,请参阅启用运行时缩放

不能在 Azure 门户中使用“代码 + 测试”页面的“测试/运行”功能来处理 Kafka 触发器。 必须改为将测试事件直接发送到由触发器监视的主题。

有关 Kafka 触发器支持的完整一组 host.json 设置,请参阅 host.json 设置

连接

触发器和绑定所需的所有连接信息都应在应用程序设置中(而不是在代码内的绑定定义中)维护。 这适用于凭据,它们永远不应存储在代码中。

重要

凭据设置必须引用应用程序设置。 不要在代码或配置文件中对凭据进行硬编码。 在本地运行时,请对凭据使用 local.settings.json 文件,并且不要发布 local.settings.json 文件。

Azure 中的 Confluent 提供的托管 Kafka 群集进行连接时,请确保在触发器或绑定中设置了适用于 Confluent Cloud 环境的以下身份验证凭据:

设置 建议的值 说明
BrokerList BootstrapServer 名为 BootstrapServer 的应用设置包含在 Confluent Cloud 设置页面中找到的引导服务器的值。 该值类似于 xyz-xyzxzy.chinanorth.azure.confluent.cloud:9092
用户名 ConfluentCloudUsername 名为 ConfluentCloudUsername 的应用设置包含来自 Confluent Cloud 网站的 API 访问密钥。
密码 ConfluentCloudPassword 名为 ConfluentCloudPassword 的应用设置包含从 Confluent Cloud 网站获取的 API 机密。

在本地开发期间,用于这些设置的字符串值必须作为 Azure 中的应用程序设置存在,或存在于 local.settings.json 文件Values 集合中。

还应在绑定定义中设置 ProtocolAuthenticationModeSslCaLocation

后续步骤