Create a Spark machine learning pipeline
Apache Spark's scalable machine learning library (MLlib) brings modeling capabilities to a distributed environment. The Spark package spark.ml is a set of high-level APIs built on DataFrames. These APIs help you create and tune practical machine-learning pipelines. Spark machine learning refers to this MLlib DataFrame-based API, not the older RDD-based pipeline API.
A machine learning (ML) pipeline is a complete workflow that combines multiple machine learning algorithms. Processing and learning from data can take many steps, each calling for its own algorithm. Pipelines define the stages and ordering of a machine learning process. In MLlib, the stages of a pipeline are represented by a specific sequence of PipelineStages, in which Transformers and Estimators each perform tasks.
A Transformer is an algorithm that transforms one DataFrame to another by using the transform() method. For example, a feature transformer could read one column of a DataFrame, map it to another column, and output a new DataFrame with the mapped column appended.
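As a minimal sketch of that behavior, here is the Tokenizer feature transformer (which also appears later in this article) applied to an invented one-row DataFrame; a spark session is assumed to be available, as in an HDInsight Jupyter notebook:

from pyspark.ml.feature import Tokenizer

# A one-row DataFrame invented for illustration
df = spark.createDataFrame([("17 20",)], ["SystemInfo"])

# transform() returns a new DataFrame with the mapped column appended
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
tokenizer.transform(df).show()  # adds a "words" column: [17, 20]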
An Estimator is an abstraction of a learning algorithm, and is responsible for fitting or training on a dataset to produce a Transformer. An Estimator implements a method named fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
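Sketched with the LogisticRegression estimator used later in this article, assuming a training DataFrame that already has label and features columns:

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10)      # an Estimator
model = lr.fit(training)                 # fit() returns a LogisticRegressionModel...
predictions = model.transform(training)  # ...which is a Transformer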
Each stateless instance of a Transformer or an Estimator has its own unique identifier, which is used when specifying parameters. Both use a uniform API for specifying these parameters.
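For example (a minimal sketch; the printed identifier value varies per instance):

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression()
print(lr.uid)              # the instance's unique identifier, e.g. "LogisticRegression_abc123..."
print(lr.explainParams())  # the uniform API for listing an instance's parameters

# Parameters can be set on the instance, or passed as a param map at fit() time:
lr.setMaxIter(10)
# model = lr.fit(training, {lr.regParam: 0.01})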
Pipeline example
To demonstrate a practical use of an ML pipeline, this example uses the sample HVAC.csv data file that comes preloaded on the default storage for your HDInsight cluster, either Azure Storage or Data Lake Store. To view the contents of the file, navigate to the /HdiSamples/HdiSamples/SensorSampleData/hvac directory. HVAC.csv contains a set of times with both target and actual temperatures for HVAC (heating, ventilation, and air conditioning) systems in various buildings. The goal is to train a model on the data and produce a forecast temperature for a given building.
The following code:
- Defines a LabeledDocument, which stores the BuildingID, SystemInfo (a system's identifier and age), and a label (1.0 if the building is too hot, 0.0 otherwise).
- Creates a custom parser function, parseDocument, that takes a line (row) of data and determines whether the building is "hot" by comparing the target temperature to the actual temperature.
- Applies the parser when extracting the source data.
- Creates the training data.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument
def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    # Compare the temperatures as numbers, not strings, to label the row
    if float(values[3]) > float(values[2]):
        hot = 1.0
    else:
        hot = 0.0

    # Combine the system identifier and age into a single text feature
    textValue = str(values[4]) + " " + str(values[5])
    return LabeledDocument(values[6], textValue, hot)

# Load the raw HVAC.csv file, skip the header row, and parse each line
data = sc.textFile("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")
documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()
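Before building the pipeline, it can help to confirm that the parse worked; for example (not part of the original sample):

training.printSchema()           # BuildingID, SystemInfo, label
training.show(3, truncate=False)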
This example pipeline has three stages: Tokenizer and HashingTF (both Transformers), and LogisticRegression (an Estimator). The extracted and parsed data in the training DataFrame flows through the pipeline when pipeline.fit(training) is called.
- The first stage, Tokenizer, splits the SystemInfo input column (consisting of the system identifier and age values) into a words output column. This new words column is added to the DataFrame.
- The second stage, HashingTF, converts the new words column into feature vectors. This new features column is added to the DataFrame. These first two stages are Transformers.
- The third stage, LogisticRegression, is an Estimator, so the pipeline calls the LogisticRegression.fit() method to produce a LogisticRegressionModel.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training)
To see the new words and features columns added by the Tokenizer and HashingTF transformers, and a sample of the predictions produced by the LogisticRegression estimator, run the PipelineModel.transform() method on the original DataFrame. In production code, the next step would be to pass in a test DataFrame to validate the training.
peek = model.transform(training)
peek.show()
# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label| words| features| rawPrediction| probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
| 4| 13 20| 0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...| 0.0|
| 17| 3 20| 0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...| 0.0|
| 18| 17 20| 1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...| 0.0|
| 15| 2 23| 0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...| 1.0|
| 3| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 4| 13 28| 0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...| 0.0|
| 2| 12 24| 0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...| 1.0|
| 16| 20 26| 1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...| 0.0|
| 9| 16 9| 1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...| 1.0|
| 12| 6 5| 0.0| [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...| 0.0|
| 15| 10 17| 1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...| 1.0|
| 7| 2 11| 0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...| 0.0|
| 15| 14 2| 1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...| 0.0|
| 6| 3 2| 0.0| [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...| 0.0|
| 20| 19 22| 0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...| 1.0|
| 8| 19 11| 0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...| 0.0|
| 6| 15 7| 0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...| 0.0|
| 13| 12 5| 0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...| 1.0|
| 4| 8 22| 0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...| 1.0|
| 7| 17 5| 0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...| 1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
only showing top 20 rows
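As noted above, production code would validate the training against held-out data rather than the training set itself. A minimal sketch of that step, assuming the documents RDD and pipeline from the preceding code (the split and seed are hypothetical; BinaryClassificationEvaluator reports area under the ROC curve by default):

from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Hold out 20% of the parsed data for validation (hypothetical split, not in the original sample)
train, test = documents.toDF().randomSplit([0.8, 0.2], seed=42)

validationModel = pipeline.fit(train)
predictions = validationModel.transform(test)

# Score the model by area under the ROC curve (the default metric)
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))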
The model object can now be used to make predictions. For the full sample of this machine learning application, and step-by-step instructions for running it, see Build Apache Spark machine learning applications on Azure HDInsight.
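As a quick illustration, the trained model could score new, unlabeled systems like this (a minimal sketch; the SystemInfo values are invented, and a spark session is assumed to be available, as in an HDInsight Jupyter notebook):

from pyspark.sql import Row

# Hypothetical systems to score; SystemInfo is "<system identifier> <system age>"
Document = Row("BuildingID", "SystemInfo")
test = spark.createDataFrame([Document(1, "20 25"), Document(2, "4 15")])

# The fitted pipeline tokenizes, hashes, and classifies the new rows
model.transform(test).select("SystemInfo", "prediction", "probability").show()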