创建 Azure 数据资源管理器群集和数据库
Azure 数据资源管理器是一项快速、完全托管的数据分析服务,用于实时分析从应用程序、网站和 IoT 设备等资源流式传输的海量数据。 若要使用 Azure 数据资源管理器,请先创建群集,再在该群集中创建一个或多个数据库。 然后,可将数据引入(加载)到数据库,并对其运行查询。
本文介绍如何使用 C#、Python、Go、Azure CLI、PowerShell 或 Azure 资源管理器 (ARM) 模板创建群集和数据库。 若要了解如何使用 Azure 门户创建群集和数据库,请参阅快速入门:创建 Azure 数据资源管理器群集和数据库。
有关基于以前的 SDK 版本的代码示例,请参阅存档的文章。
群集和数据库创建方法的先决条件:
- Azure 订阅。 创建 Azure 帐户。
- 安装 Git。
- 安装合适版本的 Go。 有关支持的版本,请参阅适用于 Go 的 Azure Kusto 模块。
- 可以访问资源的 Microsoft Entra 应用程序和服务主体。 保存“目录(租户) ID”、“应用程序 ID”和“客户端机密”值。
本部分指导你完成创建 Azure 数据资源管理器群集的过程。 选择与你的首选方法相关的选项卡来创建群集。
以下代码演示如何创建群集。
设置所需的环境变量,包括先决条件中所述的服务主体信息。 输入要在其中创建群集的订阅 ID、资源组和区域。
export AZURE_CLIENT_ID="<enter service principal client ID>" export AZURE_CLIENT_SECRET="<enter service principal client secret>" export AZURE_TENANT_ID="<enter tenant ID>" export SUBSCRIPTION="<enter subscription ID>" export RESOURCE_GROUP="<enter resource group name>" export LOCATION="<enter azure location e.g. China East 2>" export CLUSTER_NAME_PREFIX="<enter prefix (cluster name will be [prefix]-ADXTestCluster)>" export DATABASE_NAME_PREFIX="<enter prefix (database name will be [prefix]-ADXTestDB)>"
提示
如果已安装 Azure CLI 并配置了身份验证,请使用 auth.NewAuthorizerFromCLIWithResource。 在这种情况下,不需要创建服务主体。
运行以下代码以创建群集:
import ( "context" "log" "os" "strconv" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/kusto/armkusto" "github.com/olekukonko/tablewriter" ) const ( subscriptionEnvVar = "AZURE_SUBSCRIPTION_ID" resourceGroupEnvVar = "AZURE_RESOURCE_GROUP" locationEnvVar = "AZURE_LOCATION" clusterNamePrefixEnvVar = "CLUSTER_NAME_PREFIX" dbNamePrefixEnvVar = "DATABASE_NAME_PREFIX" clusterName = "ADXTestCluster" databaseName = "ADXTestDB" ) func init() { subscription = os.Getenv(subscriptionEnvVar) if subscription == "" { log.Fatalf("missing environment variable %s", subscriptionEnvVar) } rgName = os.Getenv(resourceGroupEnvVar) if rgName == "" { log.Fatalf("missing environment variable %s", resourceGroupEnvVar) } location = os.Getenv(locationEnvVar) if location == "" { log.Fatalf("missing environment variable %s", locationEnvVar) } clusterNamePrefix = os.Getenv(clusterNamePrefixEnvVar) if clusterNamePrefix == "" { log.Fatalf("missing environment variable %s", clusterNamePrefixEnvVar) } dbNamePrefix = os.Getenv(dbNamePrefixEnvVar) if dbNamePrefix == "" { log.Fatalf("missing environment variable %s", dbNamePrefixEnvVar) } } func getClustersClient(subscription string) *armkusto.ClustersClient { cred, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { log.Fatal(err) } client, err := armkusto.NewClustersClient(subscription, cred, nil) if err != nil { log.Fatal(err) } return client } // 1 instance, Basic tier with compute type Dev(No SLA)_Standard_D11_v2 func createCluster(sub, name, location, rgName string) { ctx := context.Background() numInstances := int32(1) client := getClustersClient(sub) result, err := client.BeginCreateOrUpdate( ctx, rgName, name, armkusto.Cluster{ Location: &location, SKU: &armkusto.AzureSKU{ Name: to.Ptr(armkusto.AzureSKUNameDevNoSLAStandardD11V2), Capacity: &numInstances, Tier: to.Ptr(armkusto.AzureSKUTierBasic), }, }, nil, ) if err != nil { log.Fatal("failed to start cluster creation ", err) } log.Printf("waiting for cluster creation to complete - %s\n", name) r, err := result.PollUntilDone(ctx, nil) if err != nil { log.Fatal(err) } log.Printf("created cluster %s\n", *r.Name) } createCluster(subscription, clusterNamePrefix+clusterName, location, rgName)
列出群集以确保成功完成创建操作:
func listClusters(sub, rgName string) { log.Printf("listing clusters in resource group %s\n", rgName) ctx := context.Background() result := getClustersClient(sub).NewListByResourceGroupPager(rgName, nil) data := [][]string{} for result.More() { temp, err := result.NextPage(ctx) if err != nil { log.Fatal(err) } for _, c := range temp.Value { data = append(data, []string{*c.Name, string(*c.Properties.State), *c.Location, strconv.Itoa(int(*c.SKU.Capacity)), *c.Properties.URI}) } } table := tablewriter.NewWriter(os.Stdout) table.SetHeader([]string{"Name", "State", "Location", "Instances", "URI"}) for _, v := range data { table.Append(v) } table.Render() } listClusters(subscription, rgName)
在本部分,你将在上一部分中创建的群集内创建一个数据库。
以下代码演示如何创建数据库。 包导入和环境变量启动与上一部分相同。
运行以下代码以创建数据库:
func createDatabase(sub, rgName, clusterName, location, dbName string) { ctx := context.Background() client := getDBClient(sub) future, err := client.BeginCreateOrUpdate(ctx, rgName, clusterName, dbName, &armkusto.ReadWriteDatabase{Kind: to.Ptr(armkusto.KindReadWrite), Location: &location}, nil) if err != nil { log.Fatal("failed to start database creation ", err) } log.Printf("waiting for database creation to complete - %s\n", dbName) resp, err := future.PollUntilDone(ctx, nil) if err != nil { log.Fatal(err) } kdb := resp.GetDatabase() log.Printf("created DB %s with ID %s and type %s\n", *kdb.Name, *kdb.ID, *kdb.Type) } createDatabase(subscription, rgName, clusterNamePrefix+clusterName, location, dbNamePrefix+databaseName)
列出数据库以确保成功完成创建操作:
func listDatabases(sub, rgName, clusterName string) { log.Printf("listing databases in cluster %s\n", clusterName) ctx := context.Background() result := getDBClient(sub).NewListByClusterPager(rgName, clusterName, nil) data := [][]string{} for result.More() { temp, err := result.NextPage(ctx) if err != nil { log.Fatal(err) } for _, db := range temp.Value { if *db.GetDatabase().Kind == armkusto.KindReadWrite { data = append(data, []string{*db.GetDatabase().Name, string(*db.GetDatabase().Kind), *db.GetDatabase().Location, *db.GetDatabase().Type}) } } } table := tablewriter.NewWriter(os.Stdout) table.SetHeader([]string{"Name", "State", "Location", "Type"}) for _, v := range data { table.Append(v) } table.Render() } listDatabases(subscription, rgName, clusterNamePrefix+clusterName)