使用 HDInsight .NET SDK 运行 Apache Hive 查询

HDInsight .NET SDK 提供了以编程方式与 Azure HDInsight 群集上的 Apache Hive 交互的途径。 使用 SDK,你可以进行身份验证、提交 Hive 查询、监视其执行,以及直接从 .NET 应用程序检索结果。

此方法使开发人员能够将大数据处理集成到现有的 .NET 解决方案、自动化分析工作流,以及构建应用 Hive on HDInsight 功能的自定义应用程序。 借助启用了 Microsoft Entra ID 的群集,还可以获得基于标识的安全访问和集中控制。

先决条件

在开始本文之前,必须具有以下项:

注意

从 2017 年 9 月 15 日开始,HDInsight .NET SDK 仅支持从 Azure 存储帐户返回 Hive 查询结果。 如果将此示例与使用 Azure Data Lake Storage 作为主存储的 HDInsight 群集配合使用,则无法使用 .NET SDK 检索查询结果。

  • Visual Studio 2013 及更高版本。 至少应安装 .NET 桌面开发工作负载。

运行 Hive 查询

HDInsight .NET SDK 提供 .NET 客户端库,使从 .NET 使用 HDInsight 群集更容易。

  1. 在 Visual Studio 中创建 C# 控制台应用程序。

  2. 在 NuGet 包管理器控制台中运行以下命令:

    Install-Package Microsoft.Azure.HDInsight.Job -Version 3.0.0-preview.3
    
    
  3. 编辑代码以初始化以下变量的值: ExistingClusterName、TenantId、ClientId、ClientSecret、DefaultStorageAccountName、DefaultStorageAccountKey 和 DefaultStorageContainerName。 然后使用修订后的代码作为 Visual Studio 中 Program.cs 的全部内容。

    
     								using System;
       						using System.Collections.Generic;
       						using System.IO;
       						using System.Text;
       						using System.Threading;
       						using System.Threading.Tasks;
       						using Microsoft.Azure.HDInsight.Job;
       						using Microsoft.Azure.HDInsight.Job.Models;
       						using Microsoft.Rest;
       						using Azure.Identity;
       						using Azure.Core;
    
       						namespace SubmitHDInsightJobDotNet
       						{
       						    /// <summary>
       						    /// Console sample that authenticates against an HDInsight cluster with a
       						    /// service principal, submits a Hive query, polls until the job finishes,
       						    /// and prints either the job output or its error logs.
       						    /// </summary>
       						    class Program
       						    {
       						        private static HDInsightJobClient _hdiJobManagementClient;
    
       						        // HDInsight Cluster Configuration
       						        private const string ExistingClusterName = "<cluster_name>";
       						        private const string ExistingClusterUri = ExistingClusterName + ".azurehdinsight.cn";
    
       						        // Service Principal Configuration
       						        // NOTE: these are placeholders for the sample. Do not commit real
       						        // credentials to source control.
       						        private const string TenantId = "<tenant_ID>";
       						        private const string ClientId = "<client_ID>";
       						        private const string ClientSecret = "<secret>";
    
       						        // Storage Account Configuration linked to HDI Cluster
       						        private const string DefaultStorageAccountName = "<storage_acc_name>";
       						        private const string DefaultStorageAccountKey = "<storage_acc_key>";
       						        private const string DefaultStorageContainerName = "<container_name>";
    
       						        // Job status polling configuration
       						        private const int MaxPollAttempts = 300;        // upper bound on status polls before giving up
       						        private const int MaxBackoffSeconds = 30;       // cap for the delay between polls after errors
       						        private const int ProgressReportInterval = 30;  // print a progress line every N polls
    
       						        /// <summary>
       						        /// Entry point: initializes the job client, runs the Hive job, and
       						        /// reports any failure to the console before waiting for ENTER.
       						        /// </summary>
       						        /// <param name="args">Command-line arguments (unused).</param>
       						        static async Task Main(string[] args)
       						        {
       						            Console.WriteLine("The application is running ...");
    
       						            try
       						            {
       						                await InitializeHDInsightClientAsync();
    
       						                // Submit Hive job
       						                await SubmitHiveJobAsync();
       						            }
       						            catch (Exception ex)
       						            {
       						                Console.WriteLine($"Error: {ex.Message}");
       						                Console.WriteLine($"Stack Trace: {ex.StackTrace}");
       						            }
    
       						            Console.WriteLine("Press ENTER to continue ...");
       						            Console.ReadLine();
       						        }
    
       						        /// <summary>
       						        /// Acquires an access token for the cluster with the configured service
       						        /// principal and creates the shared <see cref="HDInsightJobClient"/>.
       						        /// </summary>
       						        /// <exception cref="Exception">
       						        /// Any failure while acquiring the token or creating the client is
       						        /// logged and rethrown.
       						        /// </exception>
       						        private static async Task InitializeHDInsightClientAsync()
       						        {
       						            try
       						            {
       						                var credential = new ClientSecretCredential(TenantId, ClientId, ClientSecret);
       						                var tokenRequestContext = new TokenRequestContext(new[] { "https://" + ExistingClusterName + ".clusteraccess.azurehdinsight.cn/.default" });
       						                var tokenResponse = await credential.GetTokenAsync(tokenRequestContext);
    
       						                // The token is fetched once and never refreshed; this sample assumes
       						                // the job finishes within the token's lifetime.
       						                var tokenCredentials = new TokenCredentials(tokenResponse.Token);
    
       						                var retryPolicy = HDInsightJobClient.HDInsightRetryPolicy;
       						                _hdiJobManagementClient = new HDInsightJobClient(ExistingClusterUri, tokenCredentials, retryPolicy);
    
       						                _hdiJobManagementClient.Username = "admin";
    
       						                Console.WriteLine("HDInsight client initialized successfully with Service Principal authentication.");
       						            }
       						            catch (Exception ex)
       						            {
       						                Console.WriteLine($"Failed to initialize HDInsight client: {ex.Message}");
       						                throw;
       						            }
       						        }
    
       						        /// <summary>
       						        /// Submits a "SHOW TABLES" Hive query, waits for it to complete, and
       						        /// prints its output or error logs.
       						        /// </summary>
       						        /// <exception cref="TimeoutException">
       						        /// The job did not complete within the polling budget.
       						        /// </exception>
       						        private static async Task SubmitHiveJobAsync()
       						        {
       						            try
       						            {
       						                var defines = new Dictionary<string, string>
       						                {
       						                    { "hive.execution.engine", "tez" },
       						                    { "hive.exec.reducers.max", "1" }
       						                };
    
       						                var args = new List<string> { "argA", "argB" };
    
       						                var parameters = new HiveJobSubmissionParameters
       						                {
       						                    Query = "SHOW TABLES",
       						                    Defines = defines,
       						                    Arguments = args
       						                };
    
       						                Console.WriteLine("Submitting the Hive job to the cluster...");
    
       						                var jobResponse = _hdiJobManagementClient.Job.SubmitHiveJob(parameters);
       						                var jobId = jobResponse.Id;
    
       						                Console.WriteLine("JobId is " + jobId);
       						                Console.WriteLine("Waiting for the job completion ...");
    
       						                await WaitForJobCompletionAsync(jobId);
    
       						                // Get job output
       						                await GetJobOutputAsync(jobId);
       						            }
       						            catch (Exception ex)
       						            {
       						                Console.WriteLine($"Error submitting Hive job: {ex.Message}");
       						                throw;
       						            }
       						        }
    
       						        /// <summary>
       						        /// Polls the job status until the job reports completion or the
       						        /// polling budget (<see cref="MaxPollAttempts"/>) is exhausted.
       						        /// </summary>
       						        /// <param name="jobId">Identifier returned when the job was submitted.</param>
       						        /// <exception cref="TimeoutException">
       						        /// The job was still not complete after the last allowed poll.
       						        /// </exception>
       						        private static async Task WaitForJobCompletionAsync(string jobId)
       						        {
       						            try
       						            {
       						                var jobResponse = await _hdiJobManagementClient.Job.GetWithHttpMessagesAsync(jobId);
       						                var jobDetail = jobResponse.Body;
       						                int attempts = 0;
       						                int backoffSeconds = 1;
    
       						                while (jobDetail.Status.JobComplete != true && attempts < MaxPollAttempts)
       						                {
       						                    // Await the delay instead of blocking the thread.
       						                    await Task.Delay(TimeSpan.FromSeconds(backoffSeconds));
       						                    attempts++;
    
       						                    try
       						                    {
       						                        jobResponse = await _hdiJobManagementClient.Job.GetWithHttpMessagesAsync(jobId);
       						                        jobDetail = jobResponse.Body;
       						                    }
       						                    catch (Exception ex)
       						                    {
       						                        // A failed status check is not fatal: back off (doubling up
       						                        // to the cap) and try again on the next iteration.
       						                        Console.WriteLine($"Error checking job status: {ex.Message}");
       						                        backoffSeconds = Math.Min(backoffSeconds * 2, MaxBackoffSeconds);
       						                        continue;
       						                    }
    
       						                    if (attempts % ProgressReportInterval == 0)
       						                    {
       						                        Console.WriteLine($"Job still running... Status: {jobDetail.Status.State}, Attempt: {attempts}");
       						                    }
    
       						                    // A successful poll relaxes the delay back toward one second.
       						                    backoffSeconds = Math.Max(1, backoffSeconds / 2);
       						                }
    
       						                // Decide on the job state, not the attempt counter: a job that
       						                // completes on the very last poll must not be reported as timed out.
       						                if (jobDetail.Status.JobComplete != true)
       						                {
       						                    throw new TimeoutException("Job did not complete within the expected time frame.");
       						                }
    
       						                Console.WriteLine($"Job completed with status: {jobDetail.Status.State}");
       						                Console.WriteLine($"Job exit value: {jobDetail.ExitValue}");
       						            }
       						            catch (Exception ex)
       						            {
       						                Console.WriteLine($"Error waiting for job completion: {ex.Message}");
       						                throw;
       						            }
       						        }
    
       						        /// <summary>
       						        /// Prints the job's output when its exit value is 0, otherwise prints
       						        /// its error logs. Both are read through the configured storage account.
       						        /// </summary>
       						        /// <param name="jobId">Identifier of the completed job.</param>
       						        private static async Task GetJobOutputAsync(string jobId)
       						        {
       						            try
       						            {
       						                var jobResponse = await _hdiJobManagementClient.Job.GetWithHttpMessagesAsync(jobId);
       						                var jobDetail = jobResponse.Body;
       						                var storageAccess = new AzureStorageAccess(DefaultStorageAccountName, DefaultStorageAccountKey,
       						                    DefaultStorageContainerName);
    
       						                Stream output;
       						                if (jobDetail.ExitValue == 0)
       						                {
       						                    Console.WriteLine("Job completed successfully. Fetching output...");
       						                    output = _hdiJobManagementClient.Job.GetJobOutput(jobId, storageAccess);
       						                }
       						                else
       						                {
       						                    Console.WriteLine($"Job failed with exit code: {jobDetail.ExitValue}. Fetching error logs...");
       						                    output = _hdiJobManagementClient.Job.GetJobErrorLogs(jobId, storageAccess);
       						                }
    
       						                Console.WriteLine("Job output/logs:");
       						                Console.WriteLine(new string('=', 50));
    
       						                // Disposing the reader also disposes the underlying stream.
       						                using (var reader = new StreamReader(output, Encoding.UTF8))
       						                {
       						                    string content = reader.ReadToEnd();
       						                    Console.WriteLine(content);
       						                }
    
       						                Console.WriteLine(new string('=', 50));
       						            }
       						            catch (Exception ex)
       						            {
       						                Console.WriteLine($"Error retrieving job output: {ex.Message}");
       						                throw;
       						            }
       						        }
       						    }
       						}
    
    
  4. 按 F5 运行应用程序。

应用程序的输出应类似于:

显示程序输出的屏幕截图。