教程:Azure Data Lake Storage Gen2、Azure Databricks 和 Spark

本教程介绍如何将 Azure Databricks 群集连接到启用了 Azure Data Lake Storage Gen2 的 Azure 存储帐户中存储的数据。 建立此连接后,即可在群集本机上针对数据运行查询和分析。

在本教程中,将:

  • 将非结构化数据引入存储帐户中
  • 对 Blob 存储中的数据运行分析

如果没有 Azure 订阅,请在开始前创建一个试用帐户

先决条件

创建 Azure Databricks 工作区、群集和笔记本

  1. 创建 Azure Databricks 工作区。 请参阅创建 Azure Databricks 工作区

  2. 创建群集。 参阅创建群集

  3. 创建笔记本。 请参阅创建笔记本。 选择 Python 作为笔记本的默认语言。

使笔记本保持打开状态。 在以下部分使用它。

下载航班数据

本教程使用美国运输统计局 2016 年 1 月的实时性能航班数据来演示如何执行 ETL 操作。 必须下载该数据才能完成本教程。

  1. 下载 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip 文件。 该文件包含航班数据。

  2. 将压缩文件的内容解压缩,并记下文件名和文件路径。 在稍后的步骤中需要使用此信息。

引入数据

在本部分中,将 .csv 航班数据上传到 Azure Data Lake Storage Gen2 帐户,然后将存储帐户装载到 Databricks 群集。 最后,使用 Databricks 读取 .csv 航班数据并将其写回到 Apache parquet 格式的存储。

将航班数据上传到存储帐户

使用 AzCopy 将 .csv 文件复制到 Azure Data Lake Storage Gen2 帐户。 使用 azcopy make 命令在存储帐户中创建容器。 然后使用 azcopy copy 命令复制刚刚下载到该容器中目录的 csv 数据。

在以下步骤中,需要输入要创建的容器的名称,以及要将航班数据上传到容器中的目录和 Blob。 可以在每个步骤中使用建议的名称,或指定自己的容器、目录和 blob 的命名约定

  1. 打开命令提示符窗口,输入以下命令登录到 Azure Active Directory 以访问存储帐户。

    azcopy login --aad-endpoint https://login.partner.microsoftonline.cn
    

    按照命令提示符窗口中的说明对用户帐户进行身份验证。

  2. 若要在存储帐户中创建容器以存储航班数据,请输入以下命令:

    azcopy make  "https://<storage-account-name>.dfs.core.chinacloudapi.cn/<container-name>" 
    
    • <storage-account-name> 占位符值替换为存储帐户的名称。

    • <container-name> 占位符替换为要创建的容器的名称来存储 csv 数据;例如, flight-data-container

  3. 若要将 csv 数据上传到存储帐户,请输入以下命令。

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.chinacloudapi.cn/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> 占位符值替换为 .csv 文件的路径 。

    • <storage-account-name> 占位符值替换为存储帐户的名称。

    • <container-name> 占位符替换为存储帐户中的容器名称。

    • <directory-name> 占位符替换为用于在容器中存储数据的目录的名称;例如,jan2016

将存储帐户装载到 Databricks 群集

在本部分中,将 Azure Data Lake Storage Gen2 云对象存储装载到 Databricks 文件系统 (DBFS)。 使用之前创建的 Azure AD 服务原则对存储帐户进行身份验证。 有关详细信息,请参阅在 Azure Databricks 上装载云对象存储

  1. 将笔记本附加到群集。

    1. 在之前创建的笔记本中,选择笔记本工具栏右上角的连接按钮。 此按钮将打开计算选择器。 (如果已将笔记本连接到群集,按钮文本会显示该群集的名称,而不是连接)。

    2. 在群集下拉菜单中,选择之前创建的群集。

    3. 请注意,群集选择器中的文本将更改为开始。 等待群集完成启动,并等待群集的名称显示在按钮中,然后再继续。

  2. 将以下代码块复制并粘贴到第一个单元格中,但目前请勿运行此代码。

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.partner.microsoftonline.cn/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.chinacloudapi.cn/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. 在此代码块中:

    • configs 中,将 <appId><clientSecret><tenantId> 占位符值替换为在先决条件中创建服务主体时复制的应用程序 ID、客户端密码和租户 ID。

    • source URI 中,将 <storage-account-name><container-name><directory-name> 占位符值替换为 Azure Data Lake Storage Gen2 存储帐户的名称以及将外部测试版数据上传到存储帐户时指定的容器和目录名称。

      注意

      URI 中的方案标识符 abfss 指示 Databricks 将 Azure Blob 文件系统驱动程序与传输层安全性 (TLS) 配合使用。 若要详细了解 URI,请参阅使用 Azure Data Lake Storage Gen2 URI

  4. 在继续操作之前,确保群集已完成启动。

  5. SHIFT + ENTER 键,运行此块中的代码。

上传存储帐户中的外部测试版数据的容器和目录现在可通过装入点 /mnt/flightdata 在笔记本中访问。

使用 Databricks Notebook 将 CSV 转换为 Parquet

现在,csv 外部测试版数据可通过 DBFS 装入点访问,你可以使用 Apache Spark DataFrame 将其加载到工作区中,并将其以 Apache parquet 格式写回到 Azure Data Lake Storage Gen2 对象存储。

  • DataFrame 是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可以使用 DataFrame 以各种支持的格式轻松读取和写入数据。 使用 DataFrame,可以从云对象存储加载数据,并在计算群集中对数据执行分析和转换,而不会影响云对象存储中的基础数据。 若要了解详细信息,请参阅使用 Azure Databricks 上的 PySpark DataFrame

  • Apache Parquet 是一种列式文件格式,具有加快查询速度的优化功能。 它是比 CSV 或 JSON 更为高效的文件格式。 若要了解详细信息,请参阅 Parquet 文件

在笔记本中,添加新单元格,并将以下代码粘贴到其中。

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

SHIFT + ENTER 键,运行此块中的代码。

在继续下一部分之前,请确保已写入所有 parquet 数据,并且输出中显示“完成”。

浏览数据

在本部分中,将使用 Databricks 文件系统实用工具,利用上一部分中创建的 DBFS 装入点浏览 Azure Data Lake Storage Gen2 对象存储。

在新单元格中,粘贴以下代码以获取装入点的文件列表。 第一个命令输出文件和目录的列表。 第二个命令以表格格式显示输出,以便于阅读。

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

SHIFT + ENTER 键,运行此块中的代码。

请注意,parquet 目录显示在列表中。 将 parquet 格式的 .csv 航班数据保存到上一部分中的 parquet/flights 目录。 若要列出 parquet/flights 目录中的文件,请将以下代码粘贴到新单元格中并运行它:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

若要创建新文件并将其列出,请将以下代码粘贴到新单元格中并运行它:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

由于本教程中不需要 1.txt 文件,因此可以将以下代码粘贴到单元格中,并运行它以递归方式删除 mydirectoryTrue 参数指示递归删除。

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

为方便起见,可以使用帮助命令了解有关其他命令的详细信息。

dbutils.fs.help("rm")

通过这些代码示例,你已使用启用了 Azure Data Lake Storage Gen2 的存储帐户中存储的数据探讨了 HDFS 的层次结构性质。

查询数据

接下来,可以开始查询上传到存储帐户中的数据。 将以下每个代码块输入到新单元格中,然后按 SHIFT + ENTER 运行 Python 脚本。

DataFrame 提供一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。

若要从以前保存的 parquet 外部测试版数据加载 DataFrame 并浏览一些受支持的功能,请将此脚本输入到新单元格中并运行它。

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

输入此脚本,以针对数据运行一些基本分析查询。 可以选择运行整个脚本 (SHIFT + ENTER),突出显示每个查询,并使用 CTRL + SHIFT + ENTER 单独运行该脚本,或将每个查询输入单独的单元格并在那里运行。

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

总结

在本教程中,你将了解:

  • 创建 Azure 资源,包括 Azure Data Lake Storage Gen2 存储帐户和 Azure AD 服务主体,并分配了访问存储帐户的权限。

  • 创建 Azure Databricks 工作区、笔记本和计算群集。

  • 使用 AzCopy 将非结构化 .csv 航班数据上传到 Azure Data Lake Storage Gen2 存储帐户。

  • 使用 Databricks 文件系统实用工具函数装载 Azure Data Lake Storage Gen2 存储帐户并浏览其分层文件系统。

  • 使用 Apache Spark DataFrame 将 .csv 航班数据转换为 Apache parquet 格式,并将其存储回 Azure Data Lake Storage Gen2 存储帐户。

  • 使用 DataFrame 浏览外部测试版数据并执行简单的查询。

  • 使用 Apache Spark SQL 查询 2016 年 1 月每个航空公司的航班总数、德克萨斯州的机场、从得克萨斯州起飞的航空公司、全国每个航空公司的平均到达延误时间(以分钟为单位),以及每个航空公司航班延误或抵达时间的百分比。

清理资源

如果想要保留笔记本并稍后返回该笔记本,最好关闭(终止)群集以避免产生费用。 若要终止群集,请在笔记本工具栏右上角的计算选择器中选择它,从菜单中选择终止 ,然后确认选择。 (默认情况下,群集将在处于非活动状态 120 分钟后自动终止。)

如果要删除笔记本和群集等单个工作区资源,可以从工作区的左侧栏中执行此操作。 有关详细说明,请参阅删除群集删除笔记本

如果不再需要本文中创建的资源,可以删除资源组和所有相关资源。 若要在 Azure 门户中执行此操作,请选择存储帐户所在的资源组和工作区,然后选择“删除”

后续步骤