了解如何使用 Apache Kafka Connect Azure IoT 中心 连接器在 Apache Kafka on HDInsight 和 Azure IoT 中心之间移动数据。 本文档介绍如何通过群集中的边缘节点运行 IoT 中心连接器。
使用 Kafka Connect API,你可以实施所需的连接器,用于将数据连续提取到 Kafka,或者将数据从 Kafka 推送到另一个系统。 Apache Kafka Connect Azure IoT 中心是一个连接器,用于将数据从 Azure IoT 中心拉取到 Kafka。 该连接器还能将数据从 Kafka 推送到 IoT 中心。
从 IoT 中心中拉取数据时,请使用 源 连接器。 将数据推送到 IoT 中心时,可以使用接收器连接器。 IoT 中心连接器同时提供源连接器和接收器连接器。
下图显示了在使用连接器时,Azure IoT 中心与 Kafka on HDInsight 之间的数据流。
有关如何使用 Connect API 的详细信息,请参阅 https://kafka.apache.org/documentation/#connect。
HDInsight 上的 Apache Kafka 群集。 有关详细信息,请参阅 Kafka on HDInsight 快速入门 文档。
Kafka 群集中的边缘节点。 有关详细信息,请参阅 将边缘节点与 HDInsight 文档配合使用 。
SSH 客户端。 有关详细信息,请参阅使用 SSH 连接到 HDInsight (Apache Hadoop)。
Azure IoT 中心和设备。 在本文中,请考虑使用 将 Raspberry Pi 联机模拟器连接到 Azure IoT 中心。
将连接器的源从 https://github.com/Azure/toketi-kafka-connect-iothub/ 下载到本地环境。
在命令提示符中导航到
toketi-kafka-connect-iothub-master
目录。 然后使用以下命令生成并打包项目:sbt assembly
生成过程需要花费几分钟时间才能完成。 使用此命令在项目的
kafka-connect-iothub-assembly_2.11-0.7.0.jar
目录中创建名为toketi-kafka-connect-iothub-master\target\scala-2.11
的文件。
将 .jar 文件上传到 Kafka on HDInsight 群集的边缘节点。 编辑以下命令,将
CLUSTERNAME
替换为群集的实际名称。 SSH 用户帐户和 边缘节点 名称的默认值用于根据需要进行修改。scp kafka-connect-iothub-assembly*.jar sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.cn:
文件复制完成后,使用 SSH 连接到边缘节点:
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.cn
若要在 Kafka
libs
目录中安装连接器,请使用以下命令:sudo mv kafka-connect-iothub-assembly*.jar /usr/hdp/current/kafka-broker/libs/
使 SSH 连接保持活动状态,以执行剩余步骤。
与边缘节点建立 SSH 连接后,使用以下步骤将 Kafka 配置为在独立模式下运行连接器:
设置密码变量。 将 PASSWORD 替换为群集登录密码,然后输入以下命令:
export password='PASSWORD'
安装 jq 实用工具。 使用 jq 可以更轻松地处理 Ambari 查询返回的 JSON 文档。 输入以下命令:
sudo apt -y install jq
获取 Kafka 代理的地址。 群集中可能有许多的代理,但只需引用其中的一到两个。 若要获取两个代理主机的地址,请使用以下命令:
export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2` echo $KAFKABROKERS
复制这些值供稍后使用。 返回的值类似于下文:
<brokername1>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.chinacloudapp.cn:9092,<brokername2>.w5ijyohcxt5uvdhhuaz5ra4u5f.ex.internal.chinacloudapp.cn:9092
获取 Apache Zookeeper 节点的地址。 群集中有多个 Zookeeper 节点,但只需引用其中的一到两个。 使用以下变量存储变量
KAFKAZKHOSTS
中的地址:export KAFKAZKHOSTS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
在独立模式下运行连接器时,将使用
/usr/hdp/current/kafka-broker/config/connect-standalone.properties
文件来与 Kafka 代理通信。 若要编辑connect-standalone.properties
文件,请使用以下命令:sudo nano /usr/hdp/current/kafka-broker/config/connect-standalone.properties
进行以下编辑:
当前值 新值 注释 bootstrap.servers=localhost:9092
将 localhost:9092
值替换为在上一步骤中获取的代理主机将边缘节点的独立配置配置为查找 Kafka 代理。 key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
做出此项更改后,可以使用 Kafka 随附的控制台生成方执行测试。 对于其他生产方和使用方,可能需要不同的转换器。 有关使用其他转换器值的信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。 value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
与给定内容相同。 不可用 consumer.max.poll.records=10
添加到文件末尾。 此项更改会将接收器连接器限制为每次处理 10 条记录,防止该连接器发生超时。 有关详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。 若要保存文件,请使用 Ctrl + X、 Y,然后按 Enter。
若要创建连接器使用的主题,请使用以下命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotin --zookeeper $KAFKAZKHOSTS /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic iotout --zookeeper $KAFKAZKHOSTS
若要验证
iotin
和iotout
主题是否存在,请使用以下命令:/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
iotin
主题用于从 IoT 中心接收消息。iotout
主题用于向 IoT 中心发送消息。
若要检索连接器使用的 IoT 中心信息,请使用以下步骤:
获取事件中心兼容的终结点,以及 IoT 中心的事件中心兼容终结点名称。 若要获取此信息,请使用以下方法之一:
在 Azure 门户中,使用以下步骤:
导航到 IoT 中心并选择“终结点”。
在“内置终结点”中,选择“事件”。
在“属性”中,复制以下字段的值:
- 与事件中心兼容的名称
- 与事件中心兼容的终结点
- 分区
重要
门户中的终结点值可能包含本示例不需要的额外文本。 提取与模式
sb://<randomnamespace>.servicebus.chinacloudapi.cn/
匹配的文本。
在 Azure CLI 中,使用以下命令:
az iot hub show --name myhubname --query "{EventHubCompatibleName:properties.eventHubEndpoints.events.path,EventHubCompatibleEndpoint:properties.eventHubEndpoints.events.endpoint,Partitions:properties.eventHubEndpoints.events.partitionCount}"
将
myhubname
替换为 IoT 中心的名称。 响应类似于以下文本:"EventHubCompatibleEndpoint": "sb://ihsuprodbnres006dednamespace.servicebus.chinacloudapi.cn/", "EventHubCompatibleName": "iothub-ehub-myhub08-207673-d44b2a856e", "Partitions": 2
获取 共享访问策略 和 密钥。 对于此示例,请使用 服务 密钥。 若要获取此信息,请使用以下方法之一:
在 Azure 门户中,使用以下步骤:
- 依次选择“共享访问策略”、“服务”。
- 复制“主密钥”值。
- 复制“连接字符串 - 主键” 值。
在 Azure CLI 中,使用以下命令:
若要获取主密钥值,请使用以下命令:
az iot hub policy show --hub-name myhubname --name service --query "primaryKey"
将
myhubname
替换为 IoT 中心的名称。 响应是发送到此中心的service
策略的主密钥。若要获取
service
策略的连接字符串,请使用以下命令:az iot hub connection-string show --name myhubname --policy-name service --query "connectionString"
将
myhubname
替换为 IoT 中心的名称。 响应是service
策略的连接字符串。
若要将源配置为使用 IoT 中心,请在与边缘节点建立 SSH 连接后执行以下操作:
在
connect-iot-source.properties
目录中创建/usr/hdp/current/kafka-broker/config/
文件的副本。 若要从 toketi-kafka-connect-iothub 项目下载文件,请使用以下命令:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-source.properties
若要编辑
connect-iot-source.properties
文件并添加 IoT 中心信息,请使用以下命令:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
在编辑器中,找到并更改以下条目:
当前值 编辑 Kafka.Topic=PLACEHOLDER
将 PLACEHOLDER
替换为iotin
。 从 IoT 中心收到的消息将放入iotin
主题中。IotHub.EventHubCompatibleName=PLACEHOLDER
将 PLACEHOLDER
替换为与事件中心兼容的名称。IotHub.EventHubCompatibleEndpoint=PLACEHOLDER
将 PLACEHOLDER
替换为与事件中心兼容的终结点。IotHub.AccessKeyName=PLACEHOLDER
将 PLACEHOLDER
替换为service
。IotHub.AccessKeyValue=PLACEHOLDER
将 PLACEHOLDER
替换为service
策略的主密钥。IotHub.Partitions=PLACEHOLDER
将 PLACEHOLDER
替换为在上一步骤中获取的分区数。IotHub.StartTime=PLACEHOLDER
将 PLACEHOLDER
替换为 UTC 日期。 此日期是连接器开始检查消息的时间。 日期格式为yyyy-mm-ddThh:mm:ssZ
。BatchSize=100
将 100
替换为5
。 做出此项更改后,如果 IoT 中心出现五条新消息,则连接器会将消息读入 Kafka。有关示例配置,请参阅 适用于 Azure IoT 中心的 Kafka Connect 源连接器。
若要保存更改,请使用 Ctrl + X、 Y,然后按 Enter。
有关配置连接器源的详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Source.md。
若要将接收器连接配置为使用 IoT 中心,请在与边缘节点建立 SSH 连接后执行以下操作:
在
connect-iothub-sink.properties
目录中创建/usr/hdp/current/kafka-broker/config/
文件的副本。 若要从 toketi-kafka-connect-iothub 项目下载文件,请使用以下命令:sudo wget -P /usr/hdp/current/kafka-broker/config/ https://raw.githubusercontent.com/Azure/toketi-kafka-connect-iothub/master/connect-iothub-sink.properties
若要编辑
connect-iothub-sink.properties
文件并添加 IoT 中心信息,请使用以下命令:sudo nano /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
在编辑器中,找到并更改以下条目:
当前值 编辑 topics=PLACEHOLDER
将 PLACEHOLDER
替换为iotout
。 写入iotout
主题的消息将转发到 IoT 中心。IotHub.ConnectionString=PLACEHOLDER
将 PLACEHOLDER
替换为service
策略的连接字符串。有关示例配置,请参阅 用于 Azure IoT 中心的 Kafka Connect 下游连接器。
若要保存更改,请使用 Ctrl + X、 Y,然后按 Enter。
有关配置连接器接收器的详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。
若要启动源连接器,请在与边缘节点建立 SSH 连接后使用以下命令:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-source.properties
启动连接器后,将消息从设备发送到 IoT 中心。 当连接器从 IoT 中心读取消息并将其存储在 Kafka 主题时,会将信息记录到控制台:
[2017-08-29 20:15:46,112] INFO Polling for data - Obtained 5 SourceRecords from IotHub (com.microsoft.azure.iot.kafka.connect.IotHubSourceTask:39) [2017-08-29 20:15:54,106] INFO Finished WorkerSourceTask{id=AzureIotHubConnector-0} commitOffsets successfully in 4 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:356)
备注
连接器启动时,可能会出现几条警告。 这些警告不会影响从 IoT 中心接收消息。
请在几分钟后按两次 Ctrl + C 停止连接器 。 停止连接器需要几分钟时间。
与边缘节点建立 SSH 连接后,使用以下命令在独立模式下启动接收器连接器:
/usr/hdp/current/kafka-broker/bin/connect-standalone.sh /usr/hdp/current/kafka-broker/config/connect-standalone.properties /usr/hdp/current/kafka-broker/config/connect-iothub-sink.properties
连接器运行时,会显示类似于以下文本的信息:
[2017-08-30 17:49:16,150] INFO Started tasks to send 1 messages to devices. (com.microsoft.azure.iot.kafka.connect.sink.
IotHubSinkTask:47)
[2017-08-30 17:49:16,150] INFO WorkerSinkTask{id=AzureIotHubSinkConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
备注
连接器启动时,可能会出现几条警告。 可以放心地忽略这些警告。
若要通过连接器发送消息,请使用以下步骤:
打开第二个 SSH 会话,连接到 Kafka 群集 :
ssh sshuser@new-edgenode.CLUSTERNAME-ssh.azurehdinsight.cn
获取新 SSH 会话的 Kafka 代理的地址。 将 PASSWORD 替换为群集登录密码,然后输入以下命令:
export password='PASSWORD' export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name') export KAFKABROKERS=`curl -sS -u admin:$password -G http://headnodehost:8080/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
若要将消息发送到
iotout
主题,请使用以下命令:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic iotout
此命令不会将你返回到正常的 Bash 提示符, 而是将键盘输入发送到
iotout
主题。若要将消息发送到设备,请将一个 JSON 文档粘贴到
kafka-console-producer
的 SSH 会话中。重要
必须将
"deviceId"
条目的值设置为设备 ID。 在以下示例中,设备名为myDeviceId
:{"messageId":"msg1","message":"Turn On","deviceId":"myDeviceId"}
https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md 中更详细地介绍了此 JSON 文档的架构。
如果使用模拟的 Raspberry Pi 设备,并且它正在运行,则设备会记录以下消息。
Receive message: Turn On
Resend the JSON document, but change the value of the `"message"` entry. The new value is logged by the device.
有关使用接收器连接器的详细信息,请参阅 https://github.com/Azure/toketi-kafka-connect-iothub/blob/master/README_Sink.md。
本文档已介绍如何使用 Apache Kafka Connect API 在 HDInsight 上启动 IoT Kafka 连接器。 使用以下链接来发现与 Kafka 配合使用的其他方式: