Connect to Apache Kafka on HDInsight through an Azure Virtual Network

Learn how to connect directly to Apache Kafka on HDInsight through an Azure Virtual Network. This document describes how to connect to Kafka using the following configurations:

  • From resources in an on-premises network. This connection uses a VPN device (software or hardware) on your local network.
  • From a development environment using a VPN software client.

Note

This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure PowerShell.

Architecture and planning

HDInsight does not allow direct connections to Kafka over the public internet. Instead, Kafka clients (producers and consumers) must use one of the following connection methods:

  • Run the client in the same virtual network as Kafka on HDInsight. This configuration is used in the Start with Apache Kafka on HDInsight document. The client runs directly on the HDInsight cluster nodes or on another virtual machine in the same network.

  • Connect a private network, such as your on-premises network, to the virtual network. This configuration allows clients in your on-premises network to work directly with Kafka. To enable this configuration, perform the following tasks:

    1. Create a virtual network.

    2. Create a VPN gateway that uses a site-to-site configuration. The configuration used in this document connects to a VPN gateway device in your on-premises network.

    3. Create a DNS server in the virtual network.

    4. Configure forwarding between the DNS servers in each network.

    5. Create a Kafka on HDInsight cluster in the virtual network.

      For more information, see the Connect to Apache Kafka from an on-premises network section.

  • Connect individual machines to the virtual network using a VPN gateway and VPN client. To enable this configuration, perform the following tasks:

    1. Create a virtual network.

    2. Create a VPN gateway that uses a point-to-site configuration. This configuration can be used with both Windows and macOS clients.

    3. Create a Kafka on HDInsight cluster in the virtual network.

    4. Configure Kafka for IP advertising. This configuration allows the client to connect using broker IP addresses instead of domain names.

    5. Download and use the VPN client on the development system.

      For more information, see the Connect to Apache Kafka with a VPN client section.

      Warning

      This configuration is only recommended for development purposes because of the following limitations:

      • Each client must connect using a VPN software client.
      • The VPN client does not pass name resolution requests to the virtual network, so you must use IP addresses to communicate with Kafka. IP communication requires additional configuration on the Kafka cluster.

For more information on using HDInsight in a virtual network, see Plan a virtual network for Azure HDInsight clusters.

Connect to Apache Kafka from an on-premises network

To create a Kafka cluster that communicates with your on-premises network, follow the steps in the Connect HDInsight to your on-premises network document.

Important

When creating the HDInsight cluster, select the Kafka cluster type.

These steps create the following configuration:

  • Azure Virtual Network
  • Site-to-site VPN gateway
  • Azure Storage account (used by HDInsight)
  • Kafka on HDInsight

To verify that a Kafka client can connect to the cluster from on-premises, use the steps in the Example: Python client section.
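In this configuration, on-premises clients reach the brokers by their internal FQDNs, so connections depend on the DNS forwarding set up between the two networks. As an optional first check, the following Python sketch (standard library only) reports which IPv4 addresses a broker FQDN resolves to from the machine it runs on. The helper name resolve_broker is hypothetical, and the FQDN you pass to it would be one of the worker node FQDNs returned in step 1 of the Example: Python client section.

```python
import socket
from typing import List


def resolve_broker(fqdn: str, port: int = 9092) -> List[str]:
    """Return the sorted IPv4 addresses that fqdn resolves to, or [] if lookup fails.

    resolve_broker is a hypothetical helper for illustration; it only checks name
    resolution and does not contact Kafka.
    """
    try:
        infos = socket.getaddrinfo(fqdn, port, family=socket.AF_INET, type=socket.SOCK_STREAM)
    except socket.gaierror:
        # The name could not be resolved from this machine.
        return []
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr is (ip, port).
    return sorted({info[4][0] for info in infos})
```

An empty list for a worker node FQDN suggests that requests for the virtual network's names are not being forwarded from your on-premises DNS server.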

Connect to Apache Kafka with a VPN client

Use the steps in this section to create the following configuration:

  • Azure Virtual Network
  • Point-to-site VPN gateway
  • Azure Storage account (used by HDInsight)
  • Kafka on HDInsight

  1. Follow the steps in the Working with self-signed certificates for Point-to-site connections document. This document creates the certificates needed for the gateway.

  2. Open a PowerShell prompt and use the following code to sign in to your Azure subscription:

    Connect-AzAccount -EnvironmentName AzureChinaCloud
    # If you have multiple subscriptions, uncomment to set the subscription
    #Select-AzSubscription -SubscriptionName "name of your subscription"
    
  3. Use the following code to create variables that contain configuration information:

    # Prompt for generic information
    $resourceGroupName = Read-Host "What is the resource group name?"
    $baseName = Read-Host "What is the base name? It is used to create names for resources, such as 'net-basename' and 'kafka-basename':"
    $location = Read-Host "What Azure Region do you want to create the resources in?"
    $rootCert = Read-Host "What is the file path to the root certificate? It is used to secure the VPN gateway."
    
    # Prompt for HDInsight credentials
    $adminCreds = Get-Credential -Message "Enter the HTTPS user name and password for the HDInsight cluster" -UserName "admin"
    $sshCreds = Get-Credential -Message "Enter the SSH user name and password for the HDInsight cluster" -UserName "sshuser"
    
    # Names for Azure resources
    $networkName = "net-$baseName"
    $clusterName = "kafka-$baseName"
    $storageName = "store$baseName" # Can't use dashes in storage names
    $defaultContainerName = $clusterName
    $defaultSubnetName = "default"
    $gatewaySubnetName = "GatewaySubnet"
    $gatewayPublicIpName = "GatewayIp"
    $gatewayIpConfigName = "GatewayConfig"
    $vpnRootCertName = "rootcert"
    $vpnName = "VPNGateway"
    
    # Network settings
    $networkAddressPrefix = "10.0.0.0/16"
    $defaultSubnetPrefix = "10.0.0.0/24"
    $gatewaySubnetPrefix = "10.0.1.0/24"
    $vpnClientAddressPool = "172.16.201.0/24"
    
    # HDInsight settings
    $hdiWorkerNodes = 4
    $hdiVersion = "3.6"
    $hdiType = "Kafka"
    
  4. Use the following code to create the Azure resource group, the virtual network, and the point-to-site VPN gateway:

    # Create the resource group that contains everything
    New-AzResourceGroup -Name $resourceGroupName -Location $location
    
    # Create the subnet configuration
    $defaultSubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -AddressPrefix $defaultSubnetPrefix
    $gatewaySubnetConfig = New-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -AddressPrefix $gatewaySubnetPrefix
    
    # Create the virtual network with the default and gateway subnets
    New-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AddressPrefix $networkAddressPrefix `
        -Subnet $defaultSubnetConfig, $gatewaySubnetConfig
    
    # Get the network & subnet that were created
    $network = Get-AzVirtualNetwork -Name $networkName `
        -ResourceGroupName $resourceGroupName
    $gatewaySubnet = Get-AzVirtualNetworkSubnetConfig -Name $gatewaySubnetName `
        -VirtualNetwork $network
    $defaultSubnet = Get-AzVirtualNetworkSubnetConfig -Name $defaultSubnetName `
        -VirtualNetwork $network
    
    # Set a dynamic public IP address for the gateway subnet
    $gatewayPublicIp = New-AzPublicIpAddress -Name $gatewayPublicIpName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -AllocationMethod Dynamic
    $gatewayIpConfig = New-AzVirtualNetworkGatewayIpConfig -Name $gatewayIpConfigName `
        -Subnet $gatewaySubnet `
        -PublicIpAddress $gatewayPublicIp
    
    # Get the certificate info
    # Get the full path in case a relative path was passed
    $rootCertFile = Get-ChildItem $rootCert
    $cert = New-Object System.Security.Cryptography.X509Certificates.X509Certificate2($rootCertFile.FullName)
    $certBase64 = [System.Convert]::ToBase64String($cert.RawData)
    $p2sRootCert = New-AzVpnClientRootCertificate -Name $vpnRootCertName `
        -PublicCertData $certBase64
    
    # Create the VPN gateway
    New-AzVirtualNetworkGateway -Name $vpnName `
        -ResourceGroupName $resourceGroupName `
        -Location $location `
        -IpConfigurations $gatewayIpConfig `
        -GatewayType Vpn `
        -VpnType RouteBased `
        -EnableBgp $false `
        -GatewaySku Standard `
        -VpnClientAddressPool $vpnClientAddressPool `
        -VpnClientRootCertificates $p2sRootCert
    

    Warning

    It can take several minutes for this process to complete.

  5. Use the following code to create the Azure Storage account and blob container:

    # Create the storage account
    New-AzStorageAccount `
        -ResourceGroupName $resourceGroupName `
        -Name $storageName `
        -SkuName Standard_GRS `
        -Location $location `
        -Kind StorageV2 `
        -EnableHttpsTrafficOnly $true
    
    # Get the storage account keys and create a context
    $defaultStorageKey = (Get-AzStorageAccountKey -ResourceGroupName $resourceGroupName `
        -Name $storageName)[0].Value
    $storageContext = New-AzStorageContext -StorageAccountName $storageName `
        -StorageAccountKey $defaultStorageKey
    
    # Create the default storage container
    New-AzStorageContainer -Name $defaultContainerName `
        -Context $storageContext
    
  6. Use the following code to create the HDInsight cluster:

    # Create the HDInsight cluster
    New-AzHDInsightCluster `
        -ResourceGroupName $resourceGroupName `
        -ClusterName $clusterName `
        -Location $location `
        -ClusterSizeInNodes $hdiWorkerNodes `
        -ClusterType $hdiType `
        -OSType Linux `
        -Version $hdiVersion `
        -HttpCredential $adminCreds `
        -SshCredential $sshCreds `
        -DefaultStorageAccountName "$storageName.blob.core.chinacloudapi.cn" `
        -DefaultStorageAccountKey $defaultStorageKey `
        -DefaultStorageContainer $defaultContainerName `
        -DisksPerWorkerNode 2 `
        -VirtualNetworkId $network.Id `
        -SubnetName $defaultSubnet.Id
    

    Warning

    This process takes around 15 minutes to complete.
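The address ranges in the Network settings variables from step 3 have to fit together: both subnets must lie inside the virtual network, the subnets must not overlap each other, and the VPN client address pool must not overlap the virtual network. If you change those values, the following Python sketch checks these rules with the standard ipaddress module. The helper check_address_plan is hypothetical, is not part of any Azure tool, and does not cover every Azure networking requirement.

```python
import ipaddress
from typing import List

# Values copied from the "Network settings" variables in step 3.
NETWORK_PREFIX = "10.0.0.0/16"
DEFAULT_SUBNET_PREFIX = "10.0.0.0/24"
GATEWAY_SUBNET_PREFIX = "10.0.1.0/24"
VPN_CLIENT_ADDRESS_POOL = "172.16.201.0/24"


def check_address_plan(network: str, subnets: List[str], client_pool: str) -> List[str]:
    """Return a list of problems in the address plan; an empty list means none were found.

    check_address_plan is a hypothetical helper, not part of any Azure tool.
    """
    vnet = ipaddress.ip_network(network)
    pool = ipaddress.ip_network(client_pool)
    nets = [ipaddress.ip_network(s) for s in subnets]
    problems = []
    # Every subnet must sit inside the virtual network's address space.
    for net in nets:
        if not net.subnet_of(vnet):
            problems.append(f"subnet {net} is outside the virtual network {vnet}")
    # Subnets must not overlap each other.
    for i, first in enumerate(nets):
        for second in nets[i + 1:]:
            if first.overlaps(second):
                problems.append(f"subnet {first} overlaps subnet {second}")
    # Point-to-site clients get addresses from the pool, so it must not overlap the virtual network.
    if pool.overlaps(vnet):
        problems.append(f"VPN client pool {pool} overlaps the virtual network {vnet}")
    return problems


if __name__ == "__main__":
    issues = check_address_plan(
        NETWORK_PREFIX, [DEFAULT_SUBNET_PREFIX, GATEWAY_SUBNET_PREFIX], VPN_CLIENT_ADDRESS_POOL
    )
    print(issues or "No conflicts found in the address plan")
```

With the values used in this document, the check reports no conflicts.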

Configure Kafka for IP advertising

By default, Apache Zookeeper returns the domain names of the Kafka brokers to clients. This configuration does not work with the VPN software client, because the client cannot use name resolution for entities in the virtual network. For this configuration, use the following steps to configure Kafka to advertise IP addresses instead of domain names:

  1. Using a web browser, go to https://CLUSTERNAME.azurehdinsight.cn. Replace CLUSTERNAME with the name of the Kafka on HDInsight cluster.

    When prompted, use the HTTPS user name and password for the cluster. The Ambari Web UI for the cluster is displayed.

  2. To view information on Kafka, select Kafka from the list on the left.

    Image: Service list with Kafka highlighted

  3. To view the Kafka configuration, select Configs from the top middle.

    Image: Configs link for Kafka

  4. To find the kafka-env configuration, enter kafka-env in the Filter field on the upper right.

    Image: Kafka configuration for kafka-env

  5. To configure Kafka to advertise IP addresses, add the following text to the bottom of the kafka-env-template field:

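    # Configure Kafka to advertise IP addresses instead of FQDN
    # Get this broker's IP address on the virtual network
    IP_ADDRESS=$(hostname -i)
    # Log the address that will be advertised
    echo advertised.listeners=$IP_ADDRESS
    # Delete existing lines that mention "advertised" (a backup is saved as server.properties.bak)
    sed -i.bak -e '/advertised/{/advertised@/!d;}' /usr/hdp/current/kafka-broker/conf/server.properties
    # Advertise the IP address instead of the host name
    echo "advertised.listeners=PLAINTEXT://$IP_ADDRESS:9092" >> /usr/hdp/current/kafka-broker/conf/server.properties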
    
  6. To configure the interface that Kafka listens on, enter listeners in the Filter field on the upper right.

  7. To configure Kafka to listen on all network interfaces, change the value in the listeners field to PLAINTEXT://0.0.0.0:9092.

  8. To save the configuration changes, select the Save button. Enter a text message describing the changes. Select OK once the changes have been saved.

    Image: Save configuration button

  9. To prevent errors when restarting Kafka, use the Service Actions button and select Turn On Maintenance Mode. Select OK to complete this operation.

    Image: Service Actions with Turn On Maintenance highlighted

  10. To restart Kafka, use the Restart button and select Restart All Affected. Confirm the restart, and then select OK after the operation has completed.

    Image: Restart button with Restart All Affected highlighted

  11. To disable maintenance mode, use the Service Actions button and select Turn Off Maintenance Mode. Select OK to complete this operation.
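To see exactly what the kafka-env-template snippet changes, the following Python sketch applies the same edit to the text of a server.properties file: it drops every line that mentions "advertised" (keeping lines that contain "advertised@", as the sed expression does) and then appends an advertised.listeners entry that uses the IP address. The function advertise_ip, the sample file contents, and the IP address are hypothetical; on the cluster, the sed and echo commands perform this change.

```python
from typing import List


def advertise_ip(properties_text: str, ip_address: str, port: int = 9092) -> str:
    """Apply the same edit as the kafka-env-template snippet to server.properties text.

    advertise_ip is a hypothetical helper for illustration; on the cluster the
    sed and echo commands in the template perform this change.
    """
    kept: List[str] = [
        line
        for line in properties_text.splitlines()
        # Like the sed expression: delete lines that mention "advertised",
        # except lines that also contain "advertised@".
        if "advertised" not in line or "advertised@" in line
    ]
    # Like the echo command: append a listener that uses the IP address.
    kept.append(f"advertised.listeners=PLAINTEXT://{ip_address}:{port}")
    return "\n".join(kept) + "\n"


# Hypothetical server.properties excerpt and worker node IP address.
sample = (
    "broker.id=1\n"
    "listeners=PLAINTEXT://0.0.0.0:9092\n"
    "advertised.listeners=PLAINTEXT://wn0-kafka.example.internal:9092\n"
)
print(advertise_ip(sample, "10.0.0.12"), end="")
```

This prints the broker.id and listeners lines unchanged, followed by advertised.listeners=PLAINTEXT://10.0.0.12:9092. Because existing advertised lines are removed before the new one is appended, running the edit twice gives the same result instead of duplicate entries.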

Connect to the VPN gateway

To connect to the VPN gateway, use the Connect to Azure section of the Configure a Point-to-Site connection document.
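A connected VPN client shows that the tunnel is up, but not that a broker's listener is reachable through it. The following Python sketch is a minimal TCP check, assuming the brokers listen on port 9092 as configured in the previous section. The helper broker_reachable is hypothetical, and a successful connection does not test the Kafka protocol itself.

```python
import socket


def broker_reachable(host: str, port: int = 9092, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds.

    broker_reachable is a hypothetical helper. A successful connection shows that
    traffic reaches the broker's listener; it does not test the Kafka protocol.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution errors.
        return False
```

For example, with the VPN client connected, broker_reachable("10.0.0.12"), where 10.0.0.12 stands in for a worker node IP address, should return True.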

Example: Python client

To validate connectivity to Kafka, use the following steps to create and run a Python producer and consumer:

  1. Use one of the following methods to retrieve the fully qualified domain name (FQDN) and IP addresses of the nodes in the Kafka cluster:

    Azure PowerShell:

    $resourceGroupName = "The resource group that contains the virtual network used with HDInsight"

    # Find the network interfaces of the cluster nodes
    $clusterNICs = Get-AzNetworkInterface -ResourceGroupName $resourceGroupName | Where-Object {$_.Name -like "*node*"}

    # Build one object per node with its type, internal IP address, and internal FQDN
    $nodes = foreach ($nic in $clusterNICs) {
        [PSCustomObject]@{
            Type         = $nic.Name.Split('-')[1]
            InternalIP   = $nic.IpConfigurations.PrivateIpAddress
            InternalFQDN = $nic.DnsSettings.InternalFqdn
        }
    }
    $nodes | Sort-Object Type

    This script assumes that $resourceGroupName is the name of the Azure resource group that contains the virtual network.

    Azure CLI:

    az network nic list --resource-group <resourcegroupname> --output table --query "[?contains(name,'node')].{NICname:name,InternalIP:ipConfigurations[0].privateIpAddress,InternalFQDN:dnsSettings.internalFqdn}"

    Replace <resourcegroupname> with the name of the resource group that contains the virtual network.

    Save the returned information for use in the next steps.

  2. Use the following command to install the kafka-python client:

    pip install kafka-python

  3. To send data to Kafka, use the following Python code:

    from kafka import KafkaProducer

    # Replace the 'kafka_broker' entries with the IP addresses or FQDNs of your worker nodes.
    # NOTE: you don't need the full list of worker nodes, just one or two.
    producer = KafkaProducer(bootstrap_servers=['kafka_broker_1', 'kafka_broker_2'])
    for _ in range(50):
        producer.send('testtopic', b'test message')
    # send() is asynchronous; block until all buffered messages are delivered
    producer.flush()
    

    'kafka_broker' 条目替换为本部分中步骤 1 返回的地址:Replace the 'kafka_broker' entries with the addresses returned from step 1 in this section:

    • 如果使用__软件 VPN 客户端__,请将 kafka_broker 条目替换为工作节点的 IP 地址。If you are using a Software VPN client, replace the kafka_broker entries with the IP address of your worker nodes.

    • 如果__已启用通过自定义 DNS 服务器进行名称解析__,请将 kafka_broker 条目替换为工作节点的 FQDN。If you have enabled name resolution through a custom DNS server, replace the kafka_broker entries with the FQDN of the worker nodes.

      备注

      此代码将字符串 test message 发送给主题 testtopicThis code sends the string test message to the topic testtopic. Kafka on HDInsight 的默认配置是创建主题(如果它尚不存在)。The default configuration of Kafka on HDInsight is to create the topic if it does not exist.

  4. 要从 Kafka 检索消息,请使用下列 Python 代码:To retrieve the messages from Kafka, use the following Python code:

    from kafka import KafkaConsumer

    # Replace the 'kafka_broker' entries with the IP addresses or FQDNs of your worker nodes.
    # Again, you only need one or two, not the full list.
    # auto_offset_reset='earliest' starts reading from the beginning of the topic
    # when there is no committed offset for this consumer.
    consumer = KafkaConsumer(bootstrap_servers=['kafka_broker_1', 'kafka_broker_2'],
                             auto_offset_reset='earliest')
    consumer.subscribe(['testtopic'])
    # Iterating blocks and waits for new messages; press Ctrl+C to stop.
    for msg in consumer:
        print(msg)
    

    'kafka_broker' 条目替换为本部分中步骤 1 返回的地址:Replace the 'kafka_broker' entries with the addresses returned from step 1 in this section:

    • 如果使用__软件 VPN 客户端__,请将 kafka_broker 条目替换为工作节点的 IP 地址。If you are using a Software VPN client, replace the kafka_broker entries with the IP address of your worker nodes.

    • 如果__已启用通过自定义 DNS 服务器进行名称解析__,请将 kafka_broker 条目替换为工作节点的 FQDN。If you have enabled name resolution through a custom DNS server, replace the kafka_broker entries with the FQDN of the worker nodes.
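Instead of editing the broker list by hand, you can build it from the node information saved in step 1. The following Python sketch assumes you have copied that output into a list of dictionaries with the Type, InternalIP, and InternalFQDN columns produced by the PowerShell script, and that worker nodes have a Type value containing "worker" (an assumption about the NIC naming). The function name and sample rows are hypothetical. Each entry is formatted as host:port, the form kafka-python accepts for bootstrap_servers, with port 9092 as used throughout this document.

```python
from typing import Dict, List


def pick_bootstrap_servers(
    nodes: List[Dict[str, str]], use_ip: bool, count: int = 2, port: int = 9092
) -> List[str]:
    """Return up to `count` worker nodes formatted as "host:port" strings.

    pick_bootstrap_servers is a hypothetical helper. It assumes worker nodes have
    a Type value that contains "worker", which depends on the NIC naming.
    """
    key = "InternalIP" if use_ip else "InternalFQDN"
    workers = [node for node in nodes if "worker" in node["Type"]]
    return [f"{node[key]}:{port}" for node in workers[:count]]


# Hypothetical rows shaped like the output of step 1.
nodes = [
    {"Type": "headnode", "InternalIP": "10.0.0.20", "InternalFQDN": "hn0-kafka.example.internal"},
    {"Type": "workernode", "InternalIP": "10.0.0.12", "InternalFQDN": "wn0-kafka.example.internal"},
    {"Type": "workernode", "InternalIP": "10.0.0.13", "InternalFQDN": "wn1-kafka.example.internal"},
    {"Type": "workernode", "InternalIP": "10.0.0.14", "InternalFQDN": "wn2-kafka.example.internal"},
]

# Software VPN client: use IP addresses. Custom DNS server: pass use_ip=False for FQDNs.
print(pick_bootstrap_servers(nodes, use_ip=True))
```

This prints ['10.0.0.12:9092', '10.0.0.13:9092'], a list that could be passed as bootstrap_servers to KafkaProducer or KafkaConsumer in steps 3 and 4.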

Next steps

For more information on using HDInsight with a virtual network, see the Plan a virtual network deployment for Azure HDInsight clusters document.

For more information on creating an Azure Virtual Network with a Point-to-Site VPN gateway, see the following documents:

For more information on working with Apache Kafka on HDInsight, see the following documents: