Use Python User Defined Functions (UDF) with Apache Hive and Apache Pig in HDInsight

Learn how to use Python user-defined functions (UDF) with Apache Hive and Apache Pig in Apache Hadoop on Azure HDInsight.

Python on HDInsight

Python 2.7 is installed by default on HDInsight 3.0 and later. Apache Hive can be used with this version of Python for stream processing. Stream processing uses STDOUT and STDIN to pass data between Hive and the UDF.

HDInsight also includes Jython, which is a Python implementation written in Java. Jython runs directly on the Java Virtual Machine and does not use streaming. Jython is the recommended Python interpreter when using Python with Pig.
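The streaming contract is simple: Hive writes each input row to the UDF's STDIN as one tab-separated line, and reads one tab-separated result line back from STDOUT. The following minimal sketch of that loop uses Python 3 syntax rather than the Python 2.7 shown later in this article; the identity transform is a stand-in for real logic:

```python
import sys

def transform(line):
    """Identity transform: parse one tab-separated row and re-emit it.
    A real UDF would derive new fields here instead."""
    fields = line.rstrip("\n").split("\t")
    return "\t".join(fields)

def main(stdin=sys.stdin, stdout=sys.stdout):
    # Hive streams rows in on STDIN; result rows go back out on STDOUT.
    for line in stdin:
        stdout.write(transform(line) + "\n")

if __name__ == "__main__":
    main()
```

Any script that follows this read-a-line, write-a-line pattern can be plugged into a HiveQL TRANSFORM statement.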

Prerequisites

  • A Hadoop cluster on HDInsight. See Get Started with HDInsight on Linux.
  • An SSH client. For more information, see Connect to HDInsight (Apache Hadoop) using SSH.
  • The URI scheme for your cluster's primary storage. This would be wasb:// for Azure Storage, abfs:// for Azure Data Lake Storage Gen2, or adl:// for Azure Data Lake Storage Gen1. If secure transfer is enabled for Azure Storage or Data Lake Storage Gen2, the URI would be wasbs:// or abfss://, respectively. See also secure transfer.
  • A possible change to storage configuration. See Storage configuration if using a storage account of kind BlobStorage.
  • Optional. If planning to use PowerShell, you will need the AZ module installed.

Note

The storage account used in this article was Azure Storage with secure transfer enabled, so wasbs is used throughout the article.

Storage configuration

No action is required if the storage account used is of kind Storage (general purpose v1) or StorageV2 (general purpose v2). The process in this article produces output to at least /tezstaging. A default Hadoop configuration contains /tezstaging in the fs.azure.page.blob.dir configuration variable in core-site.xml for the HDFS service. This configuration causes output to that directory to be stored as page blobs, which are not supported for storage account kind BlobStorage. To use BlobStorage for this article, remove /tezstaging from the fs.azure.page.blob.dir configuration variable. The configuration can be accessed from the Ambari UI. Otherwise, you will receive the error message: Page blob is not supported for this account type.

Warning

The steps in this document make the following assumptions:

  • You create the Python scripts on your local development environment.
  • You upload the scripts to HDInsight using either the scp command or the provided PowerShell script.

Apache Hive UDF

Python can be used as a UDF from Hive through the HiveQL TRANSFORM statement. For example, the following HiveQL invokes the hiveudf.py file stored in the default Azure Storage account for the cluster.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Here's what this example does:

  1. The add file statement at the beginning of the file adds the hiveudf.py file to the distributed cache, so it's accessible by all nodes in the cluster.
  2. The SELECT TRANSFORM ... USING statement selects data from hivesampletable. It also passes the clientid, devicemake, and devicemodel values to the hiveudf.py script.
  3. The AS clause describes the fields returned from hiveudf.py.

Create file

On your development environment, create a text file named hiveudf.py. Use the following code as the contents of the file:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

This script performs the following actions:

  1. Reads a line of data from STDIN.
  2. The trailing newline character is removed using string.strip(line, "\n ").
  3. When doing stream processing, a single line contains all the values with a tab character between each value. So string.split(line, "\t") can be used to split the input at each tab, returning just the fields.
  4. When processing is complete, the output must be written to STDOUT as a single line, with a tab between each field. For example, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. The while loop repeats until no line is read.

The script output is a concatenation of the input values for devicemake and devicemodel, and a hash of the concatenated value.
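Because the UDF is just a line-by-line filter, the transformation can be checked locally without a cluster. The sketch below reimplements the same logic in Python 3 syntax (hashlib.md5 requires bytes there, hence the encode call); the sample row mirrors the example output shown later in this article:

```python
import hashlib

def transform(line):
    """Mirror hiveudf.py: build the phone label and append its MD5 hex digest."""
    clientid, devicemake, devicemodel = line.strip("\n ").split("\t")
    phone_label = devicemake + ' ' + devicemodel
    phone_hash = hashlib.md5(phone_label.encode("utf-8")).hexdigest()
    return "\t".join([clientid, phone_label, phone_hash])

# One hivesampletable-style row: clientid, devicemake, devicemodel.
print(transform("100041\tRIM\t9650"))
```

The result is the clientid, the combined label, and a 32-character hex digest, tab-separated.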

Upload file (shell)

In the commands below, replace sshuser with the actual username if different. Replace mycluster with the actual cluster name. Ensure your working directory is where the file is located.

  1. Use scp to copy the file to your HDInsight cluster. Edit and enter the command below:

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.cn:
    
  2. Use SSH to connect to the cluster. Edit and enter the command below:

    ssh sshuser@mycluster-ssh.azurehdinsight.cn
    
  3. From the SSH session, add the python file uploaded previously to the storage for the cluster.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Use Hive UDF (shell)

  1. To connect to Hive, use the following command from your open SSH session:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    This command starts the Beeline client.

  2. Enter the following query at the 0: jdbc:hive2://headnodehost:10001/> prompt:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. After entering the last line, the job should start. Once the job completes, it returns output similar to the following example:

     100041    RIM 9650    d476f3687700442549a83fac4560c51c
     100041    RIM 9650    d476f3687700442549a83fac4560c51c
     100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
     100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
     100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. To exit Beeline, enter the following command:

    !q
    

Upload file (PowerShell)

PowerShell can also be used to upload files to the cluster's storage. Ensure your working directory is where hiveudf.py is located. Use the following PowerShell script to upload the hiveudf.py file:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Note

For more information on uploading files, see the Upload data for Apache Hadoop jobs in HDInsight document.

Use Hive UDF (PowerShell)

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

The output for the Hive job should appear similar to the following example:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Pig UDF

A Python script can be used as a UDF from Pig through the GENERATE statement. You can run the script using either Jython or C Python.

  • Jython runs on the JVM, and can natively be called from Pig.
  • C Python is an external process, so the data from Pig on the JVM is sent out to the script running in a Python process. The output of the Python script is sent back into Pig.

To specify the Python interpreter, use register when referencing the Python script. The following examples register scripts with Pig as myfuncs:

  • To use Jython: register '/path/to/pigudf.py' using jython as myfuncs;
  • To use C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Important

When using Jython, the path to the pig_jython file can be either a local path or a WASBS:// path. However, when using C Python, you must reference a file on the local file system of the node that you are using to submit the Pig job.

Once past registration, the Pig Latin for this example is the same for both:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Here's what this example does:

  1. The first line loads the sample data file, sample.log, into LOGS. It also defines each record as a chararray.
  2. The next line filters out any null values, storing the result of the operation into LOG.
  3. Next, it iterates over the records in LOG and uses GENERATE to invoke the create_structure method contained in the Python/Jython script loaded as myfuncs. LINE is used to pass the current record to the function.
  4. Finally, the outputs are dumped to STDOUT using the DUMP command. This command displays the results after the operation completes.

Create file

On your development environment, create a text file named pigudf.py. Use the following code as the contents of the file:

# Uncomment the following if using C Python
#from pig_util import outputSchema

@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

In the Pig Latin example, the LINE input is defined as a chararray because there is no consistent schema for the input. The Python script transforms the data into a consistent schema for output.

  1. The @outputSchema statement defines the format of the data that is returned to Pig. In this case, it's a data bag, which is a Pig data type. The bag contains the following fields, all of which are chararray (strings):

    • date - the date the log entry was created
    • time - the time the log entry was created
    • classname - the class name the entry was created for
    • level - the log level
    • detail - verbose details for the log entry
  2. Next, the def create_structure(input) defines the function that Pig passes line items to.

  3. The example data, sample.log, mostly conforms to the date, time, classname, level, and detail schema. However, it contains a few lines that begin with java.lang.Exception. These lines must be modified to match the schema. The if statement checks for those, then massages the input data to move the java.lang.Exception string to the end, bringing the data in line with the expected output schema.

  4. Next, the split command is used to split the data at the first four space characters. The output is assigned into date, time, classname, level, and detail.

  5. Finally, the values are returned to Pig.

When the data is returned to Pig, it has a consistent schema as defined in the @outputSchema statement.
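Because create_structure is ordinary Python, it can be exercised locally before registering it with Pig. In the sketch below, the outputSchema decorator is stubbed out (on the cluster it comes from pig_util for C Python, or is injected by Pig for Jython), and the log line is an invented example shaped like the sample.log records:

```python
# Stub for the decorator that pig_util / Pig normally provides.
def outputSchema(schema):
    def wrap(func):
        return func
    return wrap

@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if input.startswith('java.lang.Exception'):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

# A made-up line shaped like the sample.log records:
print(create_structure("2012-02-03 20:11:56 SampleClass5 [TRACE] verbose detail for id 990982084"))
```

Running it shows how both a regular line and a java.lang.Exception line end up with the same five-field shape.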

Upload file (shell)

In the commands below, replace sshuser with the actual username if different. Replace mycluster with the actual cluster name. Ensure your working directory is where the file is located.

  1. Use scp to copy the file to your HDInsight cluster. Edit and enter the command below:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.cn:
    
  2. Use SSH to connect to the cluster. Edit and enter the command below:

    ssh sshuser@mycluster-ssh.azurehdinsight.cn
    
  3. From the SSH session, add the python file uploaded previously to the storage for the cluster.

    hdfs dfs -put pigudf.py /pigudf.py
    

Use Pig UDF (shell)

  1. To connect to Pig, use the following command from your open SSH session:

    pig
    
  2. Enter the following statements at the grunt> prompt:

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. After entering the final line, the job should start. Once the job completes, it returns output similar to the following data:

     ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
     ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
     ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
     ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
     ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Use quit to exit the Grunt shell, and then use the following command to edit the pigudf.py file on the local file system:

    nano pigudf.py
    
  5. Once in the editor, uncomment the following line by removing the # character from the beginning of the line:

    #from pig_util import outputSchema
    

    This line modifies the Python script to work with C Python instead of Jython. Once the change has been made, use Ctrl+X to exit the editor. Select Y, and then Enter to save the changes.

  6. Use the pig command to start the shell again. Once you are at the grunt> prompt, use the following statements to run the Python script using the C Python interpreter:

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Once this job completes, you should see the same output as when you previously ran the script using Jython.

Upload file (PowerShell)

PowerShell can also be used to upload files to the cluster's storage. Ensure your working directory is where pigudf.py is located. Use the following PowerShell script to upload the pigudf.py file:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount -EnvironmentName AzureChinaCloud
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Use Pig UDF (PowerShell)

Note

When remotely submitting a job using PowerShell, it is not possible to use C Python as the interpreter.

PowerShell can also be used to run Pig Latin jobs. To run a Pig Latin job that uses the pigudf.py script, use the following PowerShell script:

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

The output for the Pig job should appear similar to the following data:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Troubleshooting

Errors when running jobs

When running the Hive job, you may encounter an error similar to the following text:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

This problem may be caused by the line endings in the Python file. Many Windows editors default to using CRLF as the line ending, but Linux applications usually expect LF.

You can use the following PowerShell statements to remove the CR characters before uploading the file to HDInsight. The path below is a placeholder; revise it as needed:

$original_file = 'C:\path\to\hiveudf.py'
$text = [IO.File]::ReadAllText($original_file) -replace "`r`n", "`n"
[IO.File]::WriteAllText($original_file, $text)
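If you are unsure which line endings a file actually uses, reading it in binary mode makes them visible. A small Python sketch (the function name is invented for illustration):

```python
def line_ending(path):
    """Return 'CRLF', 'LF', or 'none' for the line endings found in a file."""
    with open(path, "rb") as f:
        data = f.read()
    if b"\r\n" in data:
        return "CRLF"
    if b"\n" in data:
        return "LF"
    return "none"
```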

PowerShell scripts

Both of the example PowerShell scripts used to run the examples contain a commented section that displays error output for the job. If you are not seeing the expected output for the job, uncomment the following lines and see if the error information indicates a problem.

Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError

The error information (STDERR) and the result of the job (STDOUT) are also logged to your HDInsight storage.

For this job...    Look at these files in the blob container
Hive               /HivePython/stderr, /HivePython/stdout
Pig                /PigPython/stderr, /PigPython/stdout

Next steps

If you need to load Python modules that aren't provided by default, see How to deploy a module to Azure HDInsight.

For other ways to use Pig and Hive, and to learn about using MapReduce, see the following documents: