Create a Spark machine learning pipeline

Apache Spark's scalable machine learning library (MLlib) brings modeling capabilities to a distributed environment. The Spark package spark.ml is a set of high-level APIs built on DataFrames. These APIs help you create and tune practical machine-learning pipelines. Spark machine learning refers to this MLlib DataFrame-based API, not the older RDD-based pipeline API.

A machine learning (ML) pipeline is a complete workflow that combines multiple machine learning algorithms. Processing and learning from data can take many steps, each requiring a different algorithm. Pipelines define the stages and ordering of a machine learning process. In MLlib, the stages of a pipeline are represented by a sequence of PipelineStage objects, where each stage is either a Transformer or an Estimator.

A Transformer is an algorithm that transforms one DataFrame to another by using the transform() method. For example, a feature transformer could read one column of a DataFrame, map it to another column, and output a new DataFrame with the mapped column appended to it.
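For example, here is a minimal sketch of a Transformer at work, assuming a SparkSession named spark is available (as it is in an HDInsight Jupyter notebook) and using the Tokenizer feature transformer:

from pyspark.ml.feature import Tokenizer

# A small, made-up DataFrame with a single text column
df = spark.createDataFrame([("system 20",), ("system 21",)], ["text"])

# Tokenizer is a Transformer: transform() reads the "text" column and
# returns a new DataFrame with a "tokens" column appended to it
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenizer.transform(df).show()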

An Estimator is an abstraction of learning algorithms, and is responsible for fitting or training on a dataset to produce a Transformer. An Estimator implements a method named fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
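As a minimal sketch of fitting an Estimator, assuming Spark 2.x (where pyspark.ml.linalg provides Vectors) and a SparkSession named spark:

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

# A tiny, made-up training DataFrame with "features" and "label" columns
df = spark.createDataFrame(
    [(Vectors.dense([0.0, 1.0]), 0.0),
     (Vectors.dense([1.0, 0.0]), 1.0)],
    ["features", "label"])

# LogisticRegression is an Estimator: fit() trains on the DataFrame and
# returns a LogisticRegressionModel, which is itself a Transformer
lr = LogisticRegression(maxIter=10)
model = lr.fit(df)
model.transform(df).show()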

Each stateless instance of a Transformer or an Estimator has its own unique identifier, which is used when specifying parameters. Both use a uniform API for specifying these parameters.
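For instance, a brief sketch of that uniform parameter API, using the LogisticRegression estimator that appears later in this article:

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10, regParam=0.01)

# Every instance has its own unique identifier
print(lr.uid)

# Parameters are inspected and set through the same Params API on any
# Transformer or Estimator: explainParams() documents them, and
# generated setters such as setMaxIter() change them
print(lr.explainParams())
lr.setMaxIter(20)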

Pipeline example

To demonstrate a practical use of an ML pipeline, this example uses the sample HVAC.csv data file that comes pre-loaded on the default storage for your HDInsight cluster, either Azure Storage or Data Lake Store. To view the contents of the file, navigate to the /HdiSamples/HdiSamples/SensorSampleData/hvac directory. HVAC.csv contains a set of times with both target and actual temperatures for HVAC (heating, ventilation, and air conditioning) systems in various buildings. The goal is to train a model on the data and produce a forecast temperature for a given building.

The following code:

  1. Defines a LabeledDocument, which stores the BuildingID, SystemInfo (a system's identifier and age), and a label (1.0 if the building is too hot, 0.0 otherwise).
  2. Creates a custom parser function, parseDocument, that takes a line (row) of data and determines whether the building is "hot" by comparing the target temperature to the actual temperature.
  3. Applies the parser when extracting the source data.
  4. Creates the training data.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
    values = line.split(',')
    # Compare the temperatures as numbers; comparing the raw strings
    # would give wrong results (for example, "9" > "10" is true for strings)
    if float(values[3]) > float(values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = values[4] + " " + values[5]

    return LabeledDocument(values[6], textValue, hot)

# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

# Skip the CSV header row (the line containing "Date"), then parse each line
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
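As a quick sanity check, you can inspect the schema and a few rows of the resulting DataFrame (the exact rows shown depend on your copy of HVAC.csv):

# Confirm the three expected columns and peek at a few parsed rows
training.printSchema()
training.show(3)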

This example pipeline has three stages: Tokenizer and HashingTF (both Transformers), and LogisticRegression (an Estimator). The extracted and parsed data in the training DataFrame flows through the pipeline when pipeline.fit(training) is called.

  1. The first stage, Tokenizer, splits the SystemInfo input column (consisting of the system identifier and age values) into a words output column. This new words column is added to the DataFrame.
  2. The second stage, HashingTF, converts the new words column into feature vectors. This new features column is added to the DataFrame. These first two stages are Transformers.
  3. The third stage, LogisticRegression, is an Estimator, so the pipeline calls the LogisticRegression.fit() method to produce a LogisticRegressionModel.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

To see the new words and features columns added by the Tokenizer and HashingTF transformers, and a sample of the predictions produced by the LogisticRegression estimator, run the PipelineModel.transform() method on the original DataFrame. In production code, the next step would be to pass in a test DataFrame to validate the training.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

The model object can now be used to make predictions, as the sketch below shows. For the full sample of this machine learning application, and step-by-step instructions for running it, see Build Apache Spark machine learning applications on Azure HDInsight.
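Here is a minimal sketch of scoring an unseen record; the BuildingID and SystemInfo values are made up for illustration, and a label column is not needed at scoring time:

from pyspark.sql import Row

# A made-up record for a system with identifier 20 and age 25
Score = Row("BuildingID", "SystemInfo")
test = sc.parallelize([Score("99", "20 25")]).toDF()

# Run the record through the fitted pipeline and show the prediction
model.transform(test).select("SystemInfo", "prediction", "probability").show()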

See also