使用 Kusto Java SDK 引入数据

Azure 数据资源管理器是一项快速且高度可缩放的数据探索服务,适用于日志和遥测数据。 Java 客户端库可用于引入数据、发出管理命令,以及在 Azure 数据资源管理器群集中查询数据。

本文介绍如何使用 Azure 数据资源管理器 Java 库引入数据。 首先,你将在测试群集中创建一个表和数据映射。 然后你将使用 Java SDK 从 blob 存储中将“引入”排入群集的队列中并验证结果。

先决条件

查看代码

本部分是可选的。 查看以下代码片段,了解代码的工作原理。 若要跳过此部分,请转到运行应用程序

身份验证

该程序将 Microsoft Entra 身份验证凭据与 ConnectionStringBuilder`结合使用。

  1. 创建用于查询和管理的 com.microsoft.azure.kusto.data.Client

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. 创建并使用 com.microsoft.azure.kusto.ingest.IngestClient 将“数据引入”排入 Azure 数据资源管理器中的队列:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

管理命令

通过在 com.microsoft.azure.kusto.data.Client 对象上调用 execute 来执行管理命令,如 .drop.create

例如,按如下所示创建 StormEvents 表:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

数据引入

使用现有 Azure Blob 存储容器中的文件将“引入”排入队列。

  • 使用 BlobSourceInfo 来指定 Blob 存储路径。
  • 使用 IngestionProperties 定义表、数据库、映射名称和数据类型。 在以下示例中,数据类型是 CSV
    ...
    static final String blobPathFormat = "https://%s.blob.core.chinacloudapi.cn/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

引入过程在单独的线程中开始,main 线程等待引入线程完成。 此过程使用 CountdownLatch。 引入 API (IngestClient#ingestFromBlob) 不是异步的。 while 循环每 5 秒轮询一次当前状态,并等待引入状态从 Pending 更改为不同状态。 最终状态可以是 SucceededFailedPartiallySucceeded

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

提示

还有其他为不同应用程序异步处理引入的方法。 例如,可以使用 CompletableFuture 来创建定义引入后操作的管道(如查询表),或处理向 IngestionStatus 报告的异常。

运行此应用程序

常规

运行示例代码时,将执行以下操作:

  1. 删除表:删除 StormEvents 表(如果存在)。
  2. 创建表:创建 StormEvents 表。
  3. 创建映射:创建 StormEvents_CSV_Mapping 映射。
  4. 引入文件:一个 CSV 文件(在 Azure Blob 存储中)将排队等待引入。

以下为示例代码来自 App.java

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

提示

若要尝试不同的操作组合,请在 App.java 中取消注释/注释相应的方法。

运行此应用程序

  1. 从 GitHub 克隆示例代码:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. 使用以下信息作为程序使用的环境变量来设置服务主体信息:

    • 群集终结点
    • 数据库名称
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.chinacloudapi.cn"
    export KUSTO_DB="name of the database"
    
  3. 生成并运行:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    输出结果会类似于:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

等待几分钟完成引入过程。 成功完成后,你将看到以下日志消息:Ingestion completed successfully。 此时,你可以退出程序,并转到下一步,这不会影响已排队的引入进程。

验证

等待 5 到 10 分钟,以便已排队的引入调度引入进程并将数据加载到 Azure 数据资源管理器中。

  1. 登录到 https://dataexplorer.azure.cn 并连接到群集。

  2. 运行以下命令,以获取 StormEvents 表中的记录的计数:

    StormEvents | count
    

故障排除

  1. 若要查看过去四个小时内是否存在任何失败引入,请在数据库中运行以下命令:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. 若要查看过去四个小时内所有引入操作的状态,请运行以下命令:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

清理资源

如果不打算使用已创建的资源,请在数据库中运行以下命令,删除 StormEvents 表。

.drop table StormEvents