Ingest data using the Azure Data Explorer Go SDK

Azure Data Explorer is a fast and highly scalable data exploration service for log and telemetry data. It provides a Go SDK client library for interacting with the Azure Data Explorer service. You can use the Go SDK to ingest, control, and query data in Azure Data Explorer clusters.

In this article, you first create a table and data mapping in a test cluster. You then queue an ingestion to the cluster using the Go SDK and validate the results.

Prerequisites

Install the Go SDK

The Azure Data Explorer Go SDK is installed automatically when you run the sample application, which uses Go modules. If you installed the Go SDK for another application, create a Go module and fetch the Azure Data Explorer package (using go get), for example:

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

The package dependency is added to the go.mod file. Use it in your Go application.
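After the commands above, the go.mod file for a module named foo.com/bar would look similar to this sketch; the go directive and the SDK version shown are placeholders, since go get records whichever version it resolves:

```
module foo.com/bar

go 1.20

require github.com/Azure/azure-kusto-go v0.0.0 // placeholder; go get pins the resolved version
```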

Review the code

This section is optional. If you're interested in learning how the code works, you can review the following code snippets. Otherwise, you can skip ahead to Run the application.

Authenticate

The program needs to authenticate to the Azure Data Explorer service before executing any operations.

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

An instance of kusto.Authorization is created using the service principal credentials. It's then used to create a kusto.Client with the New function, which also accepts the cluster endpoint.

Create table

The create table command is represented by a Kusto statement. The Mgmt function is used to execute management commands; here, it executes the command that creates the table.

func createTable(kc *kusto.Client, kustoDB string) {
    _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
    if err != nil {
        log.Fatal("failed to create table", err)
    }
    log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

Tip

A Kusto statement is constant by default, for better security. NewStmt accepts string constants. The UnsafeStmt API allows use of non-constant statement segments, but isn't recommended.

The Kusto create table command is as follows:

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

Create mapping

Data mappings are used during ingestion to map incoming data to columns inside Azure Data Explorer tables. For more information, see data mapping. The mapping is created in the same way as the table, using the Mgmt function with the database name and the appropriate command. The complete command is available in the sample's GitHub repo.

func createMapping(kc *kusto.Client, kustoDB string) {
    _, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
    if err != nil {
        log.Fatal("failed to create mapping - ", err)
    }
    log.Printf("Mapping %s created\n", kustoMappingRefName)
}
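The createMappingCommand constant isn't reproduced in full here (see the GitHub repo), but a CSV ingestion mapping command has this general shape, abbreviated below to the first two columns of the StormEvents table:

```
.create table StormEvents ingestion csv mapping 'StormEvents_CSV_Mapping'
    '[{"Name":"StartTime","datatype":"datetime","Ordinal":0},{"Name":"EndTime","datatype":"datetime","Ordinal":1}]'
```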

Ingest data

An ingestion is queued using a file from an existing Azure Blob Storage container.

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
    kIngest, err := ingest.New(kc, kustoDB, kustoTable)
    if err != nil {
        log.Fatal("failed to create ingestion client", err)
    }
    blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
    err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

    if err != nil {
        log.Fatal("failed to ingest file", err)
    }
    log.Println("Ingested file from -", blobStorePath)
}

The ingestion client is created using ingest.New. The FromFile function is used to refer to the Azure Blob Storage URI. The mapping reference name and the data type are passed in the form of FileOption.

Run the application

  1. Clone the sample code from GitHub:

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. Run the sample code, as seen in this snippet from main.go:

    func main() {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    Tip

    To try different combinations of operations, you can uncomment or comment the respective functions in main.go.

    When you run the sample code, the following actions are performed:

    1. Drop table: The StormEvents table is dropped (if it exists).
    2. Create table: The StormEvents table is created.
    3. Create mapping: The StormEvents_CSV_Mapping mapping is created.
    4. Ingest file: A CSV file (in Azure Blob Storage) is queued for ingestion.
  3. To create a service principal for authentication, use the Azure CLI az ad sp create-for-rbac command. Set the service principal information, along with the cluster endpoint and the database name, as environment variables for the program to use:

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.chinacloudapi.cn"
    export KUSTO_DB="name of the database"
    
  4. Run the program:

    go run main.go
    

    You'll get output similar to the following:

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamplefiles.blob.core.windows.net/samplefiles/StormEvents.csv?st=2018-08-31T22%3A02%3A25Z&se=2020-09-01T22%3A02%3A00Z&sp=r&sv=2018-03-28&sr=b&sig=LQIbomcKI8Ooz425hWtjeq6d61uEaq21UVX7YrM61N4%3D
    

Validate and troubleshoot

Wait 5 to 10 minutes for the queued ingestion to schedule the ingestion process and load the data into Azure Data Explorer.

  1. Sign in to https://dataexplorer.azure.cn and connect to your cluster. Then run the following command to get the count of records in the StormEvents table.

    StormEvents | count
    
  2. Run the following command in your database to see whether there were any ingestion failures in the last four hours. Replace the database name before running.

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. Run the following command to view the status of all ingestion operations in the last four hours. Replace the database name before running.

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Clean up resources

If you plan to follow our other articles, keep the resources you created. Otherwise, run the following command in your database to drop the StormEvents table.

.drop table StormEvents

Next steps

Write queries