在 HDInsight 中使用 Azure PowerShell 运行 Apache Sqoop 作业
了解如何使用 Azure PowerShell 运行 Azure HDInsight 中的 Apache Sqoop 作业,以便在 HDInsight 群集与 Azure SQL 数据库或 SQL Server 之间导入和导出数据。 本文是在 HDInsight 中将 Apache Sqoop 与 Hadoop 配合使用一文的延续。
先决条件
一个安装有 Azure PowerShell AZ 模块的工作站。
熟悉 Sqoop。 有关详细信息,请参阅 Sqoop 用户指南。
Sqoop 导出
从 Hive 到 SQL。
此示例将数据从 Hive hivesampletable
表导出到 SQL 中的 mobiledata
表。 设置以下变量的值,然后执行命令。
$hdinsightClusterName = ""
$httpPassword = ''
$sqlDatabasePassword = ''
# These values only need to be changed if the template was not followed.
$httpUserName = "admin"
$sqlServerLogin = "sqluser"
$sqlServerName = $hdinsightClusterName + "dbserver"
$sqlDatabaseName = $hdinsightClusterName + "db"
$pw = ConvertTo-SecureString -String $httpPassword -AsPlainText -Force
$httpCredential = New-Object System.Management.Automation.PSCredential($httpUserName,$pw)
# Connection string
$connectionString = "jdbc:sqlserver://$sqlServerName.database.chinacloudapi.cn;user=$sqlServerLogin@$sqlServerName;password=$sqlDatabasePassword;database=$sqlDatabaseName"
# start export
New-AzHDInsightSqoopJobDefinition `
-Command "export --connect $connectionString --table mobiledata --hcatalog-table hivesampletable" `
| Start-AzHDInsightJob `
-ClusterName $hdinsightClusterName `
-HttpCredential $httpCredential
替代执行
以下代码执行相同的导出;不过,它提供了一种方法来读取输出日志。 执行代码,开始导出。
$sqoopCommand = "export --connect $connectionString --table mobiledata --hcatalog-table hivesampletable" $sqoopDef = New-AzHDInsightSqoopJobDefinition ` -Command $sqoopCommand $sqoopJob = Start-AzHDInsightJob ` -ClusterName $hdinsightClusterName ` -HttpCredential $httpCredential ` -JobDefinition $sqoopDef
下面的代码显示输出日志。 执行下面的代码:
Get-AzHDInsightJobOutput ` -ClusterName $hdinsightClusterName ` -HttpCredential $httpCredential ` -JobId $sqoopJob.JobId ` -DisplayOutputType StandardError Get-AzHDInsightJobOutput ` -ClusterName $hdinsightClusterName ` -HttpCredential $httpCredential ` -JobId $sqoopJob.JobId ` -DisplayOutputType StandardOutput
如果收到错误消息“The specified blob does not exist.
”,请在几分钟后重试。
Sqoop 导入
从 SQL 到 Azure 存储。 此示例将数据从 SQL 中的 mobiledata
表导入 HDInsight 上的 wasb:///tutorials/usesqoop/importeddata
目录。 数据中的字段将通过制表符分隔,并且相关行由换行符终止。 此示例假定你已完成上一示例。
$sqoopCommand = "import --connect $connectionString --table mobiledata --target-dir wasb:///tutorials/usesqoop/importeddata --fields-terminated-by '\t' --lines-terminated-by '\n' -m 1"
$sqoopDef = New-AzHDInsightSqoopJobDefinition `
-Command $sqoopCommand
$sqoopJob = Start-AzHDInsightJob `
-ClusterName $hdinsightClusterName `
-HttpCredential $httpCredential `
-JobDefinition $sqoopDef
Get-AzHDInsightJobOutput `
-ClusterName $hdinsightClusterName `
-HttpCredential $httpCredential `
-JobId $sqoopJob.JobId `
-DisplayOutputType StandardError
Get-AzHDInsightJobOutput `
-ClusterName $hdinsightClusterName `
-HttpCredential $httpCredential `
-JobId $sqoopJob.JobId `
-DisplayOutputType StandardOutput
其他 Sqoop 导出示例
这是一个说服力很强的示例,它从默认存储帐户中的 /tutorials/usesqoop/data/sample.log
导出数据,然后将其导入到 SQL Server 数据库中名为 log4jlogs
的表中。 此示例不依赖于前面的示例。
下面的 PowerShell 脚本预处理源文件,然后将其导出到 log4jlogs
表。 请将 CLUSTERNAME
、CLUSTERPASSWORD
和 SQLPASSWORD
替换为在先决条件部分使用的值。
<#------ BEGIN USER INPUT ------#>
$hdinsightClusterName = "CLUSTERNAME"
$httpUserName = "admin" #default is admin, update as needed
$httpPassword = 'CLUSTERPASSWORD'
$sqlDatabasePassword = 'SQLPASSWORD'
<#------- END USER INPUT -------#>
# Other fixed variable that should be used as is
$sqlServerName = $hdinsightClusterName + "dbserver"
$sqlDatabaseName = $hdinsightClusterName + "db"
$tableName_log4j = "log4jlogs"
$exportDir_log4j = "/tutorials/usesqoop/data"
$sourceBlobName = "example/data/sample.log"
$destBlobName = "tutorials/usesqoop/data/sample.log"
$sqljdbcdriver = "/user/oozie/share/lib/sqoop/mssql-jdbc-7.0.0.jre8.jar"
$cluster = Get-AzHDInsightCluster -ClusterName $hdinsightClusterName
$defaultStorageAccountName = $cluster.DefaultStorageAccount -replace '.blob.core.chinacloudapi.cn'
$defaultStorageContainer = $cluster.DefaultStorageContainer
$resourceGroup = $cluster.ResourceGroup
$sqlServer = Get-AzSqlServer -ResourceGroupName $resourceGroup -ServerName $sqlServerName
$sqlServerLogin = $sqlServer.SqlAdministratorLogin
$sqlServerFQDN = $sqlServer.FullyQualifiedDomainName
#Connect to Azure subscription
Write-Host "`nConnecting to your Azure subscription ..." -ForegroundColor Green
try{Get-AzContext}
catch{Connect-AzAccount -Environment AzureChinaCloud}
#pre-process the source file
Write-Host "`nPreprocessing the source file ..." -ForegroundColor Green
# This procedure creates a new file with $destBlobName
# Define the connection string
$defaultStorageAccountKey = (Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $defaultStorageAccountName)[0].Value
# Create block blob objects referencing the source and destination blob.
$storageAccount = Get-AzStorageAccount `
-ResourceGroupName $resourceGroup `
-Name $defaultStorageAccountName
$storageContainer = ($storageAccount |Get-AzStorageContainer -Name $defaultStorageContainer).CloudBlobContainer
$sourceBlob = $storageContainer.GetBlockBlobReference($sourceBlobName)
$destBlob = $storageContainer.GetBlockBlobReference($destBlobName)
# Define a MemoryStream and a StreamReader for reading from the source file
$stream = New-Object System.IO.MemoryStream
$stream = $sourceBlob.OpenRead()
$sReader = New-Object System.IO.StreamReader($stream)
# Define a MemoryStream and a StreamWriter for writing into the destination file
$memStream = New-Object System.IO.MemoryStream
$writeStream = New-Object System.IO.StreamWriter $memStream
# Pre-process the source blob
$exString = "java.lang.Exception:"
while(-Not $sReader.EndOfStream){
$line = $sReader.ReadLine()
$split = $line.Split(" ")
# remove the "java.lang.Exception" from the first element of the array
# for example: java.lang.Exception: 2012-02-03 19:11:02 SampleClass8 [WARN] problem finding id 153454612
if ($split[0] -eq $exString){
#create a new ArrayList to remove $split[0]
$newArray = [System.Collections.ArrayList] $split
$newArray.Remove($exString)
# update $split and $line
$split = $newArray
$line = $newArray -join(" ")
}
# remove the lines that has less than 7 elements
if ($split.count -ge 7){
write-host $line
$writeStream.WriteLine($line)
}
}
# Write to the destination blob
$writeStream.Flush()
$memStream.Seek(0, "Begin")
$destBlob.UploadFromStream($memStream)
#export the log file from the cluster to SQL
Write-Host "Exporting the log file ..." -ForegroundColor Green
$pw = ConvertTo-SecureString -String $httpPassword -AsPlainText -Force
$httpCredential = New-Object System.Management.Automation.PSCredential($httpUserName,$pw)
# Connection string
$connectionString = "jdbc:sqlserver://$sqlServerFQDN;user=$sqlServerLogin@$sqlServerName;password=$sqlDatabasePassword;database=$sqlDatabaseName"
# Submit a Sqoop job
$sqoopDef = New-AzHDInsightSqoopJobDefinition `
-Command "export --connect $connectionString --table $tableName_log4j --export-dir $exportDir_log4j --input-fields-terminated-by \0x20 -m 1" `
-Files $sqljdbcdriver
$sqoopJob = Start-AzHDInsightJob `
-ClusterName $hdinsightClusterName `
-HttpCredential $httpCredential `
-JobDefinition $sqoopDef
Wait-AzHDInsightJob `
-ResourceGroupName $resourceGroup `
-ClusterName $hdinsightClusterName `
-HttpCredential $httpCredential `
-JobId $sqoopJob.JobId
Write-Host "Standard Error" -BackgroundColor Green
Get-AzHDInsightJobOutput `
-ResourceGroupName $resourceGroup `
-ClusterName $hdinsightClusterName `
-DefaultStorageAccountName $defaultStorageAccountName `
-DefaultStorageAccountKey $defaultStorageAccountKey `
-DefaultContainer $defaultStorageContainer `
-HttpCredential $httpCredential `
-JobId $sqoopJob.JobId `
-DisplayOutputType StandardError
Write-Host "Standard Output" -BackgroundColor Green
Get-AzHDInsightJobOutput `
-ResourceGroupName $resourceGroupName `
-ClusterName $hdinsightClusterName `
-DefaultStorageAccountName $defaultStorageAccountName `
-DefaultStorageAccountKey $defaultStorageAccountKey `
-DefaultContainer $defaultStorageContainer `
-HttpCredential $httpCredential `
-JobId $sqoopJob.JobId `
-DisplayOutputType StandardOutput
限制
基于 Linux 的 HDInsight 存在以下限制:
批量导出:用于将数据导出到 SQL 的 Sqoop 连接器目前不支持批量插入。
批处理:如果在执行插入时使用
-batch
开关,Sqoop 将执行多次插入而不是批处理插入操作。
后续步骤
现在,你已了解了如何使用 Sqoop。 若要了解更多信息,请参阅以下文章:
- 将 Apache Oozie 和 HDInsight 配合使用:在 Oozie 工作流中使用 Sqoop 操作。
- 将数据上传到 HDInsight:了解将数据上传到 HDInsight 或 Azure Blob 存储的其他方法。