Ingest data from Apache Kafka into Azure Data Explorer

Azure Data Explorer supports data ingestion from Apache Kafka. Apache Kafka is a distributed streaming platform for building real-time streaming data pipelines that reliably move data between systems or applications. Kafka Connect is a tool for scalable and reliable streaming of data between Apache Kafka and other data systems. The Azure Data Explorer Kafka Sink serves as the connector from Kafka and doesn't require writing code. Download the sink connector jar from this Git repo or Confluent Connector Hub.

This article shows how to ingest data with Kafka into Azure Data Explorer, using a self-contained Docker setup to simplify the Kafka cluster and Kafka connector cluster setup.

For more information, see the connector Git repo and version specifics.

Prerequisites

Create an Azure Active Directory service principal

The Azure Active Directory service principal can be created through the Azure portal or programmatically, as in the following example.

This service principal will be the identity the connector uses to write to the Azure Data Explorer table. We'll later grant this service principal the permissions it needs to access Azure Data Explorer.

  1. Log in to your Azure subscription via Azure CLI. Then authenticate in the browser.

    az cloud set -n AzureChinaCloud
    az login
    
  2. Choose the subscription you want to use to run the lab. This step is needed when you have multiple subscriptions.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Create the service principal. In this example, the service principal is called kusto-kafka-spn.

    az ad sp create-for-rbac -n "kusto-kafka-spn"
    
  4. You'll get a JSON response as shown below. Copy the appId, password, and tenant, as you'll need them in later steps (a sketch for keeping them at hand as environment variables follows this list).

    {
      "appId": "fe7280c7-5705-4789-b17f-71a472340429",
      "displayName": "kusto-kafka-spn",
      "name": "http://kusto-kafka-spn",
      "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
      "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
    }
    
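The appId, password, and tenant values map directly to the aad.auth.appid, aad.auth.appkey, and aad.auth.authority settings used later in the connector configuration. A minimal sketch for keeping them at hand as shell environment variables (the variable names are illustrative, and the values below are the sample ones from the JSON response above):

# Keep the service principal details handy for later steps.
# Replace the sample values with the ones from your own JSON response.
export KUSTO_SP_APP_ID="fe7280c7-5705-4789-b17f-71a472340429"
export KUSTO_SP_APP_KEY="29c719dd-f2b3-46de-b71c-4004fb6116ee"
export KUSTO_SP_TENANT_ID="42f988bf-86f1-42af-91ab-2d7cd011db42"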

Create a target table in Azure Data Explorer

  1. Sign in to the Azure portal.

  2. Go to your Azure Data Explorer cluster.

  3. Create a table called Storms using the following command:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    

    Screenshot of creating a table in the Azure Data Explorer portal

  4. Create the corresponding table mapping Storms_CSV_Mapping for ingested data using the following command:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  5. Create a batch ingestion policy on the table for configurable ingestion latency.

    Tip

    The ingestion batching policy is a performance optimizer and includes three parameters. The first parameter to be met triggers ingestion into the Azure Data Explorer table.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  6. Use the service principal from Create an Azure Active Directory service principal to grant permission to work with the database.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Run the lab

The following lab is designed to give you the experience of starting to create data, setting up the Kafka connector, and streaming this data to Azure Data Explorer with the connector. You can then look at the ingested data.

Clone the git repo

Clone the lab's git repo.

  1. Create a local directory on your machine.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Clone the repo.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Contents of the cloned repo

Run the following command to list the contents of the cloned repo:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

The output of this command is:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Review the files in the cloned repo

The following sections explain the important parts of the files in the file tree above.

adx-sink-config.json

This file contains the Kusto sink properties, where you'll update specific configuration details:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.url": "https://ingest-<name of cluster>.<region>.kusto.chinacloudapi.cn",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Replace the values for the following attributes as per your Azure Data Explorer setup: aad.auth.authority, aad.auth.appid, aad.auth.appkey, kusto.tables.topics.mapping (the database name), and kusto.url.
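As a convenience, the placeholders can also be filled in from the command line instead of editing the file by hand. A minimal sketch using GNU sed, assuming the service principal values were exported as environment variables earlier and that KUSTO_DB, KUSTO_CLUSTER_NAME, and KUSTO_REGION hold your own database, cluster, and region names (all of these variable names are illustrative):

# Fill in the adx-sink-config.json placeholders in place (GNU sed; on macOS use `sed -i ''`).
# KUSTO_SP_APP_ID / KUSTO_SP_APP_KEY / KUSTO_SP_TENANT_ID come from the service principal step;
# KUSTO_DB, KUSTO_CLUSTER_NAME, and KUSTO_REGION are your own Azure Data Explorer values.
sed -i \
  -e "s/<enter database name>/${KUSTO_DB}/" \
  -e "s/<enter tenant ID>/${KUSTO_SP_TENANT_ID}/" \
  -e "s/<enter application ID>/${KUSTO_SP_APP_ID}/" \
  -e "s/<enter client secret>/${KUSTO_SP_APP_KEY}/" \
  -e "s/<name of cluster>.<region>/${KUSTO_CLUSTER_NAME}.${KUSTO_REGION}/" \
  adx-sink-config.json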

Connector - Dockerfile

This file has the commands to generate the docker image for the connector instance. It includes the connector download from the git repo release directory.
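If you want to build the connector image on its own, outside of docker-compose, a minimal sketch looks like the following (KUSTO_KAFKA_SINK_VERSION matches the build argument declared in docker-compose.yaml; the image tag is arbitrary):

# Build only the connector image, passing the same build argument
# that docker-compose.yaml supplies to the connector Dockerfile.
docker build \
  --build-arg KUSTO_KAFKA_SINK_VERSION=1.0.1 \
  -t kusto-kafka-connect:local \
  ./connector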

Storm-events-producer directory

This directory has a Go program that reads a local "StormEvents.csv" file and publishes the data to a Kafka topic.
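In the lab, this producer runs as a container, but it can also be run directly with the Go toolchain for experimentation. A sketch under these assumptions: Go is installed, a broker is reachable on localhost:9092 (the kafka container maps that port to the host), and the program reads the same environment variables that docker-compose.yaml sets for it:

# Run the producer outside Docker against the locally mapped Kafka port.
cd storm-events-producer
KAFKA_BOOTSTRAP_SERVER=localhost:9092 \
KAFKA_TOPIC=storm-events \
SOURCE_FILE=StormEvents.csv \
go run .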

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Start the containers

  1. In a terminal, start the containers:

    docker-compose up
    

    The producer application will start sending events to the storm-events topic. You should see logs similar to the following (to peek at the topic contents directly, see the sketch after this list):

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  | 
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. To check the connector logs, run the following command in a separate terminal:

    docker-compose logs -f | grep kusto-connect
    
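Optionally, you can confirm the producer's output by reading from the storm-events topic itself. A minimal sketch using the console consumer shipped inside the kafka container (it assumes the Debezium image keeps the Kafka scripts under /kafka/bin, which may vary between image versions):

# Read a few records from the storm-events topic from inside the kafka container.
docker-compose exec kafka /kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic storm-events \
  --from-beginning \
  --max-messages 5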

Start the connector

Use a Kafka Connect REST call to start the connector.

  1. In a separate terminal, launch the sink task with the following command:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. To check the status, run the following command in a separate terminal:

    curl http://localhost:8083/connectors/storm/status
    

The connector will start queueing ingestion processes to Azure Data Explorer.
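The same status endpoint can be used to keep an eye on the connector and its task while data flows. A minimal sketch that extracts just the state fields, assuming jq is installed (the connector name storm comes from adx-sink-config.json):

# Poll the connector and task states every few seconds.
while true; do
  curl -s http://localhost:8083/connectors/storm/status \
    | jq '{connector: .connector.state, tasks: [.tasks[].state]}'
  sleep 5
done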

Note

If you run into connector issues, create an issue.

Query and review data

Confirm data ingestion

  1. Wait for data to arrive in the Storms table. To confirm the transfer of data, check the row count (a command-line alternative is sketched at the end of this section):

    Storms | count
    
  2. Confirm that there are no failures in the ingestion process:

    .show ingestion failures
    

    Once you see data, try out a few queries.
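If you prefer a terminal over the Web UI, the same row count check can be issued over the Kusto REST API. A sketch under these assumptions: the az CLI is already logged in, the cluster's query URI (the kusto.url value without the ingest- prefix) and YOUR_DATABASE_NAME are filled in with your own values, and the cluster accepts an AAD bearer token on its v1 REST query endpoint:

# Acquire an AAD token for the cluster and run `Storms | count` over the REST API.
CLUSTER_QUERY_URI="https://<name of cluster>.<region>.kusto.chinacloudapi.cn"
TOKEN=$(az account get-access-token --resource "${CLUSTER_QUERY_URI}" --query accessToken -o tsv)

curl -s -X POST "${CLUSTER_QUERY_URI}/v1/rest/query" \
  -H "Authorization: Bearer ${TOKEN}" \
  -H "Content-Type: application/json" \
  -d '{"db": "YOUR_DATABASE_NAME", "csl": "Storms | count"}'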

Query the data

  1. To see all the records, run the following query:

    Storms
    
  2. Use where and project to filter specific data:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. Use the summarize operator:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Column chart of the Kafka query results in Azure Data Explorer

For more query examples and guidance, see Write queries for Azure Data Explorer and the Kusto query language documentation.

Reset

To reset, do the following steps (a consolidated sketch follows the list):

  1. Stop the containers (docker-compose down -v)
  2. Delete the Storms table (.drop table Storms)
  3. Re-create the Storms table
  4. Re-create the table mapping
  5. Restart the containers (docker-compose up)
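A consolidated sketch of the container-side steps (the table drop and re-creation are management commands that run against the database, for example in the Azure Data Explorer Web UI, and appear here only as comments):

# Tear down the containers and their volumes.
docker-compose down -v

# In Azure Data Explorer, re-run the table setup:
#   .drop table Storms
#   .create table Storms (...)      -- same schema and mapping as in the earlier steps
#   .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' ...

# Bring the containers back up.
docker-compose up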

Clean up resources

To delete the Azure Data Explorer resources, use az kusto cluster delete or az kusto database delete:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Next steps