Use Apache Kafka on HDInsight with Azure IoT Hub

Learn how to use the Apache Kafka Connect Azure IoT Hub connector to move data between Apache Kafka on HDInsight and Azure IoT Hub. In this document, you learn how to run the IoT Hub connector from an edge node in the cluster.

The Kafka Connect API allows you to implement connectors that continuously pull data into Kafka, or push data from Kafka to another system. The Apache Kafka Connect Azure IoT Hub is a connector that pulls data from Azure IoT Hub into Kafka. It can also push data from Kafka to IoT Hub.

When pulling data from IoT Hub, you use a **source** connector. When pushing data to IoT Hub, you use a **sink** connector. The IoT Hub connector provides both the source and sink connectors.

The following diagram shows the data flow between Azure IoT Hub and Kafka on HDInsight when using the connector.

Image showing data flowing from IoT Hub to Kafka through the connector

For more information on the Connect API, see https://kafka.apache.org/documentation/#connect.

Prerequisites

The steps in this document assume:

  • An Apache Kafka on HDInsight cluster with an edge node.
  • An Azure IoT hub with a connected device (for example, the simulated Raspberry Pi device).
  • The Scala build tool (sbt) installed in your local environment, to build the connector.
  • An SSH client.

Build the connector

  1. Download the source for the connector from https://github.com/Azure/toketi-kafka-connect-iothub/ to your local environment.

  2. From a command prompt, navigate to the toketi-kafka-connect-iothub-master directory. Then use the following command to build and package the project:

    sbt assembly
    

    The build takes a few minutes to complete. The command creates a file named kafka-connect-iothub-assembly_2.11-0.7.0.jar in the project's toketi-kafka-connect-iothub-master\target\scala-2.11 directory.
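
    To confirm that the build succeeded, you can list the output directory; the version numbers in the jar file name may differ from those shown in this document:

    ls toketi-kafka-connect-iothub-master/target/scala-2.11/kafka-connect-iothub-assembly*.jar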

Install the connector

  1. Upload the .jar file to the edge node of your Kafka on HDInsight cluster. Edit the following command by replacing CLUSTERNAME with the actual name of your cluster. The command uses the default values for the SSH user account and the edge node name; modify them as needed.

    scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.cn:
    
  2. Once the file copy completes, connect to the edge node using SSH:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.cn
    
  3. To install the connector into the Kafka libs directory, use the following command:

    sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
    

Keep your SSH connection active for the remaining steps.

Configure Apache Kafka

From your SSH connection to the edge node, use the following steps to configure Kafka to run the connector in standalone mode:

  1. Set up a password variable. Replace PASSWORD with the cluster login password, then enter the command:

    export password='PASSWORD'
    
  2. Install the jq utility. jq makes it easier to process JSON documents returned from Ambari queries. Enter the following command:

    sudo apt -y install jq
    
  3. Get the address of the Kafka brokers. Your cluster may contain many brokers, but you only need to reference one or two. To get the addresses of two broker hosts, use the following commands:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    echo $KAFKABROKERS
    

    Copy the values for later use. The value returned is similar to the following text:

    wn0-kafka.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.chinacloudapp.cn:9092,wn1-kafka.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.chinacloudapp.cn:9092

  4. Get the address of the Apache Zookeeper nodes. There are several Zookeeper nodes in the cluster, but you only need to reference one or two. Use the following command to store the addresses in the variable KAFKAZKHOSTS:

    export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
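    # Verify the stored Zookeeper addresses
    echo $KAFKAZKHOSTS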
    
  5. When running the connector in standalone mode, the /usr/hdp/current/kafka-broker/config/connect-standalone.properties file is used to communicate with the Kafka brokers. To edit the connect-standalone.properties file, use the following command:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
    
  6. Make the following edits. A sketch of the resulting file appears after these steps:

    | Current value | New value | Comment |
    | --- | --- | --- |
    | bootstrap.servers=localhost:9092 | Replace the localhost:9092 value with the broker hosts from the previous step | Configures the standalone configuration for the edge node to find the Kafka brokers. |
    | key.converter=org.apache.kafka.connect.json.JsonConverter | key.converter=org.apache.kafka.connect.storage.StringConverter | This change allows you to test using the console producer included with Kafka. You may need different converters for other producers and consumers. For information on using other converter values, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. |
    | value.converter=org.apache.kafka.connect.json.JsonConverter | value.converter=org.apache.kafka.connect.storage.StringConverter | Same as above. |
    | N/A | consumer.max.poll.records=10 | Add to the end of the file. This change prevents timeouts in the sink connector by limiting it to 10 records at a time. For more information, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md. |
  7. To save the file, use Ctrl + X, Y, and then Enter.

  8. To create the topics used by the connector, use the following commands:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS
    
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
    

    To verify that the iotin and iotout topics exist, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
    

    The iotin topic is used to receive messages from IoT Hub. The iotout topic is used to send messages to IoT Hub.
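
With the step 6 edits applied, the relevant entries in connect-standalone.properties look similar to the following sketch. The broker host names are the illustrative values from the earlier example; use the values returned for your cluster:

bootstrap.servers=wn0-kafka.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.chinacloudapp.cn:9092,wn1-kafka.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.chinacloudapp.cn:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
consumer.max.poll.records=10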

Get IoT Hub connection information

To retrieve the IoT hub information used by the connector, use the following steps:

  1. Get the Event Hub-compatible endpoint and the Event Hub-compatible endpoint name for your IoT hub. To get this information, use one of the following methods:

    • From the Azure portal, use the following steps:

      1. Navigate to your IoT hub and select **Endpoints**.

      2. From **Built-in endpoints**, select **Events**.

      3. From **Properties**, copy the value of the following fields:

        • **Event Hub-compatible name**
        • **Event Hub-compatible endpoint**
        • **Partitions**

        Important

        The endpoint value from the portal may contain extra text that is not needed in this example. Extract the text that matches the pattern sb://<randomnamespace>.servicebus.chinacloudapi.cn/.

    • From the Azure CLI, use the following command:

      az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
      

      Replace myhubname with the name of your IoT hub. The response is similar to the following text:

      "EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.chinacloudapi.cn/",
      "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e",
      "Partitions": 2
      
  2. Get the shared access policy and key. For this example, use the **service** key. To get this information, use one of the following methods:

    • From the Azure portal, use the following steps:

      1. Select **Shared access policies**, and then select **service**.
      2. Copy the **Primary key** value.
      3. Copy the **Connection string--primary key** value.
    • From the Azure CLI, use the following commands:

      1. To get the primary key value, use the following command:

        az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
        

        Replace myhubname with the name of your IoT hub. The response is the primary key of the service policy for this hub.

      2. To get the connection string for the service policy, use the following command:

        az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString"
        

        Replace myhubname with the name of your IoT hub. The response is the connection string for the service policy.
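
If you prefer to capture all of these values in one pass, the following Azure CLI sketch stores them in shell variables for later use. The variable names are arbitrary, and myhubname is a placeholder for your IoT hub name:

    # Event Hub-compatible endpoint, name, and partition count
    export EHENDPOINT=$(az iot hub show --name myhubname --query "properties.eventHubEndpoints.events.endpoint" -o tsv)
    export EHNAME=$(az iot hub show --name myhubname --query "properties.eventHubEndpoints.events.path" -o tsv)
    export EHPARTITIONS=$(az iot hub show --name myhubname --query "properties.eventHubEndpoints.events.partitionCount" -o tsv)
    # Key and connection string for the service policy
    export IOTKEY=$(az iot hub policy show --hub-name myhubname --name service --query "primaryKey" -o tsv)
    export IOTCONNSTR=$(az iot hub show-connection-string --name myhubname --policy-name service --query "connectionString" -o tsv)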

Configure the source connection

To configure the source to work with your IoT hub, perform the following actions from an SSH connection to the edge node:

  1. /usr/hdp/current/kafka-broker/config/ 目录中创建 connect-iot-source.properties 文件的副本。Create a copy of the connect-iot-source.properties file in the /usr/hdp/current/kafka-broker/config/ directory. 若要从 toketi-kafka-connect-iothub 项目下载文件,请使用以下命令:To download the file from the toketi-kafka-connect-iothub project, use the following command:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
    
  2. To edit the connect-iothub-source.properties file and add the IoT hub information, use the following command:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
    

    In the editor, find and change the following entries:

    | Current value | Edit |
    | --- | --- |
    | Kafka.Topic=PLACEHOLDER | Replace PLACEHOLDER with iotin. Messages received from IoT hub are placed in the iotin topic. |
    | IotHub.EventHubCompatibleName=PLACEHOLDER | Replace PLACEHOLDER with the Event Hub-compatible name. |
    | IotHub.EventHubCompatibleEndpoint=PLACEHOLDER | Replace PLACEHOLDER with the Event Hub-compatible endpoint. |
    | IotHub.AccessKeyName=PLACEHOLDER | Replace PLACEHOLDER with service. |
    | IotHub.AccessKeyValue=PLACEHOLDER | Replace PLACEHOLDER with the primary key of the service policy. |
    | IotHub.Partitions=PLACEHOLDER | Replace PLACEHOLDER with the number of partitions from the previous steps. |
    | IotHub.StartTime=PLACEHOLDER | Replace PLACEHOLDER with a UTC date. This date is when the connector starts checking for messages. The date format is yyyy-mm-ddThh:mm:ssZ. |
    | BatchSize=100 | Replace 100 with 5. This change causes the connector to read messages into Kafka once there are five new messages in IoT hub. |

    For an example configuration, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

  3. To save changes, use Ctrl + X, Y, and then Enter.
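
After these edits, the changed entries look similar to the following sketch. The name, endpoint, partition count, and start time shown are the illustrative values from earlier in this document; substitute your own:

    Kafka.Topic=iotin
    IotHub.EventHubCompatibleName=iothub-ehub-myhub08-207673-d44b2a856e
    IotHub.EventHubCompatibleEndpoint=sb://ihsuprodbnres006dednamespace.servicebus.chinacloudapi.cn/
    IotHub.AccessKeyName=service
    IotHub.AccessKeyValue=<primary key of the service policy>
    IotHub.Partitions=2
    IotHub.StartTime=2017-08-29T00:00:00Z
    BatchSize=5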

For more information on configuring the connector source, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md.

Configure the sink connection

To configure the sink connection to work with your IoT hub, perform the following actions from an SSH connection to the edge node:

  1. /usr/hdp/current/kafka-broker/config/ 目录中创建 connect-iothub-sink.properties 文件的副本。Create a copy of the connect-iothub-sink.properties file in the /usr/hdp/current/kafka-broker/config/ directory. 若要从 toketi-kafka-connect-iothub 项目下载文件,请使用以下命令:To download the file from the toketi-kafka-connect-iothub project, use the following command:

    sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
    
  2. To edit the connect-iothub-sink.properties file and add the IoT hub information, use the following command:

    sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
    

    In the editor, find and change the following entries:

    | Current value | Edit |
    | --- | --- |
    | topics=PLACEHOLDER | Replace PLACEHOLDER with iotout. Messages written to the iotout topic are forwarded to the IoT hub. |
    | IotHub.ConnectionString=PLACEHOLDER | Replace PLACEHOLDER with the connection string for the service policy. |

    For an example configuration, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

  3. To save changes, use Ctrl + X, Y, and then Enter.
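
The changed entries then look similar to the following sketch; the connection string shown is an illustrative placeholder in the standard IoT Hub format:

    topics=iotout
    IotHub.ConnectionString=HostName=myhubname.azure-devices.cn;SharedAccessKeyName=service;SharedAccessKey=<primary key>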

For more information on configuring the connector sink, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Start the source connector

To start the source connector, use the following command from an SSH connection to the edge node:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties

Once the connector starts, send messages to IoT hub from your device(s). As the connector reads messages from the IoT hub and stores them in the Kafka topic, it logs information to the console:

[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39)
[2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
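
To confirm that messages are arriving in the iotin topic, you can run the console consumer included with Kafka from a second SSH session. This assumes the KAFKABROKERS variable is set as shown earlier:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic iotin --from-beginning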

Note

You may see several warnings as the connector starts. These warnings do not cause problems with receiving messages from IoT hub.

After a few minutes, stop the connector by pressing Ctrl + C twice. It takes a few minutes for the connector to stop.

Start the sink connector

From an SSH connection to the edge node, use the following command to start the sink connector in standalone mode:

/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties

As the connector runs, information similar to the following text is displayed:

[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

Note

You may notice several warnings as the connector starts. You can safely ignore them.

Send messages

To send messages through the connector, use the following steps:

  1. Open a second SSH session to the Kafka cluster:

    ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.cn
    
  2. Get the address of the Kafka brokers in the new SSH session. Replace PASSWORD with the cluster login password, then enter the commands:

    export password='PASSWORD'
    
    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    
    export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  3. To send messages to the iotout topic, use the following command:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
    

    This command does not return you to the normal Bash prompt. Instead, it sends keyboard input to the iotout topic.

  4. To send a message to your device, paste a JSON document into the SSH session for the kafka-console-producer.

    Important

    You must set the value of the "deviceId" entry to the ID of your device. In the following example, the device is named myDeviceId:

    {"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
    

    The schema for this JSON document is described in more detail at https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

    If you are using the simulated Raspberry Pi device, and it is running, the device logs the following message:

    Receive message: Turn On
    

    Resend the JSON document, but change the value of the "message" entry, as in the example below. The new value is logged by the device.
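
    For example, the following document changes the message text (the text itself is arbitrary), and the device logs the new value:

    {"messageId":"msg2","message":"Turn Off","deviceId":"myDeviceId"}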

For more information on using the sink connector, see https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md.

Next steps

In this document, you learned how to use the Apache Kafka Connect API to start the IoT Kafka Connector on HDInsight. Use the following links to discover other ways to work with Kafka: