教程:在 Azure HDInsight 中使用交互式查询提取、转换和加载数据

在本教程中,我们将下载公开发布的航班数据的原始 CSV 数据文件。 将该文件导入 HDInsight 群集存储,然后使用 Azure HDInsight 中的 Interactive Query 来转换数据。 数据转换完毕后,使用 Apache Sqoop 将数据加载到 Azure SQL 数据库中的某个数据库中。

本教程涵盖以下任务:

  • 下载示例航班数据
  • 将数据上传到 HDInsight 群集
  • 使用交互式查询转换数据
  • 在 Azure SQL 数据库的某个数据库中创建表
  • 使用 Sqoop 将数据导出到 Azure SQL 数据库中的某个数据库

先决条件

下载航班数据

  1. 浏览到美国研究与技术创新管理部门、运输统计局

  2. 在页面上,清除所有字段,然后选择以下值:

    名称
    筛选年份 2019
    筛选期间 1 月
    字段 Year、FlightDate、Reporting_Airline、DOT_ID_Reporting_Airline、Flight_Number_Reporting_Airline、OriginAirportID、Origin、OriginCityName、OriginState、DestAirportID、Dest、DestCityName、DestState、DepDelayMinutes、ArrDelay、ArrDelayMinutes、CarrierDelay、WeatherDelay、NASDelay、SecurityDelay、LateAircraftDelay。
  3. 选择“下载”。 你将得到一个具有所选数据字段的 zip 文件。

将数据上传到 HDInsight 群集

可通过多种方式将数据上传到与 HDInsight 群集相关的存储。 本部分使用 scp 上传数据。 若要了解上传数据的其他方式,请参阅将数据上传到 HDInsight

  1. 将 .zip 文件上传到 HDInsight 群集头节点。 编辑以下命令,将 FILENAME 替换为 .zip 文件的名称,将 CLUSTERNAME 替换为 HDInsight 群集的名称。 然后打开命令提示符,将工作目录设置为文件位置,然后输入命令。

    scp FILENAME.zip sshuser@CLUSTERNAME-ssh.azurehdinsight.cn:FILENAME.zip
    

    如果出现提示,请输入“yes”或“no”以继续。 键入时,文本在窗口中不可见。

  2. 上传完成后,使用 SSH 连接到群集。 通过将 CLUSTERNAME 替换为 HDInsight 群集的名称来编辑以下命令。 然后输入以下命令:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.cn
    
  3. 建立 SSH 连接后设置环境变量。 将 FILE_NAMESQL_SERVERNAMESQL_DATABASESQL_USERSQL_PASWORD 替换为适当的值。 然后输入该命令:

    export FILENAME=FILE_NAME
    export SQLSERVERNAME=SQL_SERVERNAME
    export DATABASE=SQL_DATABASE
    export SQLUSER=SQL_USER
    export SQLPASWORD='SQL_PASWORD'
    
  4. 通过输入以下命令解压缩 .zip 文件:

    unzip $FILENAME.zip
    
  5. 在 HDInsight 存储上创建目录,然后输入以下命令将 .csv 文件复制到该目录:

    hdfs dfs -mkdir -p /tutorials/flightdelays/data
    hdfs dfs -put $FILENAME.csv /tutorials/flightdelays/data/
    

使用 Hive 查询转换数据

可通过多种方式在 HDInsight 群集上运行 Hive 作业。 本部分使用 Beeline 运行 Hive 作业。 有关以其他方式运行 Hive 作业的信息,请参阅在 HDInsight 上使用 Apache Hive

在 Hive 作业运行期间,请将 .csv 文件中的数据导入到名为“Delays”的 Hive 表中。

  1. 在 HDInsight 群集已有的 SSH 提示符中,使用以下命令创建并编辑名为“flightdelays.hql”的新文件:

    nano flightdelays.hql
    
  2. 将以下文本用作此文件的内容:

    DROP TABLE delays_raw;
    -- Creates an external table over the csv file
    CREATE EXTERNAL TABLE delays_raw (
        YEAR string,
        FL_DATE string,
        UNIQUE_CARRIER string,
        CARRIER string,
        FL_NUM string,
        ORIGIN_AIRPORT_ID string,
        ORIGIN string,
        ORIGIN_CITY_NAME string,
        ORIGIN_CITY_NAME_TEMP string,
        ORIGIN_STATE_ABR string,
        DEST_AIRPORT_ID string,
        DEST string,
        DEST_CITY_NAME string,
        DEST_CITY_NAME_TEMP string,
        DEST_STATE_ABR string,
        DEP_DELAY_NEW float,
        ARR_DELAY_NEW float,
        CARRIER_DELAY float,
        WEATHER_DELAY float,
        NAS_DELAY float,
        SECURITY_DELAY float,
        LATE_AIRCRAFT_DELAY float)
    -- The following lines describe the format and location of the file
    ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    LINES TERMINATED BY '\n'
    STORED AS TEXTFILE
    LOCATION '/tutorials/flightdelays/data';
    
    -- Drop the delays table if it exists
    DROP TABLE delays;
    -- Create the delays table and populate it with data
    -- pulled in from the CSV file (via the external table defined previously)
    CREATE TABLE delays AS
    SELECT YEAR AS year,
        FL_DATE AS flight_date,
        substring(UNIQUE_CARRIER, 2, length(UNIQUE_CARRIER) -1) AS unique_carrier,
        substring(CARRIER, 2, length(CARRIER) -1) AS carrier,
        substring(FL_NUM, 2, length(FL_NUM) -1) AS flight_num,
        ORIGIN_AIRPORT_ID AS origin_airport_id,
        substring(ORIGIN, 2, length(ORIGIN) -1) AS origin_airport_code,
        substring(ORIGIN_CITY_NAME, 2) AS origin_city_name,
        substring(ORIGIN_STATE_ABR, 2, length(ORIGIN_STATE_ABR) -1)  AS origin_state_abr,
        DEST_AIRPORT_ID AS dest_airport_id,
        substring(DEST, 2, length(DEST) -1) AS dest_airport_code,
        substring(DEST_CITY_NAME,2) AS dest_city_name,
        substring(DEST_STATE_ABR, 2, length(DEST_STATE_ABR) -1) AS dest_state_abr,
        DEP_DELAY_NEW AS dep_delay_new,
        ARR_DELAY_NEW AS arr_delay_new,
        CARRIER_DELAY AS carrier_delay,
        WEATHER_DELAY AS weather_delay,
        NAS_DELAY AS nas_delay,
        SECURITY_DELAY AS security_delay,
        LATE_AIRCRAFT_DELAY AS late_aircraft_delay
    FROM delays_raw;
    
  3. 若要保存该文件,请按“Ctrl + X”,然后按“y”,然后输入 。

  4. 若要启动 Hive 并运行 flightdelays.hql 文件,请使用以下命令:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http' -f flightdelays.hql
    
  5. flightdelays.hql 脚本完成运行后,使用以下命令打开交互式 Beeline 会话:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http'
    
  6. 收到 jdbc:hive2://localhost:10001/> 提示时,使用以下查询从导入的航班延误数据中检索数据:

    INSERT OVERWRITE DIRECTORY '/tutorials/flightdelays/output'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    SELECT regexp_replace(origin_city_name, '''', ''),
        avg(weather_delay)
    FROM delays
    WHERE weather_delay IS NOT NULL
    GROUP BY origin_city_name;
    

    此查询会检索遇到天气延迟的城市的列表以及平均延迟时间,并将其保存到 /tutorials/flightdelays/output 中。 稍后,Sqoop 会从该位置读取数据并将其导出到 Azure SQL 数据库。

  7. 若要退出 Beeline,请在提示符处输入 !quit

创建 SQL 数据库表

可通过多种方式连接到 SQL 数据库并创建表。 以下步骤从 HDInsight 群集使用 FreeTDS

  1. 若要安装 FreeTDS,请使用以下命令从打开的 SSH 连接到群集:

    sudo apt-get --assume-yes install freetds-dev freetds-bin
    
  2. 安装完成后,使用以下命令连接到 SQL 数据库。

    TDSVER=8.0 tsql -H $SQLSERVERNAME.database.chinacloudapi.cn -U $SQLUSER -p 1433 -D $DATABASE -P $SQLPASWORD
    

    将收到类似于以下文本的输出:

    locale is "en_US.UTF-8"
    locale charset is "UTF-8"
    using default charset "UTF-8"
    Default database being set to <yourdatabase>
    1>
    
  3. 1> 提示符下,输入以下行:

    CREATE TABLE [dbo].[delays](
    [origin_city_name] [nvarchar](50) NOT NULL,
    [weather_delay] float,
    CONSTRAINT [PK_delays] PRIMARY KEY CLUSTERED
    ([origin_city_name] ASC))
    GO
    

    输入 GO 语句后,会评估前面的语句。 此语句会创建一个名为“delays”且具有聚集索引的表。

    使用以下查询验证是否已创建该表:

    SELECT * FROM information_schema.tables
    GO
    

    输出与以下文本类似:

    TABLE_CATALOG   TABLE_SCHEMA    TABLE_NAME      TABLE_TYPE
    databaseName       dbo             delays        BASE TABLE
    
  4. 1> 提示符下输入 exit 以退出 tsql 实用工具。

使用 Apache Sqoop 将数据导出到 SQL 数据库

在前面的部分中,已经在 /tutorials/flightdelays/output 复制了转换后的数据。 本部分使用 Sqoop 将数据从 /tutorials/flightdelays/output 导出到在 Azure SQL 数据库中创建的表。

  1. 通过输入以下命令验证 Sqoop 是否可以查看 SQL 数据库:

    sqoop list-databases --connect jdbc:sqlserver://$SQLSERVERNAME.database.chinacloudapi.cn:1433 --username $SQLUSER --password $SQLPASWORD
    

    此命令会返回数据库列表,其中包括此前创建的 delays 表所在的数据库。

  2. 通过输入以下命令将数据从 /tutorials/flightdelays/output 导出到 delays 表:

    sqoop export --connect "jdbc:sqlserver://$SQLSERVERNAME.database.chinacloudapi.cn:1433;database=$DATABASE" --username $SQLUSER --password $SQLPASWORD --table 'delays' --export-dir '/tutorials/flightdelays/output' --fields-terminated-by '\t' -m 1
    

    Sqoop 连接到包含 delays 表的数据库,并将数据从 /tutorials/flightdelays/output 目录导出到 delays 表。

  3. Sqoop 命令完成后,使用 tsql 实用程序通过输入以下命令连接到数据库:

    TDSVER=8.0 tsql -H $SQLSERVERNAME.database.chinacloudapi.cnt -U $SQLUSER -p 1433 -D $DATABASE -P $SQLPASWORD
    

    使用以下语句验证数据是否已导出到 delays 表:

    SELECT * FROM delays
    GO
    

    会在表中看到一系列数据。 该表包括城市名称和该城市的平均航班延迟时间。

    键入 exit 退出 tsql 实用工具。

清理资源

完成教程之后,可能想要删除该群集。 有了 HDInsight,便可以将数据存储在 Azure 存储中,因此可以在群集不用时安全地删除群集。 此外,还需要支付 HDInsight 群集费用,即使未使用。 由于群集费用高于存储空间费用数倍,因此在不使用群集时将其删除可以节省费用。

若要删除群集,请参阅使用浏览器、PowerShell 或 Azure CLI 删除 HDInsight 群集

后续步骤

在本教程中,你获取了一个原始 CSV 数据文件,将其导入到 HDInsight 群集存储中,然后使用 Azure HDInsight 中的交互式查询转换数据。 继续学习下一篇教程,了解 Apache Hive Warehouse Connector。