教程:使用 Azure HDInsight 提取、转换和加载数据

本教程执行 ETL 操作:提取、转换和加载数据。 有了原始 CSV 数据文件以后,将其导入 Azure HDInsight 群集,使用 Apache Hive 对其进行转换,然后使用 Apache Sqoop 将其加载到 Azure SQL 数据库。

在本教程中,你将了解如何执行以下操作:

  • 提取数据并将其上传到 HDInsight 群集。
  • 使用 Apache Hive 转换数据。
  • 使用 Sqoop 将数据加载到 Azure SQL 数据库。

如果没有 Azure 订阅,请在开始前创建一个试用帐户

先决条件

下载、提取,然后上传数据

在本部分中,你将下载示例航班数据。 然后,将数据上传到 HDInsight 群集,再将该数据复制到 Data Lake Storage Gen2 帐户。

  1. 下载 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip 文件。 该文件包含航班数据。

  2. 打开命令提示符,使用以下安全负责 (Scp) 命令将 zip 文件上传到 HDInsight 群集头节点:

    scp On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.cn:
    
    • <ssh-user-name> 占位符替换为 HDInsight 群集的 SSH 用户名。
    • <cluster-name> 占位符替换为 HDInsight 群集的名称。

    如果使用密码对 SSH 用户名进行身份验证,系统会提示输入密码。

    如果使用公钥,可能需要使用 -i 参数并指定匹配私钥的路径。 例如,scp -i ~/.ssh/id_rsa <file_name>.zip <user-name>@<cluster-name>-ssh.azurehdinsight.cn:

  3. 上传完成后,使用 SSH 连接到群集。 在命令提示符中输入以下命令:

    ssh <ssh-user-name>@<cluster-name>-ssh.azurehdinsight.cn
    
  4. 使用以下命令解压缩 .zip 文件:

    unzip <file-name>.zip
    

    此命令会提取 .csv 文件。

  5. 使用以下命令创建 Data Lake Storage Gen2 容器。

    hadoop fs -D "fs.azure.createRemoteFileSystemDuringInitialization=true" -ls abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/
    

    <container-name> 占位符替换为你要为容器指定的名称。

    <storage-account-name> 占位符替换为存储帐户的名称。

  6. 使用以下命令创建目录。

    hdfs dfs -mkdir -p abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/data
    
  7. 使用以下命令将 .csv 文件复制到目录:

    hdfs dfs -put "On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2016_1.csv" abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/data/
    

    如果文件名包含空格或特殊字符,请对文件名使用引号。

转换数据

本部分使用 Beeline 运行 Apache Hive 作业。

在 Apache Hive 作业运行期间,请将 .csv 文件中的数据导入到名为“delays”的 Apache Hive 表中。

  1. 在 HDInsight 群集已有的 SSH 提示符中,使用以下命令创建并编辑名为 flightdelays.hql 的新文件:

    nano flightdelays.hql
    
  2. 修改以下文本,将 <container-name><storage-account-name> 占位符替换为容器和存储帐户名称。 然后将文本复制并粘贴到 nano 控制台中,方法是同时按 SHIFT 键和鼠标右键选择按钮。

      DROP TABLE delays_raw;
      -- Creates an external table over the csv file
      CREATE EXTERNAL TABLE delays_raw (
         YEAR string,
         FL_DATE string,
         UNIQUE_CARRIER string,
         CARRIER string,
         FL_NUM string,
         ORIGIN_AIRPORT_ID string,
         ORIGIN string,
         ORIGIN_CITY_NAME string,
         ORIGIN_CITY_NAME_TEMP string,
         ORIGIN_STATE_ABR string,
         DEST_AIRPORT_ID string,
         DEST string,
         DEST_CITY_NAME string,
         DEST_CITY_NAME_TEMP string,
         DEST_STATE_ABR string,
         DEP_DELAY_NEW float,
         ARR_DELAY_NEW float,
         CARRIER_DELAY float,
         WEATHER_DELAY float,
         NAS_DELAY float,
         SECURITY_DELAY float,
         LATE_AIRCRAFT_DELAY float)
      -- The following lines describe the format and location of the file
      ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
      LINES TERMINATED BY '\n'
      STORED AS TEXTFILE
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/data';
    
      -- Drop the delays table if it exists
      DROP TABLE delays;
      -- Create the delays table and populate it with data
      -- pulled in from the CSV file (via the external table defined previously)
      CREATE TABLE delays
      LOCATION 'abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/processed'
      AS
      SELECT YEAR AS year,
         FL_DATE AS FlightDate, 
         substring(UNIQUE_CARRIER, 2, length(UNIQUE_CARRIER) -1) AS IATA_CODE_Reporting_Airline,
         substring(CARRIER, 2, length(CARRIER) -1) AS Reporting_Airline, 
         substring(FL_NUM, 2, length(FL_NUM) -1) AS Flight_Number_Reporting_Airline,
         ORIGIN_AIRPORT_ID AS OriginAirportID,
         substring(ORIGIN, 2, length(ORIGIN) -1) AS OriginAirportSeqID,
         substring(ORIGIN_CITY_NAME, 2) AS OriginCityName,
         substring(ORIGIN_STATE_ABR, 2, length(ORIGIN_STATE_ABR) -1)  AS OriginState,
         DEST_AIRPORT_ID AS DestAirportID,
         substring(DEST, 2, length(DEST) -1) AS DestAirportSeqID,
         substring(DEST_CITY_NAME,2) AS DestCityName,
         substring(DEST_STATE_ABR, 2, length(DEST_STATE_ABR) -1) AS DestState,
         DEP_DELAY_NEW AS DepDelay,
         ARR_DELAY_NEW AS ArrDelay,
         CARRIER_DELAY AS CarrierDelay,
         WEATHER_DELAY AS WeatherDelay,
         NAS_DELAY AS NASDelay,
         SECURITY_DELAY AS SecurityDelay,
         LATE_AIRCRAFT_DELAY AS LateAircraftDelay
      FROM delays_raw;
    
  3. 保存文件,方法是键入 CTRL+X,然后根据提示键入 Y

  4. 若要启动 Hive 并运行 flightdelays.hql 文件,请使用以下命令:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http' -f flightdelays.hql
    
  5. flightdelays.hql 脚本完成运行后,使用以下命令打开交互式 Beeline 会话:

    beeline -u 'jdbc:hive2://localhost:10001/;transportMode=http'
    
  6. 收到 jdbc:hive2://localhost:10001/> 提示时,使用以下查询从导入的航班延误数据中检索数据:

    INSERT OVERWRITE DIRECTORY '/tutorials/flightdelays/output'
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    SELECT regexp_replace(OriginCityName, '''', ''),
      avg(WeatherDelay)
    FROM delays
    WHERE WeatherDelay IS NOT NULL
    GROUP BY OriginCityName;
    

    此查询会检索遇到天气延迟的城市的列表以及平均延迟时间,并将其保存到 abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/output 中。 稍后,Sqoop 会从该位置读取数据并将其导出到 Azure SQL 数据库。

  7. 若要退出 Beeline,请在提示符处输入 !quit

创建 SQL 数据库表

需要使用 SQL 数据库中的服务器名称才能执行此操作。 完成以下步骤即可找到服务器名称。

  1. 转到 Azure 门户

  2. 选择“SQL 数据库”。

  3. 针对选择使用的数据库的名称进行筛选。 服务器名称在“服务器名称”列中列出。

  4. 针对要使用的数据库的名称进行筛选。 服务器名称在“服务器名称”列中列出。

    Get Azure SQL server details

    可通过多种方式连接到 SQL 数据库并创建表。 以下步骤从 HDInsight 群集使用 FreeTDS

  5. 若要安装 FreeTDS,请使用以下命令从 SSH 连接到群集:

    sudo apt-get --assume-yes install freetds-dev freetds-bin
    
  6. 安装完成后,使用以下命令连接到 SQL 数据库。

    TDSVER=8.0 tsql -H '<server-name>.database.chinacloudapi.cn' -U '<admin-login>' -p 1433 -D '<database-name>'
    
    • <server-name> 占位符替换为逻辑 SQL 服务器名称。

    • 使用 SQL 数据库的管理员用户名替换 <admin-login> 占位符。

    • <database-name> 占位符替换为数据库名称

    出现提示时,输入 SQL 数据库管理员用户名的密码。

    将收到类似于以下文本的输出:

    locale is "en_US.UTF-8"
    locale charset is "UTF-8"
    using default charset "UTF-8"
    Default database being set to sqooptest
    1>
    
  7. 1> 提示符下输入以下语句:

    CREATE TABLE [dbo].[delays](
    [OriginCityName] [nvarchar](50) NOT NULL,
    [WeatherDelay] float,
    CONSTRAINT [PK_delays] PRIMARY KEY CLUSTERED
    ([OriginCityName] ASC))
    GO
    
  8. 输入 GO 语句后,会评估前面的语句。

    此查询创建一个名为 delays 且具有聚集索引的表。

  9. 使用以下查询验证是否已创建表:

    SELECT * FROM information_schema.tables
    GO
    

    输出与以下文本类似:

    TABLE_CATALOG   TABLE_SCHEMA    TABLE_NAME      TABLE_TYPE
    databaseName       dbo             delays        BASE TABLE
    
  10. 1> 提示符下输入 exit 以退出 tsql 实用工具。

导出和加载数据

在前面的部分中,已经在 abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/output 位置复制了转换后的数据。 本部分使用 Sqoop 将数据从 abfs://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/tutorials/flightdelays/output 导出到在 Azure SQL 数据库中创建的表。

  1. 使用以下命令验证 Sqoop 是否可以查看 SQL 数据库:

    sqoop list-databases --connect jdbc:sqlserver://<SERVER_NAME>.database.chinacloudapi.cn:1433 --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD>
    

    此命令会返回数据库列表,其中包括创建的 delays 表所在的数据库。

  2. 使用以下命令将 hivesampletable 表中的数据导出到 delays 表:

    sqoop export --connect 'jdbc:sqlserver://<SERVER_NAME>.database.chinacloudapi.cn:1433;database=<DATABASE_NAME>' --username <ADMIN_LOGIN> --password <ADMIN_PASSWORD> --table 'delays' --export-dir 'abfs://<container-name>@.dfs.core.chinacloudapi.cn/tutorials/flightdelays/output' --fields-terminated-by '\t' -m 1
    

    Sqoop 连接到包含 delays 表的数据库,并将数据从 /tutorials/flightdelays/output 目录导出到 delays 表。

  3. sqoop 命令完成后,使用 tsql 实用程序连接到数据库:

    TDSVER=8.0 tsql -H <SERVER_NAME>.database.chinacloudapi.cn -U <ADMIN_LOGIN> -P <ADMIN_PASSWORD> -p 1433 -D <DATABASE_NAME>
    
  4. 使用以下语句验证数据是否已导出到 delays 表:

    SELECT * FROM delays
    GO
    

    会在表中看到一系列数据。 该表包括城市名称和该城市的平均航班延迟时间。

  5. 输入 exit,退出 tsql 实用程序。

清理资源

本教程中使用的所有资源都预先存在。 不需清理。

后续步骤

若要了解使用 HDInsight 中的数据的更多方式,请参阅以下文章: