在 Azure HDInsight 上生成 Apache Spark 机器学习应用程序

了解如何使用 HDInsight 中的 Spark 群集生成 Apache Spark 机器学习应用程序。 本文说明如何使用群集随附的 Jupyter notebook 来生成和测试此应用程序。 应用程序默认使用所有群集提供的 HVAC.csv 数据示例。

先决条件:

必须满足以下条件:

了解数据集

在开始生成应用程序之前,我们先来了解要为其生成应用程序的数据的结构,以及要对数据执行的分析类型。

在本文中,使用与 HDInsight 群集关联的 Azure 存储帐户中提供的 HVAC.csv 数据文件示例。 该文件位于存储帐户中的 \HdiSamples\HdiSamples\SensorSampleData\hvac 位置。 下载并打开 CSV 文件,以获取数据的快照。

用于 Spark 机器学习示例的数据的快照

该数据显示安装 HVAC 系统的建筑物的目标温度和实际温度。 我们假设 System 列代表系统 ID,SystemAge 列代表建筑物安装 HVAC 系统的年数。

如果指定系统 ID 和系统年数,可以使用此数据预测建筑物的温度比目标温度高还是低。

使用 Spark MLlib 编写 Spark 机器学习应用程序

在此应用程序中,使用 Spark ML 管道执行文档分类。 在管道中,将文档分割成单字,将单字转换成数字特征向量,并最后使用特征向量和标签创建预测模型。 执行下列步骤创建应用程序。

  1. Azure 门户上的启动板中,单击 Spark 群集的磁贴(如果已将它固定到启动板)。 也可以单击“全部浏览” > “HDInsight 群集”导航到群集。
  2. 在 Spark 群集边栏选项卡中单击“群集仪表板”,然后单击“Jupyter Notebook”。 出现提示时,请输入群集的管理员凭据。

    Note

    也可以在浏览器中打开以下 URL 访问群集的 Jupyter 笔记本。 将 CLUSTERNAME 替换为群集的名称:

    https://CLUSTERNAME.azurehdinsight.cn/jupyter

  3. 创建新的笔记本。 单击“新建”,然后单击“PySpark”。

    创建用于 Spark 机器学习示例的 Jupyter notebook

  4. 随即创建新笔记本,并以 Untitled.pynb 名称打开。 在顶部单击笔记本名称,并输入一个友好名称。

    提供用于 Spark 机器学习示例的笔记本名称

  5. 使用笔记本是使用 PySpark 内核创建的,因此不需要显式创建任何上下文。 运行第一个代码单元格时,系统自动创建 Spark 和 Hive 上下文。 首先,可以导入此方案所需的类型。 将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

     from pyspark.ml import Pipeline
     from pyspark.ml.classification import LogisticRegression
     from pyspark.ml.feature import HashingTF, Tokenizer
     from pyspark.sql import Row
    
     import os
     import sys
     from pyspark.sql.types import *
    
     from pyspark.mllib.classification import LogisticRegressionWithSGD
     from pyspark.mllib.regression import LabeledPoint
     from numpy import array
    
  6. 现在必须加载数据 (hvac.csv),分析数据,并使用它来训练模型。 为此,需要定义检查建筑物实际温度是否高于目标温度的函数。 如果实际温度较高,则表示建筑物处于高温状态,用值 1.0表示。 如果实际温度较低,则表示建筑物处于低温状态,用值 0.0表示。

    将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

     # List the structure of data for better understanding. Because the data will be
     # loaded as an array, this structure makes it easy to understand what each element
     # in the array corresponds to
    
     # 0 Date
     # 1 Time
     # 2 TargetTemp
     # 3 ActualTemp
     # 4 System
     # 5 SystemAge
     # 6 BuildingID
    
     LabeledDocument = Row("BuildingID", "SystemInfo", "label")
    
     # Define a function that parses the raw CSV file and returns an object of type LabeledDocument
    
     def parseDocument(line):
         values = [str(x) for x in line.split(',')]
         if (values[3] > values[2]):
             hot = 1.0
         else:
             hot = 0.0        
    
         textValue = str(values[4]) + " " + str(values[5])
    
         return LabeledDocument((values[6]), textValue, hot)
    
     # Load the raw HVAC.csv file, parse it using the function
     data = sc.textFile("wasb:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
    
     documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
     training = documents.toDF()
    
  7. 设置包括三个阶段的 Spark 机器学习管道:tokenizer、hashingTF 和 lr。 有关管道介绍及其工作原理的详细信息,请参阅 Spark 机器学习管道

    将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

     tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
     hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
     lr = LogisticRegression(maxIter=10, regParam=0.01)
     pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
    
  8. 将管道拟合到培训文档中。 将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

     model = pipeline.fit(training)
    
  9. 验证训练文档以根据应用程序进度创建检查点。 将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

     training.show()
    

    输出应如下所示:

     +----------+----------+-----+
     |BuildingID|SystemInfo|label|
     +----------+----------+-----+
     |         4|     13 20|  0.0|
     |        17|      3 20|  0.0|
     |        18|     17 20|  1.0|
     |        15|      2 23|  0.0|
     |         3|      16 9|  1.0|
     |         4|     13 28|  0.0|
     |         2|     12 24|  0.0|
     |        16|     20 26|  1.0|
     |         9|      16 9|  1.0|
     |        12|       6 5|  0.0|
     |        15|     10 17|  1.0|
     |         7|      2 11|  0.0|
     |        15|      14 2|  1.0|
     |         6|       3 2|  0.0|
     |        20|     19 22|  0.0|
     |         8|     19 11|  0.0|
     |         6|      15 7|  0.0|
     |        13|      12 5|  0.0|
     |         4|      8 22|  0.0|
     |         7|      17 5|  0.0|
     +----------+----------+-----+
    

    返回并根据原始 CSV 文件验证输出。 例如,CSV 文件中第一行包含此数据:

    Spark 机器学习示例的输出数据快照

    请注意,实际温度比目标温度低表示建筑物处于低温状态。 因此在训练输出中,第一行中的 label 值为 0.0,表示建筑物并非处于高温状态。

  10. 准备要对其运行训练模型的数据集。 为此,我们传递了系统 ID 和系统年数(以训练输出中的 SystemInfo 表示),模型将预测具有该系统 ID 和系统年数的建筑物的温度是较高(以 1.0 表示)还是较低(以 0.0 表示)。

    将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

    # SystemInfo here is a combination of system ID followed by system age
    Document = Row("id", "SystemInfo")
    test = sc.parallelize([(1L, "20 25"),
                  (2L, "4 15"),
                  (3L, "16 9"),
                  (4L, "9 22"),
                  (5L, "17 10"),
                  (6L, "7 22")]) \
        .map(lambda x: Document(*x)).toDF() 
    
  11. 最后,对测试数据进行预测。 将以下代码段粘贴到空白单元格中,并按 SHIFT + ENTER

     # Make predictions on test documents and print columns of interest
     prediction = model.transform(test)
     selected = prediction.select("SystemInfo", "prediction", "probability")
     for row in selected.collect():
         print row
    
  12. 应该会看到与下面类似的输出:

    Row(SystemInfo=u'20 25', prediction=1.0, probability=DenseVector([0.4999, 0.5001]))
    Row(SystemInfo=u'4 15', prediction=0.0, probability=DenseVector([0.5016, 0.4984]))
    Row(SystemInfo=u'16 9', prediction=1.0, probability=DenseVector([0.4785, 0.5215]))
    Row(SystemInfo=u'9 22', prediction=1.0, probability=DenseVector([0.4549, 0.5451]))
    Row(SystemInfo=u'17 10', prediction=1.0, probability=DenseVector([0.4925, 0.5075]))
    Row(SystemInfo=u'7 22', prediction=0.0, probability=DenseVector([0.5015, 0.4985]))
    

    从预测中的第一行可以看出,对于 ID 为 20 且系统年数为 25 的 HVAC 系统,建筑物处于高温状态 (prediction=1.0)。 DenseVector (0.49999) 的第一个值对应于预测 0.0,第二个值 (0.5001) 对应于预测 1.0。 在输出中,即使第二个值仅稍微偏高,模型仍显示 prediction=1.0

  13. 完成运行应用程序之后,应该要关闭笔记本以释放资源。 为此,请在 Notebook 的“文件”菜单中,单击“关闭并停止”。 这将会关闭 notebook。

将 Anaconda scikit-learn 库用于 Spark 机器学习

HDInsight 上的 Apache Spark 群集包含 Anaconda 库, 还包含适用于机器学习的 scikit-learn 库。 该库还包含可用于直接从 Jupyter notebook 生成示例应用程序的各种数据集。 有关使用 scikit-learn 库的示例,请参阅 http://scikit-learn.org/stable/auto_examples/index.html

另请参阅

方案

创建和运行应用程序

工具和扩展

管理资源