Tutorial: Build an Apache Spark machine learning application in HDInsight

In this tutorial, you learn how to use the Jupyter Notebook to build an Apache Spark machine learning application for Azure HDInsight.

MLlib is Spark's scalable machine learning library. It consists of common learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, and dimensionality reduction, as well as underlying optimization primitives.

In this tutorial, you learn how to:

  • Develop an Apache Spark machine learning application

If you don't have an Azure subscription, create a trial account before you begin.

Prerequisites

Understand the data set

By default, the application uses the sample HVAC.csv data that is available on all clusters. The file is located at \HdiSamples\HdiSamples\SensorSampleData\hvac. The data shows the target temperature and the actual temperature of some buildings that have HVAC systems installed. The System column represents the system ID, and the SystemAge column represents the number of years the HVAC system has been in place at the building. Given a system ID and system age, you can use the data to predict whether a building will be hotter or colder than its target temperature.

Snapshot of data used for Spark machine learning example
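To make the data layout concrete, the following plain-Python sketch (not Spark code) turns a few CSV rows into the (BuildingID, SystemInfo, label) tuples that the tutorial trains on. It assumes the header is Date,Time,TargetTemp,ActualTemp,System,SystemAge,BuildingID, which matches the column positions the parsing code in step 3 relies on. The sample rows and the helper name to_training_rows are hypothetical and are not taken from the real file.

```python
import csv
import io

# Hypothetical rows in the assumed HVAC.csv column layout (not real sample data)
SAMPLE = """Date,Time,TargetTemp,ActualTemp,System,SystemAge,BuildingID
1/1/20,0:00:00,70,65,5,10,1
1/1/20,1:00:00,68,72,8,3,2
1/1/20,2:00:00,71,71,12,25,3
"""

def to_training_rows(text):
    """Return (BuildingID, SystemInfo, label) tuples, one per data row."""
    rows = []
    for rec in csv.DictReader(io.StringIO(text)):
        # Temperatures are read as strings; convert them before comparing.
        hot = 1.0 if float(rec["ActualTemp"]) > float(rec["TargetTemp"]) else 0.0
        # SystemInfo is "<system ID> <system age>", for example "5 10"
        info = rec["System"] + " " + rec["SystemAge"]
        rows.append((rec["BuildingID"], info, hot))
    return rows

for row in to_training_rows(SAMPLE):
    print(row)

# Why the conversion matters: comparing strings is lexicographic.
print("100" > "99")   # False, because '1' sorts before '9'
```

A row whose actual temperature equals its target (the third row here) is labeled 0.0, because only a strictly higher actual temperature counts as hot.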

Develop a Spark machine learning application using Spark MLlib

In this application, you use a Spark ML pipeline to perform document classification. ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines. In the pipeline, you split each document into words, convert the words into a numerical feature vector, and finally build a prediction model using the feature vectors and labels. Perform the following steps to create the application.
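Conceptually, the first two pipeline stages turn a SystemInfo string into a sparse vector of word counts. The following plain-Python sketch illustrates that idea; it is not Spark code. Spark's Tokenizer lowercases text and splits it on whitespace, and HashingTF maps each term to an index by applying MurmurHash3 and taking the result modulo numFeatures (2^18 by default). The sketch substitutes zlib.crc32 for the hash and uses a small, hypothetical num_features, so its indices will not match Spark's.

```python
import zlib

def tokenize(text):
    # Lowercase, then split on whitespace (approximates Spark's Tokenizer)
    return text.lower().split()

def hashing_tf(words, num_features=16):
    """Map words to a sparse term-frequency vector {index: count}.

    Uses zlib.crc32 as a stand-in hash; Spark's HashingTF uses MurmurHash3.
    """
    vec = {}
    for w in words:
        idx = zlib.crc32(w.encode("utf-8")) % num_features
        vec[idx] = vec.get(idx, 0.0) + 1.0
    return vec

words = tokenize("13 20")
print(words)              # ['13', '20']
print(hashing_tf(words))  # sparse vector; indices depend on the hash
```

Because each index is a hash taken modulo num_features, two different words can collide on the same index; a larger feature space makes collisions rarer.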

  1. Create a Jupyter notebook using the PySpark kernel. For instructions, see Create a Jupyter notebook.

  2. Import the types required for this scenario. Paste the following snippet in an empty cell, and then press SHIFT + ENTER.

    # Spark ML pipeline stages and the Row type used to build DataFrames
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.sql import Row
    
  3. Load the data (HVAC.csv) and parse it into the training data for the model.

    # Define a row type called LabeledDocument
    LabeledDocument = Row("BuildingID", "SystemInfo", "label")
    
    # Parse one line of the raw CSV file and return a LabeledDocument.
    # Columns: Date, Time, TargetTemp, ActualTemp, System, SystemAge, BuildingID
    def parseDocument(line):
        values = [str(x) for x in line.split(',')]
        # Compare numerically; comparing the raw strings would be lexicographic
        if float(values[3]) > float(values[2]):
            hot = 1.0
        else:
            hot = 0.0
    
        # SystemInfo is "<system ID> <system age>"
        textValue = values[4] + " " + values[5]
    
        return LabeledDocument(values[6], textValue, hot)
    
    # Load the raw HVAC.csv file, skip the header row, and parse each line
    data = sc.textFile("/HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
    documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
    training = documents.toDF()
    

    In this snippet, you define a function that compares the actual temperature with the target temperature. If the actual temperature is greater, the building is hot, denoted by the value 1.0. Otherwise, the building is cold, denoted by the value 0.0.

  4. Configure the Spark machine learning pipeline, which consists of three stages: tokenizer, hashingTF, and lr.

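    # Stage 1: split SystemInfo into words
    tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
    # Stage 2: hash the words into a term-frequency feature vector
    hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
    # Stage 3: logistic regression on the "features" and "label" columns
    lr = LogisticRegression(maxIter=10, regParam=0.01)
    pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])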
    

    For more information about pipelines and how they work, see Apache Spark machine learning pipeline.

  5. Fit the pipeline to the training data.

    model = pipeline.fit(training)
    
  6. Display the training data to check your progress so far.

    training.show()
    

    The output is similar to:

    +----------+----------+-----+
    |BuildingID|SystemInfo|label|
    +----------+----------+-----+
    |         4|     13 20|  0.0|
    |        17|      3 20|  0.0|
    |        18|     17 20|  1.0|
    |        15|      2 23|  0.0|
    |         3|      16 9|  1.0|
    |         4|     13 28|  0.0|
    |         2|     12 24|  0.0|
    |        16|     20 26|  1.0|
    |         9|      16 9|  1.0|
    |        12|       6 5|  0.0|
    |        15|     10 17|  1.0|
    |         7|      2 11|  0.0|
    |        15|      14 2|  1.0|
    |         6|       3 2|  0.0|
    |        20|     19 22|  0.0|
    |         8|     19 11|  0.0|
    |         6|      15 7|  0.0|
    |        13|      12 5|  0.0|
    |         4|      8 22|  0.0|
    |         7|      17 5|  0.0|
    +----------+----------+-----+
    

    Compare the output against the raw CSV file. For example, the first row of the CSV file has this data:

    Output data snapshot for Spark machine learning example

    Notice that the actual temperature is lower than the target temperature, which indicates that the building is cold. Hence, in the training output, the value of label in the first row is 0.0, which means the building is not hot.

  7. Prepare a data set to run the trained model against. To do so, you pass a system ID and system age (denoted as SystemInfo in the training output), and the model predicts whether the building with that system ID and system age will be hotter (denoted by 1.0) or cooler (denoted by 0.0).

    # SystemInfo here is a combination of system ID followed by system age
    Document = Row("id", "SystemInfo")
    # Python 3 syntax: plain integer IDs (the 1L long suffix is Python 2 only)
    test = sc.parallelize([(1, "20 25"),
                           (2, "4 15"),
                           (3, "16 9"),
                           (4, "9 22"),
                           (5, "17 10"),
                           (6, "7 22")]) \
        .map(lambda x: Document(*x)).toDF()
    
  8. Finally, make predictions on the test data.

    # Make predictions on test documents and print columns of interest
    prediction = model.transform(test)
    selected = prediction.select("SystemInfo", "prediction", "probability")
    for row in selected.collect():
        print(row)
    

    The output is similar to:

    Row(SystemInfo=u'20 25', prediction=1.0, probability=DenseVector([0.4999, 0.5001]))
    Row(SystemInfo=u'4 15', prediction=0.0, probability=DenseVector([0.5016, 0.4984]))
    Row(SystemInfo=u'16 9', prediction=1.0, probability=DenseVector([0.4785, 0.5215]))
    Row(SystemInfo=u'9 22', prediction=1.0, probability=DenseVector([0.4549, 0.5451]))
    Row(SystemInfo=u'17 10', prediction=1.0, probability=DenseVector([0.4925, 0.5075]))
    Row(SystemInfo=u'7 22', prediction=0.0, probability=DenseVector([0.5015, 0.4985]))
    

    From the first row in the prediction, you can see that for an HVAC system with ID 20 and a system age of 25 years, the model predicts that the building is hot (prediction=1.0). The first value of the DenseVector (0.4999) is the probability of prediction 0.0, and the second value (0.5001) is the probability of prediction 1.0. Even though the second value is only marginally higher, the model shows prediction=1.0.

  9. Shut down the notebook to release the resources. To do so, from the File menu on the notebook, select Close and Halt. This action shuts down and closes the notebook.
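The final stage, LogisticRegression, turns a feature vector into the probability vector and prediction shown in step 8. The following plain-Python sketch illustrates that relationship under stated assumptions. It trains with plain batch gradient descent, a swapped-in method: Spark uses a quasi-Newton optimizer (L-BFGS) and standardizes features by default. It applies the same L2 penalty form that regParam uses when the elastic-net mixing parameter is 0, and it predicts 1.0 when the class-1 probability exceeds 0.5, Spark's default threshold. The toy data and the helper names train_logreg and predict are hypothetical.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def train_logreg(X, y, max_iter=200, reg_param=0.01, step=0.5):
    """Fit weights w and intercept b by batch gradient descent.

    Minimizes mean log loss + (reg_param / 2) * ||w||^2; the intercept
    is not penalized.
    """
    n, d = len(X), len(X[0])
    w, b = [0.0] * d, 0.0
    for _ in range(max_iter):
        grad_w, grad_b = [0.0] * d, 0.0
        for xi, yi in zip(X, y):
            # Residual between predicted P(label=1) and the true label
            err = sigmoid(sum(wj * xj for wj, xj in zip(w, xi)) + b) - yi
            for j in range(d):
                grad_w[j] += err * xi[j] / n
            grad_b += err / n
        for j in range(d):
            w[j] -= step * (grad_w[j] + reg_param * w[j])
        b -= step * grad_b
    return w, b

def predict(w, b, x, threshold=0.5):
    """Return (prediction, [P(0.0), P(1.0)]), mirroring Spark's output columns."""
    p1 = sigmoid(sum(wj * xj for wj, xj in zip(w, x)) + b)
    return (1.0 if p1 > threshold else 0.0), [1.0 - p1, p1]

# Hypothetical 1-D toy data: larger x means "hot" (label 1.0)
X = [[0.0], [1.0], [2.0], [3.0]]
y = [0.0, 0.0, 1.0, 1.0]
w, b = train_logreg(X, y)
for x in X:
    print(x, predict(w, b, x))
```

This mirrors the first output row in step 8: a class-1 probability of 0.5001 is enough to produce prediction=1.0, because the rule only checks which side of the 0.5 threshold the probability falls on.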

Use Anaconda scikit-learn library for Spark machine learning

Apache Spark clusters in HDInsight include the Anaconda libraries, among them the scikit-learn library for machine learning. The library also includes various data sets that you can use to build sample applications directly from a Jupyter notebook. For examples of using the scikit-learn library, see https://scikit-learn.org/stable/auto_examples/index.html.

Clean up resources

If you're not going to continue to use this application, delete the cluster that you created with the following steps:

  1. Sign in to the Azure portal.

  2. In the Search box at the top, type HDInsight.

  3. Select HDInsight clusters under Services.

  4. In the list of HDInsight clusters that appears, select the ... next to the cluster that you created for this tutorial.

  5. Select Delete, and then select Yes.

Delete an HDInsight cluster

Next steps

In this tutorial, you learned how to use the Jupyter Notebook to build an Apache Spark machine learning application for Azure HDInsight. Advance to the next tutorial to learn how to use IntelliJ IDEA for Spark jobs.