Create features for data in a Hadoop cluster using Hive queries

This document shows how to create features for data stored in an Azure HDInsight Hadoop cluster using Hive queries. These Hive queries use embedded Hive User-Defined Functions (UDFs), the scripts for which are provided.

The operations needed to create features can be memory intensive. The performance of Hive queries becomes more critical in such cases and can be improved by tuning certain parameters. The tuning of these parameters is discussed in the final section.

The query examples presented are specific to the NYC Taxi Trip Data scenario, which is also provided in a GitHub repository. These queries already have the data schema specified and are ready to be submitted to run. The final section also discusses parameters that users can tune to improve the performance of Hive queries.

This task is a step in the Team Data Science Process (TDSP).

Prerequisites

This article assumes that you have:

Feature generation

This section describes several ways in which features can be generated using Hive queries. Once you have generated additional features, you can either add them as columns to the existing table or create a new table with the additional features and a primary key, which can then be joined with the original table. Here are the examples presented:

  1. Frequency-based Feature Generation
  2. Risks of Categorical Variables in Binary Classification
  3. Extract features from Datetime Field
  4. Extract features from Text Field
  5. Calculate distance between GPS coordinates

Frequency-based feature generation

It is often useful to calculate the frequencies of the levels of a categorical variable, or the frequencies of certain combinations of levels from multiple categorical variables. Users can use the following script to calculate these frequencies:

select
    a.<column_name1>, a.<column_name2>, a.sub_count/sum(a.sub_count) over () as frequency
from
(
    select
        <column_name1>,<column_name2>, count(*) as sub_count
    from <databasename>.<tablename> group by <column_name1>, <column_name2>
)a
order by frequency desc;

Risks of categorical variables in binary classification

In binary classification, non-numeric categorical variables must be converted into numeric features when the models being used only take numeric features. This conversion is done by replacing each non-numeric level with a numeric risk. This section shows some generic Hive queries that calculate the risk values (log odds) of a categorical variable.

set smooth_param1=1;
set smooth_param2=20;
select
    <column_name1>,<column_name2>,
    ln((sum_target+${hiveconf:smooth_param1})/(record_count-sum_target+${hiveconf:smooth_param2}-${hiveconf:smooth_param1})) as risk
from
    (
    select
        <column_name1>, <column_name2>, sum(binary_target) as sum_target, sum(1) as record_count
    from
        (
        select
            <column_name1>, <column_name2>, if(target_column>0,1,0) as binary_target
        from <databasename>.<tablename>
        )a
    group by <column_name1>, <column_name2>
    )b;

In this example, variables smooth_param1 and smooth_param2 are set to smooth the risk values calculated from the data. Risks range between -Inf and Inf. A risk greater than 0 indicates that the probability that the target is equal to 1 is greater than 0.5.
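
For example, with sum_target = 10, record_count = 100, smooth_param1 = 1, and smooth_param2 = 20, the risk for that level is ln((10 + 1)/(100 - 10 + 20 - 1)) = ln(11/109) ≈ -2.29, a negative value indicating that the probability of the target being 1 is well below 0.5 (the numbers here are purely illustrative).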

After the risk table is calculated, users can assign risk values to a table by joining it with the risk table. The Hive join query was provided in a previous section.
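
As a minimal sketch (assuming the risk query above has been materialized into a hypothetical table <risktablename> keyed on the same columns), the join that attaches the risk values to the original table might look like this:

select a.*, b.risk
from <databasename>.<tablename> a
-- <risktablename> is a hypothetical table holding the output of the risk query above
join <databasename>.<risktablename> b
  on a.<column_name1> = b.<column_name1>
 and a.<column_name2> = b.<column_name2>;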

Extract features from datetime fields

Hive comes with a set of UDFs for processing datetime fields. In Hive, the default datetime format is 'yyyy-MM-dd HH:mm:ss' ('1970-01-01 12:21:32', for example). This section shows examples that extract the day of the month and the month from a datetime field, and other examples that convert a datetime string in a format other than the default into a datetime string in the default format.

select day(<datetime field>), month(<datetime field>)
from <databasename>.<tablename>;

This Hive query assumes that the <datetime field> is in the default datetime format.

If a datetime field is not in the default format, you need to convert the datetime field into a Unix time stamp first, and then convert the Unix time stamp to a datetime string in the default format. Once the datetime is in the default format, users can apply the embedded datetime UDFs to extract features.

select from_unixtime(unix_timestamp(<datetime field>,'<pattern of the datetime field>'))
from <databasename>.<tablename>;

In this query, if the <datetime field> has a pattern like 03/26/2015 12:04:39, then the <pattern of the datetime field> should be 'MM/dd/yyyy HH:mm:ss'. To test it, users can run:

select from_unixtime(unix_timestamp('05/15/2015 09:32:10','MM/dd/yyyy HH:mm:ss'))
from hivesampletable limit 1;

The hivesampletable in this query comes preinstalled on all Azure HDInsight Hadoop clusters by default when the clusters are provisioned.
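
Putting the two steps together, a sketch that extracts the day and the month directly from a field in a non-default format (the 'MM/dd/yyyy HH:mm:ss' pattern is only an assumed example) is:

select day(from_unixtime(unix_timestamp(<datetime field>,'MM/dd/yyyy HH:mm:ss'))) as day_of_month,
       month(from_unixtime(unix_timestamp(<datetime field>,'MM/dd/yyyy HH:mm:ss'))) as month_of_year
from <databasename>.<tablename>;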

Extract features from text fields

When a Hive table has a text field that contains a string of words delimited by spaces, the following query extracts the length of the string and the number of words in the string.

select length(<text field>) as str_len, size(split(<text field>,' ')) as word_num
from <databasename>.<tablename>;

Calculate distances between sets of GPS coordinates

The query given in this section can be applied directly to the NYC Taxi Trip Data. The purpose of this query is to show how to apply an embedded mathematical function in Hive to generate features.

The fields used in this query are the GPS coordinates of the pickup and dropoff locations, named pickup_longitude, pickup_latitude, dropoff_longitude, and dropoff_latitude. The queries that calculate the direct distance between the pickup and dropoff coordinates are:

set R=3959;
set pi=radians(180);
select pickup_longitude, pickup_latitude, dropoff_longitude, dropoff_latitude,
    ${hiveconf:R}*2*2*atan((1-sqrt(1-pow(sin((dropoff_latitude-pickup_latitude)
    *${hiveconf:pi}/180/2),2)-cos(pickup_latitude*${hiveconf:pi}/180)
    *cos(dropoff_latitude*${hiveconf:pi}/180)*pow(sin((dropoff_longitude-pickup_longitude)*${hiveconf:pi}/180/2),2)))
    /sqrt(pow(sin((dropoff_latitude-pickup_latitude)*${hiveconf:pi}/180/2),2)
    +cos(pickup_latitude*${hiveconf:pi}/180)*cos(dropoff_latitude*${hiveconf:pi}/180)*
    pow(sin((dropoff_longitude-pickup_longitude)*${hiveconf:pi}/180/2),2))) as direct_distance
from nyctaxi.trip
where pickup_longitude between -90 and 0
and pickup_latitude between 30 and 90
and dropoff_longitude between -90 and 0
and dropoff_latitude between 30 and 90
limit 10;

The mathematical equations that calculate the distance between two GPS coordinates can be found on the Movable Type Scripts site, authored by Peter Lapisu. In that JavaScript, the function toRad() is just lat_or_lon * pi/180, which converts degrees to radians. Here, lat_or_lon is the latitude or longitude. Since Hive does not provide the function atan2, but does provide the function atan, the atan2 function is implemented with the atan function in the above Hive query, using the definition provided in Wikipedia.
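
For reference, the identity used (valid when the first argument is positive) is atan2(y, x) = 2*atan((sqrt(x*x + y*y) - x)/y). In the query above, y = sqrt(a) and x = sqrt(1 - a), where a is the haversine term, so sqrt(x*x + y*y) = 1 and the expression reduces to 2*atan((1 - sqrt(1 - a))/sqrt(a)).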


A full list of Hive embedded UDFs can be found in the Built-in Functions section on the Apache Hive wiki.

Advanced topics: Tune Hive parameters to improve query speed

The default parameter settings of a Hive cluster might not be suitable for the Hive queries and the data that the queries are processing. This section discusses some parameters that users can tune to improve the performance of Hive queries. Users need to add the parameter-tuning statements before the queries that process the data.

  1. Java heap space: For queries involving joining large datasets, or processing long records, running out of heap space is one of the common errors. This error can be avoided by setting the parameters mapreduce.map.java.opts and mapreduce.task.io.sort.mb to desired values. Here is an example:

    set mapreduce.map.java.opts=-Xmx4096m;
    set mapreduce.task.io.sort.mb=1024;
    

    These settings allocate 4 GB of memory to the Java heap space and also make sorting more efficient by allocating more memory for it. It is a good idea to play with these allocations if there are any job failure errors related to heap space.

  2. DFS block size: This parameter sets the smallest unit of data that the file system stores. For example, if the DFS block size is 128 MB, then any data of size less than or equal to 128 MB is stored in a single block. Data that is larger than 128 MB is allotted additional blocks.

    Choosing a small block size causes large overheads in Hadoop, because the name node has to process many more requests to find the relevant block pertaining to the file. A recommended setting when dealing with gigabytes (or larger) of data is:

    set dfs.block.size=128m;
    
  3. Optimizing join operations in Hive: While join operations in the map/reduce framework typically take place in the reduce phase, sometimes enormous gains can be achieved by scheduling joins in the map phase (also called "mapjoins"). Set this option:

    set hive.auto.convert.join=true;
    
  4. Specifying the number of mappers to Hive: While Hadoop allows the user to set the number of reducers, the number of mappers is typically not set by the user. A trick that allows some degree of control over this number is to choose the Hadoop variables mapred.min.split.size and mapred.max.split.size, because the size of each map task's input split is determined by:

    split_size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
    num_mappers ≈ total_input_size / split_size
    

    Typically:

    • the default value of mapred.min.split.size is 0,

    • the default value of mapred.max.split.size is Long.MAX_VALUE, and

    • the default value of dfs.block.size is 64 MB.

      As we can see, given the data size, tuning these parameters by "setting" them allows us to control the number of mappers used, as in the sketch below.
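
      A minimal sketch (the byte values are illustrative assumptions, not recommendations) that raises the split size, and therefore reduces the number of mappers, on a multi-gigabyte input:

      set mapred.min.split.size=268435456;
      set mapred.max.split.size=536870912;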

  5. Here are a few other, more advanced options for optimizing Hive performance. These options allow you to set the memory allocated to map and reduce tasks, and can be useful in tweaking performance. Keep in mind that mapreduce.reduce.memory.mb cannot be greater than the physical memory size of each worker node in the Hadoop cluster.

    set mapreduce.map.memory.mb = 2048;
    set mapreduce.reduce.memory.mb=6144;
    set mapreduce.reduce.java.opts=-Xmx8192m;
    set mapred.reduce.tasks=128;
    set mapred.tasktracker.reduce.tasks.maximum=128;