适用于 Azure Functions 的 Apache Kafka 触发器

可以使用 Azure Functions 中的 Apache Kafka 触发器来运行函数代码,以响应 Kafka 主题中的消息。 还可以使用 Kafka 输出绑定 从函数写入主题。 有关设置和配置详细信息的信息,请参阅 适用于 Azure Functions 的 Apache Kafka 绑定概述

重要

Kafka 绑定仅可用于弹性高级计划专用(应用服务)计划中的 Functions。 它们仅在 3.x 版及更高版本的 Functions 运行时上受支持。

Example

触发器的用法取决于函数应用中使用的 C# 形式,这可以是以下模式之一:

独立工作进程类库编译的 C# 函数在独立于运行时的进程中运行。

使用的属性取决于特定的事件提供程序。

以下示例演示了一个 C# 函数,该函数读取 Kafka 消息并将其记录为 Kafka 事件:

        [Function("KafkaTrigger")]
        public static void Run(
            [KafkaTrigger("BrokerList",
                          "topic",
                          Username = "ConfluentCloudUserName",
                          Password = "ConfluentCloudPassword",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default")] string eventData, FunctionContext context)
        {
            var logger = context.GetLogger("KafkaFunction");
            logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
        }

若要在批处理中接收事件,请使用字符串数组作为输入,如以下示例所示:

        [Function("KafkaTriggerMany")]
        public static void Run(
            [KafkaTrigger("BrokerList",
                          "topic",
                          Username = "ConfluentCloudUserName",
                          Password = "ConfluentCloudPassword",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default",
                          IsBatched = true)] string[] events, FunctionContext context)
        {
            foreach (var kevent in events)
            {
                var logger = context.GetLogger("KafkaFunction");
                logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
            }

以下函数记录 Kafka 事件的消息和标头:

        [Function("KafkaTriggerWithHeaders")]
        public static void Run(
            [KafkaTrigger("BrokerList",
                          "topic",
                          Username = "ConfluentCloudUserName",
                          Password = "ConfluentCloudPassword",
                          Protocol = BrokerProtocol.SaslSsl,
                          AuthenticationMode = BrokerAuthenticationMode.Plain,
                          ConsumerGroup = "$Default")] string eventData, FunctionContext context)
        {
            var eventJsonObject = JObject.Parse(eventData);
            var logger = context.GetLogger("KafkaFunction");
            logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
            var headersJArr = eventJsonObject["Headers"] as JArray;
            logger.LogInformation("Headers for this event: ");
            foreach (JObject header in headersJArr)
            {
                logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

            }
        }

有关一组完整的工作 .NET 示例,请参阅 Kafka 扩展存储库

注释

有关一组等效的 TypeScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于事件提供程序,这些示例中为 Confluent 或 Azure 事件中心。 以下示例演示读取和记录 Kafka 消息的函数的 Kafka 触发器。

以下 function.json 定义特定提供程序的触发器:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "username": "%ConfluentCloudUserName%",
            "password": "%ConfluentCloudPassword%",
            "protocol": "saslSsl",
            "authenticationMode": "plain",
            "consumerGroup" : "$Default",
            "dataType": "string"
        }
    ]
}

然后,在触发函数时运行以下代码:

module.exports = async function (context, event) {
    // context.log.info(event)
    context.log.info(`JavaScript Kafka trigger function called for message ${event.Value}`);
};

若要在批处理中接收事件,请将 cardinality 值设置为 many function.json 文件中,如以下示例所示:

{
    "bindings": [
        {
            "type": "kafkaTrigger",
            "name": "event",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%"
        }
    ]
}

然后,以下代码分析事件数组并记录事件数据:

module.exports = async function (context, events) {
    function print(event) {
        var eventJson = JSON.parse(event)
        context.log.info(`JavaScript Kafka trigger function called for message ${eventJson.Value}`);
    }
    events.map(print);
};

以下代码还会记录标头数据:

module.exports = async function (context, event) {
  function print(kevent) {
    var keventJson = JSON.parse(kevent)
    context.log.info(`JavaScript Kafka trigger function called for message ${keventJson.Value}`);
    context.log.info(`Headers for this message:`)
    let headers =  keventJson.Headers;
    headers.forEach(element => {
        context.log.info(`Key: ${element.Key} Value:${Buffer.from(element.Value, 'base64')}`) 
    });
  }
  event.map(print);
};

可以为传递给触发器的事件定义通用 Avro 架构 。 以下 function.json 使用通用 Avro 架构定义特定提供程序的触发器:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaAvroGenericSingle",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "authenticationMode" : "PLAIN",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然后,在触发函数时运行以下代码:

module.exports = async function (context, event) {
    context.log.info(`JavaScript Kafka trigger function called for message ${JSON.stringify(event)}`);
};

有关一组完整的工作 JavaScript 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于事件提供程序,这些示例中为 Confluent 或 Azure 事件中心。 以下示例演示读取和记录 Kafka 消息的函数的 Kafka 触发器。

以下 function.json 定义特定提供程序的触发器:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

然后,在触发函数时运行以下代码:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

若要在批处理中接收事件,请将 cardinality 值设置为 many function.json 文件中,如以下示例所示:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

然后,以下代码分析事件数组并记录事件数据:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

以下代码还会记录标头数据:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

可以为传递给触发器的事件定义通用 Avro 架构 。 以下 function.json 使用通用 Avro 架构定义特定提供程序的触发器:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然后,在触发函数时运行以下代码:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

有关一组完整的工作 PowerShell 示例,请参阅 Kafka 扩展存储库

function.json 文件的特定属性取决于事件提供程序,这些示例中为 Confluent 或 Azure 事件中心。 以下示例演示读取和记录 Kafka 消息的函数的 Kafka 触发器。

以下 function.json 定义特定提供程序的触发器:

{
      "scriptFile": "main.py",
      "bindings": [
        {
          "type": "kafkaTrigger",
          "name": "kevent",
          "topic": "topic",
          "brokerList": "%BrokerList%",
          "username": "%ConfluentCloudUserName%",
          "password": "%ConfluentCloudPassword%",
          "consumerGroup" : "functions",
          "protocol": "saslSsl",
          "authenticationMode": "plain"
        }
    ]
}

然后,在触发函数时运行以下代码:

import logging
from azure.functions import KafkaEvent

def main(kevent : KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

若要在批处理中接收事件,请将 cardinality 值设置为 many function.json 文件中,如以下示例所示:

{
      "scriptFile": "main.py",
      "bindings": [
        {
            "type" : "kafkaTrigger",
            "direction": "in",
            "name" : "kevents",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "topic" : "message_python",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "dataType": "string",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "BrokerList" : "%BrokerList%"    
        }
    ]
}

然后,以下代码分析事件数组并记录事件数据:

import logging
import typing
from azure.functions import KafkaEvent

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

以下代码还会记录标头数据:

import logging
import typing
from azure.functions import KafkaEvent
import json
import base64

def main(kevents : typing.List[KafkaEvent]):
    for event in kevents:
        event_dec = event.get_body().decode('utf-8')
        event_json = json.loads(event_dec)
        logging.info("Python Kafka trigger function called for message " + event_json["Value"])
        headers = event_json["Headers"]
        for header in headers:
            logging.info("Key: "+ header['Key'] + " Value: "+ str(base64.b64decode(header['Value']).decode('ascii')))

可以为传递给触发器的事件定义通用 Avro 架构 。 以下 function.json 使用通用 Avro 架构定义特定提供程序的触发器:

{
  "scriptFile": "main.py",
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaTriggerAvroGeneric",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

然后,在触发函数时运行以下代码:

import logging
from azure.functions import KafkaEvent

def main(kafkaTriggerAvroGeneric : KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

有关一组完整的工作 Python 示例,请参阅 Kafka 扩展存储库

用于配置触发器的批注取决于特定的事件提供程序。

以下示例演示一个 Java 函数,该函数读取和记录 Kafka 事件的内容:

    @FunctionName("KafkaTrigger")
    public void runSingle(
            @KafkaTrigger(
                name = "KafkaTrigger",
                topic = "topic",  
                brokerList="%BrokerList%",
                consumerGroup="$Default", 
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
                dataType = "string"
             ) String kafkaEventData,
            final ExecutionContext context) {
            context.getLogger().info(kafkaEventData);
    }

若要在批处理中接收事件,请使用输入字符串作为数组,如以下示例所示:

    @FunctionName("KafkaTriggerMany")
    public void runMany(
            @KafkaTrigger(
                name = "kafkaTriggerMany",
                topic = "topic",  
                brokerList="%BrokerList%",
                consumerGroup="$Default", 
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
                cardinality = Cardinality.MANY,
                dataType = "string"
             ) String[] kafkaEvents,
            final ExecutionContext context) {
            for (String kevent: kafkaEvents) {
                context.getLogger().info(kevent);
            }    
    }

以下函数记录 Kafka 事件的消息和标头:

    @FunctionName("KafkaTriggerManyWithHeaders")
    public void runSingle(
            @KafkaTrigger(
                name = "KafkaTrigger",
                topic = "topic",  
                brokerList="%BrokerList%",
                consumerGroup="$Default", 
                username = "%ConfluentCloudUsername%", 
                password = "ConfluentCloudPassword",
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL,
                // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
                dataType = "string",
                cardinality = Cardinality.MANY
             ) List<String> kafkaEvents,
            final ExecutionContext context) {
                Gson gson = new Gson(); 
                for (String keventstr: kafkaEvents) {
                    KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                    context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                    context.getLogger().info("Headers for the message:");
                    for (KafkaHeaders header : kevent.Headers) {
                        String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                        context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                    }                
                }
            }     

可以为传递给触发器的事件定义通用 Avro 架构 。 以下函数为具有通用 Avro 架构的特定提供程序定义触发器:

    private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

    @FunctionName("KafkaAvroGenericTrigger")
    public void runOne(
            @KafkaTrigger(
                    name = "kafkaAvroGenericSingle",
                    topic = "topic",
                    brokerList="%BrokerList%",
                    consumerGroup="$Default",
                    username = "ConfluentCloudUsername",
                    password = "ConfluentCloudPassword",
                    avroSchema = schema,
                    authenticationMode = BrokerAuthenticationMode.PLAIN,
                    protocol = BrokerProtocol.SASLSSL) Payment payment,
            final ExecutionContext context) {
        context.getLogger().info(payment.toString());
    }

有关 Confluent 的完整工作 Java 示例集,请参阅 Kafka 扩展存储库

特性

进程内独立工作进程 C# 库都使用KafkaTriggerAttribute定义函数触发器。

下表说明了可以使用此触发器属性设置的属性:

参数 Description
BrokerList (必需)由触发器监视的 Kafka 中转站的列表。 有关详细信息,请参阅 “连接 ”。
Topic (必需)由触发器监视的主题。
ConsumerGroup (可选)触发器使用的 Kafka 使用者组。
AvroSchema (可选)使用 Avro 协议时通用记录的架构。
AuthenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证模式。 支持的值是 Gssapi,(默认值),ScramSha256ScramSha512Plain
用户名 (可选)SASL 身份验证的用户名。 不支持何时AuthenticationModeGssapi为 . 有关详细信息,请参阅 “连接 ”。
密码 (可选)SASL 身份验证的密码。 不支持何时AuthenticationModeGssapi为 . 有关详细信息,请参阅 “连接 ”。
协议 (可选)与代理通信时使用的安全协议。 支持的值是plaintext(默认值)、sslsasl_plaintextsasl_ssl
SslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
SslCertificateLocation (可选)客户端证书的路径。
SslKeyLocation (可选)用于身份验证的客户端私钥(PEM)的路径。
SslKeyPassword (可选)客户端证书的密码。

Annotations

KafkaTrigger 注允许创建在收到主题时运行的函数。 支持的选项包括以下元素:

元素 Description
name (必需)代表函数代码中的队列或主题消息的变量的名称。
brokerList (必需)由触发器监视的 Kafka 中转站的列表。 有关详细信息,请参阅 “连接 ”。
主题 (必需)由触发器监视的主题。
基数 (可选)指示触发器输入的基数。 支持的值是 ONE (默认值) 和 MANY。 当输入是单个消息以及MANY输入是消息数组时使用ONE。 使用 MANY时,还必须设置一个 dataType
dataType 定义 Functions 如何处理参数值。 默认情况下,该值作为字符串获取,Functions 会尝试将字符串反序列化为实际的纯旧 Java 对象(POJO)。 当 string输入被视为字符串时。 当消息作为二进制数据接收时 binary,Functions 会尝试将其反序列化为实际的参数类型 byte[]。
consumerGroup (可选)触发器使用的 Kafka 使用者组。
avroSchema (可选)使用 Avro 协议时通用记录的架构。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证模式。 支持的值是 Gssapi,(默认值),ScramSha256ScramSha512Plain
username (可选)SASL 身份验证的用户名。 不支持何时AuthenticationModeGssapi为 . 有关详细信息,请参阅 “连接 ”。
password (可选)SASL 身份验证的密码。 不支持何时AuthenticationModeGssapi为 . 有关详细信息,请参阅 “连接 ”。
protocol (可选)与代理通信时使用的安全协议。 支持的值是plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥(PEM)的路径。
sslKeyPassword (可选)客户端证书的密码。

配置

下表解释了在 function.json 文件中设置的绑定配置属性。

“function.json”属性 Description
类型 (必需)必须设置为 kafkaTrigger.
方向 (必需)必须设置为 in.
name (必需)代表函数代码中中转数据的变量的名称。
brokerList (必需)由触发器监视的 Kafka 中转站的列表。 有关详细信息,请参阅 “连接 ”。
主题 (必需)由触发器监视的主题。
基数 (可选)指示触发器输入的基数。 支持的值是 ONE (默认值) 和 MANY。 当输入是单个消息以及MANY输入是消息数组时使用ONE。 使用 MANY时,还必须设置一个 dataType
dataType 定义 Functions 如何处理参数值。 默认情况下,该值作为字符串获取,Functions 会尝试将字符串反序列化为实际的纯旧 Java 对象(POJO)。 当 string输入被视为字符串时。 当消息作为二进制数据接收时 binary,Functions 会尝试将其反序列化为实际的参数类型 byte[]。
consumerGroup (可选)触发器使用的 Kafka 使用者组。
avroSchema (可选)使用 Avro 协议时通用记录的架构。
authenticationMode (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证模式。 支持的值是 Gssapi,(默认值),ScramSha256ScramSha512Plain
username (可选)SASL 身份验证的用户名。 不支持何时AuthenticationModeGssapi为 . 有关详细信息,请参阅 “连接 ”。
password (可选)SASL 身份验证的密码。 不支持何时AuthenticationModeGssapi为 . 有关详细信息,请参阅 “连接 ”。
protocol (可选)与代理通信时使用的安全协议。 支持的值是plaintext(默认值)、sslsasl_plaintextsasl_ssl
sslCaLocation (可选)用于验证代理证书的 CA 证书文件的路径。
sslCertificateLocation (可选)客户端证书的路径。
sslKeyLocation (可选)用于身份验证的客户端私钥(PEM)的路径。
sslKeyPassword (可选)客户端证书的密码。

Usage

Kafka 事件当前支持为 JSON 有效负载的字符串和字符串数组。

Kafka 消息作为字符串和作为 JSON 有效负载的字符串数组传递给函数。

在高级计划中,必须为 Kafka 输出启用运行时缩放监视,以便能够横向扩展到多个实例。 若要了解详细信息,请参阅 “启用运行时缩放”。

不能在 Azure 门户中使用 Code + 测试页的“测试/运行”功能来处理 Kafka 触发器。 必须改为将测试事件直接发送到由触发器监视的主题。

有关 Kafka 触发器支持的 host.json 设置的完整集,请参阅 host.json 设置

连接

触发器和绑定所需的所有连接信息都应保留在应用程序设置中,而不是在代码中的绑定定义中维护。 对于不应存储在代码中的凭据,这是事实。

重要

凭据设置必须引用 应用程序设置。 不要在代码或配置文件中硬编码凭据。 在本地运行时,请将 local.settings.json 文件 用于凭据,并且不发布 local.settings.json 文件。

连接到 Azure 中 Confluent 提供的托管 Kafka 群集时,请确保在触发器或绑定中设置了 Confluent Cloud 环境的以下身份验证凭据:

设置 建议的值 Description
BrokerList BootstrapServer 命名 BootstrapServer 的应用设置包含 Confluent Cloud 设置页中找到的启动服务器的值。 值类似于 xyz-xyzxzy.chinanorth.azure.confluent.cloud:9092.
用户名 ConfluentCloudUsername 命名 ConfluentCloudUsername 的应用设置包含来自 Confluent Cloud 网站的 API 访问密钥。
密码 ConfluentCloudPassword 命名 ConfluentCloudPassword 的应用设置包含从 Confluent Cloud 网站获取的 API 机密。

用于这些设置的字符串值必须作为 Azure 中的应用程序设置或在本地开发期间 local.settings.json 文件中Values集合中提供。

还应在Protocol绑定定义中设置SslCaLocationAuthenticationMode设置。

后续步骤