使用 REST 代理与 Azure HDInsight 中的 Apache Kafka 群集交互

使用 Kafka REST 代理可以通过基于 HTTPS 的 REST API 与 Kafka 群集交互。 此操作表示你的 Kafka 客户端可位于虚拟网络之外。 客户端可以对 Kafka 群集进行简单的安全 HTTPS 调用,而不必依赖 Kafka 库。 本文演示如何创建启用了 REST 代理的 Kafka 群集。 另外还提供了一个示例代码,演示如何调用 REST 代理。

REST API 参考

有关 Kafka REST API 支持的操作,请参阅 HDInsight Kafka REST 代理 API 参考

背景

Kafka REST 代理设计

有关 API 支持的操作的完整规范,请参阅 Apache Kafka REST 代理 API

REST 代理端点

使用 REST 代理创建 HDInsight Kafka 群集会为群集创建新的公共终结点,你可以在 Azure 门户的 HDInsight 群集“属性”中找到该终结点

安全性

对 Kafka REST 代理的访问通过 Microsoft Entra 安全组进行管理。 创建 Kafka 群集时,请为 Microsoft Entra 安全组提供 REST 终结点访问权限。 需要访问 REST 代理的 Kafka 客户端应由组所有者注册到此组。 组所有者可通过门户或 PowerShell 注册。

对于 REST 代理终结点请求,客户端应用程序应获取 OAuth 令牌。 令牌用于验证安全组成员身份。 查找客户端应用程序示例,其中演示了如何获取 OAuth 令牌。 客户端应用程序会在 HTTPS 请求中将 OAuth 令牌传递给 REST 代理。

注意

若要了解有关 Microsoft Entra 安全组的详细信息,请参阅使用 Microsoft Entra 组管理应用和资源访问。 有关 OAuth 令牌工作原理的详细信息,请参阅使用 OAuth 2.0 代码授权流来授权访问 Microsoft Entra Web 应用程序

包含网络安全组的 Kafka REST 代理

如果你引入自己的 VNet 并通过网络安全组控制网络流量,则除端口 443 外,还应允许端口 9400 上的入站流量。 这确保 Kafka REST 代理服务器可访问。

先决条件

  1. 使用 Microsoft Entra ID 注册应用程序。 编写的用来与 Kafka REST 代理交互的客户端应用程序将使用此应用程序的 ID 和机密对 Azure 进行身份验证。

  2. 创建 Microsoft Entra 安全组。 将已通过 Microsoft Entra ID 注册的应用程序作为安全组的成员添加到该组中。 此安全组将用于控制哪些应用程序允许与 REST 代理交互。 若要详细了解如何创建 Microsoft Entra 组,请参阅使用 Microsoft Entra ID 创建基本组并添加成员

    验证该组的类型是否为“安全”安全组

    验证该应用程序是否为该组的成员。 检查成员身份

创建已启用 REST 代理的 Kafka 群集

这些步骤将使用 Azure 门户。 有关使用 Azure CLI 的示例,请参阅使用 Azure CLI 创建 Apache Kafka REST 代理群集

  1. 在 Kafka 群集创建工作流期间,在“安全 + 网络”选项卡中,选中“启用 Kafka REST 代理”选项 。

    屏幕截图显示了“创建 HDInsight 群集”页,其中已选择“安全性 + 网络”。

  2. 单击“选择安全组”。 从安全组列表中,选择你要允许其访问 REST 代理的安全组。 可以使用搜索框查找适当的安全组。 单击底部的“选择”按钮

    屏幕截图显示了“创建 HDInsight 群集”页,其中包含用于选择安全组的选项。

  3. 根据使用 Azure 门户在 Azure HDInsight 中创建 Apache Kafka 群集中所述,完成创建群集的剩余步骤。

  4. 创建群集后,转到群集属性并记下 Kafka REST 代理 URL。

    查看 REST 代理 URL

客户端应用程序示例

可以使用 Python 代码与 Kafka 群集上的 REST 代理交互。 若要使用代码示例,请执行以下步骤:

  1. 在装有 Python 的计算机上保存示例代码。

  2. 通过执行 pip3 install msal 安装所需的 Python 依赖项。

  3. 修改 Configure these properties 代码部分,并更新你的环境的以下属性:

    属性 说明
    租户 ID 订阅所在的 Azure 租户。
    客户端 ID 在安全组中注册的应用程序的 ID。
    客户端机密 在安全组中注册的应用程序的机密。
    Kafkarest_endpoint 从群集概述的“属性”选项卡中获取此值,如部署部分所述。 此属性应采用以下格式 – https://<clustername>-kafkarest.azurehdinsight.cn
  4. 在命令行中,通过执行 sudo python3 <filename.py> 来执行 Python 文件

此代码执行以下操作:

  1. 从 Microsoft Entra ID 提取 OAuth 令牌。
  2. 演示如何向 Kafka REST 代理发出请求。

若要详细了解如何在 Python 中获取 OAuth 令牌,请参阅 Python AuthenticationContext 类。 如果不是通过 Kafka REST 代理创建或删除的 topics 在该处有所反映,则可能会出现延迟。 此延迟是因为缓存刷新。 生成者 API 的“值”字段的功能已经增强。 现在,它接受 JSON 对象和任何序列化格式。

#Required Python packages
#pip3 install msal

import json
import msal
import random
import requests
import string
import sys
import time

def get_random_string():
    letters = string.ascii_letters
    random_string = ''.join(random.choice(letters) for i in range(7))

    return random_string

#--------------------------Configure these properties-------------------------------#
# Tenant ID for your Azure Subscription
tenant_id = 'ABCDEFGH-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Application Id
client_id = 'XYZABCDE-1234-1234-1234-ABCDEFGHIJKL'
# Your Client Credentials
client_secret = 'password'
# kafka rest proxy -endpoint
kafkarest_endpoint = "https://<clustername>-kafkarest.azurehdinsight.cn"
#--------------------------Configure these properties-------------------------------#

# Get access token
# Scope
scope = 'https://hib.azurehdinsight.cn/.default'
#Authority
authority = 'https://login.chinacloudapi.cn/' + tenant_id

app = msal.ConfidentialClientApplication(
    client_id , client_secret, authority,
    #cache - For details on how look at this example: https://github.com/Azure-Samples/ms-identity-python-webapp/blob/master/app.py
)

# The pattern to acquire a token looks like this.
result = None
result = app.acquire_token_for_client(scopes=[scope])
accessToken = result['access_token']
verify_https = True
request_timeout = 10

# Print access token
print("Access token: " + accessToken)

# API format
api_version = 'v1'
api_format = kafkarest_endpoint + '/{api_version}/{rest_api}'
get_topic_api = 'metadata/topics'
topic_api_format = 'topics/{topic_name}'
producer_api_format = 'producer/topics/{topic_name}'
consumer_api_format = 'consumer/topics/{topic_name}/partitions/{partition_id}/offsets/{offset}?count={count}'  # by default count = 1
partitions_api_format = 'metadata/topics/{topic_name}/partitions'
partition_api_format = 'metadata/topics/{topic_name}/partitions/{partition_id}'

# Request header
headers = {
    'Authorization': 'Bearer ' + accessToken,
    'Content-type': 'application/json'          # set Content-type to 'application/json'
}

# New topic
new_topic = 'hello_topic_' + get_random_string()
print("Topic " + new_topic + " is going to be used for demo.")

topics = []

# Create a  new topic
# Example of topic config
topic_config = {
    "partition_count": 1,
    "replication_factor": 1,
    "topic_properties": {
        "retention.ms": 604800000,
        "min.insync.replicas": "1"
    }
}

create_topic_url = api_format.format(api_version=api_version, rest_api=topic_api_format.format(topic_name=new_topic))
response = requests.put(create_topic_url, headers=headers, json=topic_config, timeout=request_timeout, verify=verify_https)
print(response.content)

if response.ok:
    while new_topic not in topics:
        print("The new topic " + new_topic + " is not visible yet. sleep 30 seconds...")
        time.sleep(30)
        # List Topic
        get_topic_url = api_format.format(api_version=api_version, rest_api=get_topic_api)

        response = requests.get(get_topic_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
        topic_list = response.json()
        topics = topic_list.get("topics", [])
else:
    print("Topic " + new_topic + " was created. Exit.")
    sys.exit(1)

# Produce messages to new_topic
# Example payload of Producer REST API
payload_json = {
    "records": [
        {
            "key": "key1",
            "value": "**********"         # A string                              
        },
        {
            "partition": 0,
            "value": 5                    # An integer
        },
        {
            "value": 3.14                 # A floating number
        },
        {
            "value": {                    # A JSON object
                "id": 1,
                "name": "HDInsight Kafka REST proxy"
            }
        },
        {
            "value": [                    # A list of JSON objects
                {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                {
                    "id": 2,
                    "name": "HDInsight Kafka REST proxy 2"
                },
                {
                    "id": 3,
                    "name": "HDInsight Kafka REST proxy 3"
                }
            ]
        },
        {
            "value": {                  # A nested JSON object
                "group id": 1,
                "HDI Kafka REST": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1"
                },
                "HDI Kafka REST server info": {
                    "id": 1,
                    "name": "HDInsight Kafka REST proxy 1",
                    "servers": [
                        {
                            "server id": 1,
                            "server name": "HDInsight Kafka REST proxy server 1"
                        },
                        {
                            "server id": 2,
                            "server name": "HDInsight Kafka REST proxy server 2"
                        },
                        {
                            "server id": 3,
                            "server name": "HDInsight Kafka REST proxy server 3"
                        }
                    ]
                }
            }
        }
    ]
}

print("Payloads in a Producer request: \n", payload_json)
producer_url = api_format.format(api_version=api_version, rest_api=producer_api_format.format(topic_name=new_topic))
response = requests.post(producer_url, headers=headers, json=payload_json, timeout=request_timeout, verify=verify_https)
print(response.content)

# Consume messages from the topic
partition_id = 0
offset = 0
count = 2

while True:
    consumer_url = api_format.format(api_version=api_version, rest_api=consumer_api_format.format(topic_name=new_topic, partition_id=partition_id, offset=offset, count=count))
    print("Consuming " + str(count) + " messages from offset " + str(offset))

    response = requests.get(consumer_url, headers=headers, timeout=request_timeout, verify=verify_https)

    if response.ok:
        messages = response.json()
        print("Consumed messages: \n" + json.dumps(messages, indent=2))
        next_offset = response.headers.get("NextOffset")
        if offset == next_offset or not messages.get("records", []):
            print("Consumer caught up with producer. Exit for now...")
            break

        offset = next_offset

    else:
        print("Error " + str(response.status_code))
        break

# List partitions
get_partitions_url = api_format.format(api_version=api_version, rest_api=partitions_api_format.format(topic_name=new_topic))
print("Fetching partitions from  " + get_partitions_url)

response = requests.get(get_partitions_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition_list = response.json()
print("Partition list: \n" + json.dumps(partition_list, indent=2))

# List a partition
get_partition_url = api_format.format(api_version=api_version, rest_api=partition_api_format.format(topic_name=new_topic, partition_id=partition_id))
print("Fetching metadata of a partition from  " + get_partition_url)

response = requests.get(get_partition_url, headers={'Authorization': 'Bearer ' + accessToken}, timeout=request_timeout, verify=verify_https)
partition = response.json()
print("Partition metadata: \n" + json.dumps(partition, indent=2))

下面是另外一个示例,说明如何使用 curl 命令从 Azure 获取用于 REST 代理的令牌。 请注意,我们需要在获取令牌时指定 scope=https://hib.azurehdinsight.cn/.default

curl -X POST -H "Content-Type: application/x-www-form-urlencoded" -d 'client_id=<clientid>&client_secret=<clientsecret>&grant_type=client_credentials&scope=https://hib.azurehdinsight.cn/.default' 'https://login.chinacloudapi.cn/<tenantid>/oauth2/v2.0/token'

后续步骤