Run Apache Sqoop jobs by using Azure PowerShell for Apache Hadoop in HDInsight

Learn how to use Azure PowerShell to run Apache Sqoop jobs in Azure HDInsight to import and export data between an HDInsight cluster and an Azure SQL Database or SQL Server database. This article is a continuation of Use Apache Sqoop with Hadoop in HDInsight.

Prerequisites
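
At a minimum, the examples below assume completion of Use Apache Sqoop with Hadoop in HDInsight, which creates the cluster, the SQL database, and the tables these jobs read and write, plus a workstation with the Azure PowerShell Az module installed. A quick way to confirm that the Az.HDInsight cmdlets used in this article are available:

# Confirm the Az.HDInsight module and its Sqoop job cmdlet are installed.
Get-Command -Name New-AzHDInsightSqoopJobDefinition -Module Az.HDInsight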

Sqoop export

From Hive to SQL Server.

This example exports data from the Hive hivesampletable table to the mobiledata table in SQL Database. Set the values for the following variables, and then execute the command.

$hdinsightClusterName = ""
$httpPassword = ''
$sqlDatabasePassword = ''

# These values only need to be changed if the template was not followed.
$httpUserName = "admin"
$sqlServerLogin = "sqluser"
$sqlServerName = $hdinsightClusterName + "dbserver"
$sqlDatabaseName = $hdinsightClusterName + "db"

$pw = ConvertTo-SecureString -String $httpPassword -AsPlainText -Force
$httpCredential = New-Object System.Management.Automation.PSCredential($httpUserName,$pw)

# Connection string
$connectionString = "jdbc:sqlserver://$sqlServerName.database.windows.net;user=$sqlServerLogin@$sqlServerName;password=$sqlDatabasePassword;database=$sqlDatabaseName"

# start export
New-AzHDInsightSqoopJobDefinition `
    -Command "export --connect $connectionString --table mobiledata --hcatalog-table hivesampletable" `
    | Start-AzHDInsightJob `
        -ClusterName $hdinsightClusterName `
        -HttpCredential $httpCredential
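
To spot-check the export, you can count the rows that landed in mobiledata. The following is a minimal sketch, assuming the SqlServer PowerShell module is installed; Invoke-Sqlcmd and this query aren't part of the original walkthrough:

# Count the rows Sqoop exported into mobiledata (requires the SqlServer module).
Invoke-Sqlcmd -ServerInstance "$sqlServerName.database.windows.net" `
    -Database $sqlDatabaseName `
    -Username $sqlServerLogin `
    -Password $sqlDatabasePassword `
    -Query "SELECT COUNT(*) AS ExportedRows FROM mobiledata"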

Alternative execution

  1. The following code performs the same export, but it provides a way to read the output logs. Execute the code to begin the export.

    $sqoopCommand = "export --connect $connectionString --table mobiledata --hcatalog-table hivesampletable"
    
    $sqoopDef = New-AzHDInsightSqoopJobDefinition `
        -Command $sqoopCommand
    
    $sqoopJob = Start-AzHDInsightJob `
                    -ClusterName $hdinsightClusterName `
                    -HttpCredential $httpCredential `
                    -JobDefinition $sqoopDef
    
  2. The following code displays the output logs. Execute it:

    Get-AzHDInsightJobOutput `
        -ClusterName $hdinsightClusterName `
        -HttpCredential $httpCredential `
        -JobId $sqoopJob.JobId `
        -DisplayOutputType StandardError
    
    Get-AzHDInsightJobOutput `
        -ClusterName $hdinsightClusterName `
        -HttpCredential $httpCredential `
        -JobId $sqoopJob.JobId `
        -DisplayOutputType StandardOutput
    

If you receive the error message The specified blob does not exist., try again after a few minutes.
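
A likely cause is that the job hasn't finished writing its output yet. You can block until the job completes before requesting the logs, as the longer script later in this article does:

# Wait for the Sqoop job to finish; the log blobs exist once it completes.
Wait-AzHDInsightJob `
    -ClusterName $hdinsightClusterName `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId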

Sqoop import

From SQL Server to Azure Storage. This example imports data from the mobiledata table in SQL Database to the wasb:///tutorials/usesqoop/importeddata directory on HDInsight. The fields in the data are separated by a tab character, and the lines are terminated by a newline character. This example assumes that you've completed the previous example.

$sqoopCommand = "import --connect $connectionString --table mobiledata --target-dir wasb:///tutorials/usesqoop/importeddata --fields-terminated-by '\t' --lines-terminated-by '\n' -m 1"


$sqoopDef = New-AzHDInsightSqoopJobDefinition `
    -Command $sqoopCommand

$sqoopJob = Start-AzHDInsightJob `
                -ClusterName $hdinsightClusterName `
                -HttpCredential $httpCredential `
                -JobDefinition $sqoopDef

Get-AzHDInsightJobOutput `
    -ClusterName $hdinsightClusterName `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId `
    -DisplayOutputType StandardError

Get-AzHDInsightJobOutput `
    -ClusterName $hdinsightClusterName `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId `
    -DisplayOutputType StandardOutput
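
To confirm the import, you can list the blobs under the target directory. The following is a sketch that assumes the cluster's default storage account; the account name is derived the same way the script in the next section derives it:

# List the files Sqoop wrote under tutorials/usesqoop/importeddata.
$cluster = Get-AzHDInsightCluster -ClusterName $hdinsightClusterName
$storageAccountName = $cluster.DefaultStorageAccount -replace '.blob.core.chinacloudapi.cn'
$storageAccountKey = (Get-AzStorageAccountKey `
    -ResourceGroupName $cluster.ResourceGroup `
    -Name $storageAccountName)[0].Value
$storageContext = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey
Get-AzStorageBlob -Container $cluster.DefaultStorageContainer `
    -Context $storageContext `
    -Prefix "tutorials/usesqoop/importeddata"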

Additional Sqoop export example

This is a robust example that exports data from /tutorials/usesqoop/data/sample.log in the default storage account to a table called log4jlogs in a SQL Server database. This example doesn't depend on the previous examples.
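
Note that Sqoop export writes into an existing table; it doesn't create one. If the log4jlogs table wasn't created as part of the prerequisites, create it before running the script. The seven-column schema below is a hypothetical sketch that matches the seven space-delimited fields the preprocessing step keeps; adjust it to your actual setup. It assumes the SqlServer module's Invoke-Sqlcmd and the variable values defined in the script that follows:

# Hypothetical schema: one nvarchar column per space-delimited field.
Invoke-Sqlcmd -ServerInstance $sqlServerFQDN `
    -Database $sqlDatabaseName `
    -Username $sqlServerLogin `
    -Password $sqlDatabasePassword `
    -Query "CREATE TABLE log4jlogs (t1 nvarchar(50), t2 nvarchar(50), t3 nvarchar(50), t4 nvarchar(50), t5 nvarchar(50), t6 nvarchar(50), t7 nvarchar(50))"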

The following PowerShell script preprocesses the source file and then exports it to the log4jlogs table in an Azure SQL database. Replace CLUSTERNAME, CLUSTERPASSWORD, and SQLPASSWORD with the values you used in the prerequisites.

<#------ BEGIN USER INPUT ------#>
$hdinsightClusterName = "CLUSTERNAME"
$httpUserName = "admin"  #default is admin, update as needed
$httpPassword = 'CLUSTERPASSWORD'
$sqlDatabasePassword = 'SQLPASSWORD'
<#------- END USER INPUT -------#>

# Other fixed variables that should be used as-is
$sqlServerName = $hdinsightClusterName + "dbserver"
$sqlDatabaseName = $hdinsightClusterName + "db"
$tableName_log4j = "log4jlogs"
$exportDir_log4j = "/tutorials/usesqoop/data"
$sourceBlobName = "example/data/sample.log"
$destBlobName = "tutorials/usesqoop/data/sample.log"
$sqljdbcdriver = "/user/oozie/share/lib/sqoop/mssql-jdbc-7.0.0.jre8.jar"

$cluster = Get-AzHDInsightCluster -ClusterName $hdinsightClusterName
$defaultStorageAccountName = $cluster.DefaultStorageAccount -replace '.blob.core.chinacloudapi.cn'
$defaultStorageContainer = $cluster.DefaultStorageContainer
$resourceGroup = $cluster.ResourceGroup

$sqlServer = Get-AzSqlServer -ResourceGroupName $resourceGroup -ServerName $sqlServerName
$sqlServerLogin = $sqlServer.SqlAdministratorLogin
$sqlServerFQDN = $sqlServer.FullyQualifiedDomainName

#Connect to Azure subscription
Write-Host "`nConnecting to your Azure subscription ..." -ForegroundColor Green
# Get-AzContext returns $null rather than throwing when no account is connected
$context = Get-AzContext
if (-not $context) { Connect-AzAccount }

#pre-process the source file
Write-Host "`nPreprocessing the source file ..." -ForegroundColor Green

# This procedure creates a new blob named $destBlobName
# Get the key for the default storage account
$defaultStorageAccountKey = (Get-AzStorageAccountKey `
                                -ResourceGroupName $resourceGroup `
                                -Name $defaultStorageAccountName)[0].Value

# Create block blob objects referencing the source and destination blob.
$storageAccount = Get-AzStorageAccount `
    -ResourceGroupName $resourceGroup `
    -Name $defaultStorageAccountName

$storageContainer = ($storageAccount |Get-AzStorageContainer -Name $defaultStorageContainer).CloudBlobContainer

$sourceBlob = $storageContainer.GetBlockBlobReference($sourceBlobName)
$destBlob = $storageContainer.GetBlockBlobReference($destBlobName)

# Open a read stream on the source blob for a StreamReader to consume
$stream = $sourceBlob.OpenRead()
$sReader = New-Object System.IO.StreamReader($stream)

# Define a MemoryStream and a StreamWriter for writing into the destination file
$memStream = New-Object System.IO.MemoryStream
$writeStream = New-Object System.IO.StreamWriter $memStream

# Pre-process the source blob
$exString = "java.lang.Exception:"
while(-Not $sReader.EndOfStream){
    $line = $sReader.ReadLine()
    $split = $line.Split(" ")

    # remove the "java.lang.Exception" from the first element of the array
    # for example: java.lang.Exception: 2012-02-03 19:11:02 SampleClass8 [WARN] problem finding id 153454612
    if ($split[0] -eq $exString){
        #create a new ArrayList to remove $split[0]
        $newArray = [System.Collections.ArrayList] $split
        $newArray.Remove($exString)

        # update $split and $line
        $split = $newArray
        $line = $newArray -join(" ")
    }

    # keep only the lines that have at least 7 elements
    if ($split.count -ge 7){
        write-host $line
        $writeStream.WriteLine($line)
    }
}

# Write to the destination blob
$writeStream.Flush()
$memStream.Seek(0, "Begin")
$destBlob.UploadFromStream($memStream)

#export the log file from the cluster to the SQL database
Write-Host "Exporting the log file ..." -ForegroundColor Green

$pw = ConvertTo-SecureString -String $httpPassword -AsPlainText -Force
$httpCredential = New-Object System.Management.Automation.PSCredential($httpUserName,$pw)

# Connection string
$connectionString = "jdbc:sqlserver://$sqlServerFQDN;user=$sqlServerLogin@$sqlServerName;password=$sqlDatabasePassword;database=$sqlDatabaseName"

# Submit a Sqoop job
$sqoopDef = New-AzHDInsightSqoopJobDefinition `
    -Command "export --connect $connectionString --table $tableName_log4j --export-dir $exportDir_log4j --input-fields-terminated-by \0x20 -m 1" `
    -Files $sqljdbcdriver

$sqoopJob = Start-AzHDInsightJob `
                -ClusterName $hdinsightClusterName `
                -HttpCredential $httpCredential `
                -JobDefinition $sqoopDef

Wait-AzHDInsightJob `
    -ResourceGroupName $resourceGroup `
    -ClusterName $hdinsightClusterName `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId

Write-Host "Standard Error" -BackgroundColor Green
Get-AzHDInsightJobOutput `
    -ResourceGroupName $resourceGroup `
    -ClusterName $hdinsightClusterName `
    -DefaultStorageAccountName $defaultStorageAccountName `
    -DefaultStorageAccountKey $defaultStorageAccountKey `
    -DefaultContainer $defaultStorageContainer `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId `
    -DisplayOutputType StandardError

Write-Host "Standard Output" -BackgroundColor Green
Get-AzHDInsightJobOutput `
    -ResourceGroupName $resourceGroup `
    -ClusterName $hdinsightClusterName `
    -DefaultStorageAccountName $defaultStorageAccountName `
    -DefaultStorageAccountKey $defaultStorageAccountKey `
    -DefaultContainer $defaultStorageContainer `
    -HttpCredential $httpCredential `
    -JobId $sqoopJob.JobId `
    -DisplayOutputType StandardOutput

Limitations

Linux-based HDInsight has the following limitations:

  • Bulk export: The Sqoop connector used to export data to Microsoft SQL Server or Azure SQL Database doesn't currently support bulk inserts.

  • Batching: Even when you use the -batch switch to perform inserts, Sqoop performs multiple individual inserts instead of batching the insert operations. For illustration, see the sketch after this list.
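
The following is a minimal sketch of passing the switch, reusing the first export in this article. The --batch placement is an assumption based on Sqoop's documented export options; on Linux-based HDInsight it still results in individual inserts:

# Illustration only: --batch is accepted, but the inserts aren't batched.
New-AzHDInsightSqoopJobDefinition `
    -Command "export --connect $connectionString --table mobiledata --hcatalog-table hivesampletable --batch" `
    | Start-AzHDInsightJob `
        -ClusterName $hdinsightClusterName `
        -HttpCredential $httpCredential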

Next steps

Now you've learned how to use Sqoop. To learn more, see the following articles: