在本快速入门中,你将使用 Bicep 在 Azure HDInsight 中创建 Apache Kafka 群集。 Kafka 是开源分布式流式处理平台。 通常用作消息代理,因为它可提供类似于发布-订阅消息队列的功能。
Bicep 是一种特定于域的语言 (DSL),使用声明性语法来部署 Azure 资源。 它提供简明的语法、可靠的类型安全性以及对代码重用的支持。 Bicep 会针对你的 Azure 基础结构即代码解决方案提供最佳创作体验。
仅可通过相同虚拟网络内的资源访问 Kafka API。 本快速入门使用 SSH 直接访问群集。 若要将其他服务、网络或虚拟机连接到 Kafka,则必须首先创建虚拟机,然后才能在网络中创建资源。 有关详细信息,请参阅使用虚拟网络连接到 Apache Kafka 文档。
如果没有 Azure 订阅,请在开始前创建试用版订阅。
本快速入门中使用的 Bicep 文件来自 Azure 快速入门模板。
@description('The name of the Kafka cluster to create. This must be a unique name.')
param clusterName string
@description('These credentials can be used to submit jobs to the cluster and to log into cluster dashboards.')
param clusterLoginUserName string
@description('The password must be at least 10 characters in length and must contain at least one digit, one upper case letter, one lower case letter, and one non-alphanumeric character except (single-quote, double-quote, backslash, right-bracket, full-stop). Also, the password must not contain 3 consecutive characters from the cluster username or SSH username.')
@minLength(10)
@secure()
param clusterLoginPassword string
@description('These credentials can be used to remotely access the cluster.')
param sshUserName string
@description('SSH password must be 6-72 characters long and must contain at least one digit, one upper case letter, and one lower case letter. It must not contain any 3 consecutive characters from the cluster login name')
@minLength(6)
@maxLength(72)
@secure()
param sshPassword string
@description('Location for all resources.')
param location string = resourceGroup().location
@description('This is the headnode Azure Virtual Machine size, and will affect the cost. If you don\'t know, just leave the default value.')
@allowed([
'Standard_A4_v2'
'Standard_A8_v2'
'Standard_E2_v3'
'Standard_E4_v3'
'Standard_E8_v3'
'Standard_E16_v3'
'Standard_E20_v3'
'Standard_E32_v3'
'Standard_E48_v3'
])
param HeadNodeVirtualMachineSize string = 'Standard_E4_v3'
@description('This is the worerdnode Azure Virtual Machine size, and will affect the cost. If you don\'t know, just leave the default value.')
@allowed([
'Standard_A4_v2'
'Standard_A8_v2'
'Standard_E2_v3'
'Standard_E4_v3'
'Standard_E8_v3'
'Standard_E16_v3'
'Standard_E20_v3'
'Standard_E32_v3'
'Standard_E48_v3'
])
param WorkerNodeVirtualMachineSize string = 'Standard_E4_v3'
@description('This is the Zookeepernode Azure Virtual Machine size, and will affect the cost. If you don\'t know, just leave the default value.')
@allowed([
'Standard_A4_v2'
'Standard_A8_v2'
'Standard_E2_v3'
'Standard_E4_v3'
'Standard_E8_v3'
'Standard_E16_v3'
'Standard_E20_v3'
'Standard_E32_v3'
'Standard_E48_v3'
])
param ZookeeperNodeVirtualMachineSize string = 'Standard_E4_v3'
var defaultStorageAccount = {
name: uniqueString(resourceGroup().id)
type: 'Standard_LRS'
}
resource storageAccount 'Microsoft.Storage/storageAccounts@2023-01-01' = {
name: defaultStorageAccount.name
location: location
sku: {
name: defaultStorageAccount.type
}
kind: 'StorageV2'
properties: {
minimumTlsVersion: 'TLS1_2'
supportsHttpsTrafficOnly: true
allowBlobPublicAccess: false
}
}
resource cluster 'Microsoft.HDInsight/clusters@2023-08-15-preview' = {
name: clusterName
location: location
properties: {
clusterVersion: '4.0'
osType: 'Linux'
clusterDefinition: {
kind: 'kafka'
configurations: {
gateway: {
'restAuthCredential.isEnabled': true
'restAuthCredential.username': clusterLoginUserName
'restAuthCredential.password': clusterLoginPassword
}
}
}
storageProfile: {
storageaccounts: [
{
name: replace(replace(concat(reference(storageAccount.id, '2021-08-01').primaryEndpoints.blob), 'https:', ''), '/', '')
isDefault: true
container: clusterName
key: listKeys(storageAccount.id, '2021-08-01').keys[0].value
}
]
}
computeProfile: {
roles: [
{
name: 'headnode'
targetInstanceCount: 2
hardwareProfile: {
vmSize: HeadNodeVirtualMachineSize
}
osProfile: {
linuxOperatingSystemProfile: {
username: sshUserName
password: sshPassword
}
}
}
{
name: 'workernode'
targetInstanceCount: 4
hardwareProfile: {
vmSize: WorkerNodeVirtualMachineSize
}
dataDisksGroups: [
{
disksPerNode: 2
}
]
osProfile: {
linuxOperatingSystemProfile: {
username: sshUserName
password: sshPassword
}
}
}
{
name: 'zookeepernode'
targetInstanceCount: 3
hardwareProfile: {
vmSize: ZookeeperNodeVirtualMachineSize
}
osProfile: {
linuxOperatingSystemProfile: {
username: sshUserName
password: sshPassword
}
}
}
]
}
}
}
output name string = cluster.name
output resourceId string = cluster.id
output cluster object = cluster.properties
output resourceGroupName string = resourceGroup().name
output location string = location
该 Bicep 文件中定义了两个 Azure 资源:
- Microsoft.Storage/storageAccounts:创建 Azure 存储帐户。
- Microsoft.HDInsight/cluster:创建 HDInsight 群集。
将该 Bicep 文件另存为本地计算机上的 main.bicep。
使用 Azure CLI 或 Azure PowerShell 来部署该 Bicep 文件。
az group create --name exampleRG --location chinaeast az deployment group create --resource-group exampleRG --template-file main.bicep --parameters clusterName=<cluster-name> clusterLoginUserName=<cluster-username> sshUserName=<ssh-username>
需要提供参数的值:
- 将 <cluster-name> 替换为要创建的 HDInsight 群集的名称。 群集名称需要以字母开头,并且只能包含小写字母、数字和破折号。
- 将 <cluster-username> 替换为用于向群集提交作业以及登录到群集仪表板的凭据。 群集用户名中不允许使用大写字母。
- 将 <ssh-username> 替换为用于远程访问群集的凭据。
系统将提示你输入以下信息:
- clusterLoginPassword,长度必须至少为 10 个字符,必须至少包含一个数字、一个大写字母、一个小写字母和一个非字母数字字符,但不能包含单引号、双引号、反斜杠、右括号和句号。 此外,不能包含群集用户名或 SSH 用户名中的三个连续字符。
- sshPassword,长度必须为 6-72 个字符,必须至少包含一个数字、一个大写字母和一个小写字母。 不能包含群集登录名中的任意三个连续字符。
注意
部署完成后,应会看到一条指出部署成功的消息。
使用 Azure 门户、Azure CLI 或 Azure PowerShell 列出资源组中已部署的资源。
az resource list --resource-group exampleRG
使用 Kafka 时,必须了解 Apache Zookeeper 和代理主机 。 Kafka API 以及 Kafka 随附的许多实用工具都使用这些主机。
在本部分中,可以从群集上的 Ambari REST API 获取主机信息。
使用 ssh 命令连接到群集。 编辑以下命令,将 CLUSTERNAME 替换为群集的名称,然后输入该命令:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
进行 SSH 连接时,使用以下命令安装
jq
实用程序。 此实用工具用于分析 JSON 文档且有助于检索主机的信息:sudo apt -y install jq
若要将环境变量设置为群集名称,请使用以下命令:
read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME
出现提示时,请输入 Kafka 群集的名称。
若要使用 Zookeeper 主机信息来设置环境变量,请使用以下命令。 此命令检索所有 Zookeeper 主机,然后仅返回前两个条目。 这是由于某个主机无法访问时,需要一些冗余。
export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.cn/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
出现提示时,请输入群集登录帐户(不是 SSH 帐户)的密码。
若要验证是否已正确设置了环境变量,请使用以下命令:
echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
此命令返回类似于以下文本的信息:
<zookeepername1>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.chinacloudapp.cn:2181,<zookeepername2>.eahjefxxp1netdbyklgqj5y1ud.ex.internal.chinacloudapp.cn:2181
若要使用 Kafka 代理主机信息来设置环境变量,请使用以下命令:
export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.cn/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
出现提示时,请输入群集登录帐户(不是 SSH 帐户)的密码。
若要验证是否已正确设置了环境变量,请使用以下命令:
echo '$KAFKABROKERS='$KAFKABROKERS
此命令返回类似于以下文本的信息:
<brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.chinacloudapp.cn:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.chinacloudapp.cn:9092
Kafka 在主题中存储数据流。 可以使用 kafka-topics.sh
实用工具来管理主题。
若要创建主题,请在 SSH 连接中使用以下命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
此命令使用存储在
$KAFKAZKHOSTS
中的主机信息连接到 Zookeeper, 然后创建名为 test 的 Kafka 主题。本主题中存储的数据已分区到八个分区。
每个分区在群集中的三个辅助角色节点上进行复制。
如果在 Azure 区域中已创建提供三个容错域的群集,则复制因子使用 3。 否则,复制因子使用 4.
在具有三个容错域的区域中,复制因子为 3 可让副本分布在容错域中。 在具有两个容错域的区域中,复制因子为 4 可将副本均匀分布在域中。
有关区域中容错域数的信息,请参阅 Linux 虚拟机的可用性文档。
Kafka 不识别 Azure 容错域。 在创建主题的分区副本时,它可能未针对高可用性正确分发副本。
若要确保高可用性,请使用 Apache Kafka 分区重新均衡工具。 必须通过 SSH 连接运行此工具,以便连接到 Kafka 群集的头节点。
为确保 Kafka 数据的最高可用性,应在出现以下情况时为主题重新均衡分区副本:
创建新主题或分区
纵向扩展群集
若要列出主题,请使用以下命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS
此命令列出 Kafka 群集上可用的主题。
若要删除主题,使用以下命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic topicname --zookeeper $KAFKAZKHOSTS
此命令删除名为
topicname
的主题。警告
如果删除了之前创建的
test
主题,则必须重新创建。 稍后会在本文档中使用此主题。
有关适用于 kafka-topics.sh
实用工具的命令的详细信息,请使用以下命令:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh
Kafka 将记录存储在主题中。 记录由生成者生成,由使用者使用。 生产者与使用者通过 Kafka 代理服务通信。 HDInsight 群集中的每个工作节点都是 Kafka 代理主机。
若要将记录存储到之前创建的测试主题,并通过使用者对其进行读取,请使用以下步骤:
若要为该主题写入记录,请从 SSH 连接使用
kafka-console-producer.sh
实用工具:/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
此命令之后是一个空行。
在空行中键入文本消息,然后点击 Enter。 以这种方式输入一些消息,然后使用 Ctrl + C 返回到正常的提示符处。 每行均作为单独的记录发送到 Kafka 主题。
若要读取该主题的记录,请从 SSH 连接使用
kafka-console-consumer.sh
实用工具:/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
此命令从主题中检索并显示记录。 使用
--from-beginning
告知使用者从流的开头开始,以检索所有记录。如果使用的是较旧版本的 Kafka,请将
--bootstrap-server $KAFKABROKERS
替换为--zookeeper $KAFKAZKHOSTS
。使用 Ctrl + C 阻止使用者。
还可以以编程方式创建生产者和使用者。 有关如何使用此 API 的示例,请参阅将 Apache Kafka 生产者和使用者 API 与 HDInsight 配合使用文档。
如果不再需要资源组及其资源,请使用 Azure 门户、Azure CLI 或 Azure PowerShell 将其删除。
az group delete --name exampleRG
在本快速入门中,你已了解了如何使用 Bicep 在 HDInsight 中创建 Apache Kafka 群集。 下一篇文章介绍如何创建一个使用 Apache Kafka Streams API 的应用程序并在 Kafka on HDInsight 上运行该应用程序。