Ingest data using the Azure Data Explorer Java SDK

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. The Java client library can be used to ingest data, issue control commands, and query data in Azure Data Explorer clusters.

In this article, learn how to ingest data using the Azure Data Explorer Java library. First, you'll create a table and a data mapping in a test cluster. Then you'll queue an ingestion from blob storage to the cluster using the Java SDK and validate the results.

Prerequisites

To run the sample, you'll need Git, a Java JDK, and Maven, plus an Azure Data Explorer cluster and database and an Azure AD service principal (application ID, secret, and tenant ID) with access to that database.

Review the code

This section is optional. Review the following code snippets to learn how the code works. To skip this section, go to Run the application.

Authentication

The program uses Azure Active Directory authentication credentials with ConnectionStringBuilder.

  1. Create a com.microsoft.azure.kusto.data.Client for query and management (a brief usage sketch of this client follows this list).

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Create and use a com.microsoft.azure.kusto.ingest.IngestClient to queue data ingestion into Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
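
For reference, here's a minimal usage sketch of the query client from step 1. It's not part of the sample, and it assumes a kusto-data version in which execute returns a KustoOperationResult:

    static void printRowCount(String database) throws Exception {
        // Run a query through the same client used for management commands.
        // The count query returns a single row with a single Count column.
        KustoOperationResult results = getClient().execute(database, "StormEvents | count");
        KustoResultSetTable primaryResults = results.getPrimaryResults();
        if (primaryResults.next()) {
            System.out.println("StormEvents row count: " + primaryResults.getLong(0));
        }
    }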
    

Management commands

Control commands, such as .drop and .create, are executed by calling execute on a com.microsoft.azure.kusto.data.Client object.

For example, the StormEvents table is created as follows:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }
}
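
The StormEvents_CSV_Mapping mapping referenced later in this article is created with the same pattern. The sketch below is illustrative: the mapping JSON is abbreviated to the first two columns, while the sample itself maps every column of the CSV file:

static final String createMappingCommand = ".create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping' "
        + "'[{\"Name\":\"StartTime\",\"datatype\":\"datetime\",\"Ordinal\":0},"
        + "{\"Name\":\"EndTime\",\"datatype\":\"datetime\",\"Ordinal\":1}]'";

static void createMapping(String database) {
    try {
        getClient().execute(database, createMappingCommand);
        System.out.println("Mapping created");
    } catch (Exception e) {
        System.out.println("Failed to create mapping: " + e.getMessage());
    }
}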

Data ingestion

Queue ingestion by using a file from an existing Azure Blob Storage container.

  • Use BlobSourceInfo to specify the Blob Storage path.
  • Use IngestionProperties to define the table, database, mapping name, and data type. In the following example, the data type is CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.chinacloudapi.cn/%s/%s%s";
    static final String blobStorageAccountName = "kustosamplefiles";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = "??st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D";
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

The ingestion process starts in a separate thread, and the main thread waits for the ingestion thread to complete. This process uses a CountDownLatch. The ingestion API (IngestClient#ingestFromBlob) isn't asynchronous. A while loop polls the current status every 5 seconds and waits for the ingestion status to change from Pending to a different status. The final status can be Succeeded, Failed, or PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                    return; // result stays null here, so skip the status polling below
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tip

There are other methods to handle ingestion asynchronously for different applications. For example, you could use CompletableFuture to create a pipeline that defines a post-ingestion action, such as querying the table, or that handles exceptions reported to the IngestionStatus.
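
As a rough illustration (not from the sample), such a pipeline might look like the following. Here pollIngestionStatus is a hypothetical helper that wraps the polling loop shown earlier and returns the final OperationStatus:

    // Requires java.util.concurrent.CompletableFuture and CompletionException.
    CompletableFuture
            .supplyAsync(() -> {
                try {
                    IngestionResult result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                    return pollIngestionStatus(result); // hypothetical helper wrapping the while loop above
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            })
            .thenAccept(status -> System.out.println("Ingestion finished with status: " + status))
            .exceptionally(e -> {
                System.err.println("Ingestion failed: " + e.getMessage());
                return null;
            });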

Run the application

General

When you run the sample code, the following actions are performed:

  1. Drop table: The StormEvents table is dropped if it exists (a dropTable sketch follows the main method below).
  2. Table creation: The StormEvents table is created.
  3. Mapping creation: The StormEvents_CSV_Mapping mapping is created (sketched in the Management commands section above).
  4. File ingestion: A CSV file in Azure Blob Storage is queued for ingestion.

The following sample code is from App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}
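
dropTable and createMapping follow the same pattern as createTable. For example, dropTable might look like this sketch; the ifexists keyword keeps the first run, when the table doesn't exist yet, from failing:

static final String dropTableCommand = ".drop table StormEvents ifexists";

static void dropTable(String database) {
    try {
        getClient().execute(database, dropTableCommand);
        System.out.println("Table dropped");
    } catch (Exception e) {
        System.out.println("Failed to drop table: " + e.getMessage());
    }
}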

Tip

To try different combinations of operations, uncomment or comment the respective methods in App.java.

Run the application

  1. Clone the sample code from GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Set the service principal information as environment variables used by the program, together with the cluster endpoint and the database name:
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    # The Azure AD tenant ID is also required by getClient; this variable name is assumed from the sample.
    export AZURE_SP_TENANT_ID="<replace with tenant ID>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.chinacloudapi.cn"
    export KUSTO_DB="name of the database"
    
  3. Build and run:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    The output will be similar to:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Wait a few minutes for the ingestion process to complete. After successful completion, you will see the log message Ingestion completed successfully. At that point, you can exit the program and move on to the next step without impacting the ingestion process, which has already been queued.

Validate

Wait five to ten minutes for the queued ingestion to schedule the ingestion process and load the data into Azure Data Explorer.

  1. Sign in to https://dataexplorer.azure.cn and connect to your cluster.

  2. Run the following query to get the count of records in the StormEvents table:

    StormEvents | count
    

Troubleshoot

  1. To see ingestion failures from the last four hours, run the following command on your database:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. To view the status of all ingestion operations from the last four hours, run the following command:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Clean up resources

If you don't plan to use the resources you have just created, run the following command in your database to drop the StormEvents table:

.drop table StormEvents

Next steps

Write queries