Operationalize Spark-built machine learning models

This topic shows how to operationalize a saved machine learning (ML) model using Python on HDInsight Spark clusters. It describes how to load machine learning models that have been built using Spark MLlib and stored in Azure Blob Storage (WASB), and how to score them with datasets that have also been stored in WASB. It shows how to pre-process the input data, transform features using the indexing and encoding functions in the MLlib toolkit, and how to create a labeled point data object that can be used as input for scoring with the ML models. The models used for scoring include Linear Regression, Logistic Regression, Random Forest models, and Gradient Boosting Tree models.

Spark clusters and Jupyter notebooks

This walkthrough provides the setup steps and the code to operationalize an ML model on both an HDInsight Spark 1.6 cluster and a Spark 2.0 cluster. The code for these procedures is also provided in Jupyter notebooks.

Notebook for Spark 1.6

The pySpark-machine-learning-data-science-spark-model-consumption.ipynb Jupyter notebook shows how to operationalize a saved model using Python on HDInsight clusters.

Notebook for Spark 2.0

To modify the Jupyter notebook for Spark 1.6 for use with an HDInsight Spark 2.0 cluster, replace the Python code file with this file. This code shows how to consume the models created in Spark 2.0.

Prerequisites

  1. You need an Azure account and a Spark 1.6 (or Spark 2.0) HDInsight cluster to complete this walkthrough. See the Overview of Data Science using Spark on Azure HDInsight for instructions on how to satisfy these requirements. That topic also contains a description of the NYC 2013 Taxi data used here and instructions on how to execute code from a Jupyter notebook on the Spark cluster.
  2. Create the machine learning models to be scored here by working through the Data exploration and modeling with Spark topic for the Spark 1.6 cluster or the Spark 2.0 notebooks.
  3. The Spark 2.0 notebooks use an additional dataset for the classification task, the well-known Airline On-time Departure dataset from 2011 and 2012. A description of the notebooks and links to them are provided in the Readme.md of the GitHub repository that contains them. Moreover, the code here and in the linked notebooks is generic and should work on any Spark cluster. If you are not using HDInsight Spark, the cluster setup and management steps may differ slightly from what is shown here.

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you have finished using it. See how to delete an HDInsight cluster.

Setup: storage locations, libraries, and the preset Spark context

Spark can read from and write to Azure Blob Storage (WASB), so any existing data stored there can be processed using Spark and the results stored again in WASB.

To save models or files in WASB, the path must be specified properly. The default container attached to the Spark cluster can be referenced with a path beginning with "wasb:///". The following code sample specifies the location of the data to be read and the path of the model storage directory to which the model output is saved.

Set directory paths for storage locations in WASB

Models are saved in "wasb:///user/remoteuser/NYCTaxi/Models". If this path is not set properly, the models are not loaded for scoring.

The scored results are saved in "wasb:///user/remoteuser/NYCTaxi/ScoredResults". If the path to the folder is incorrect, the results are not saved in that folder.

Note

The file path locations can be copied and pasted into the placeholders in this code from the output of the last cell of the machine-learning-data-science-spark-data-exploration-modeling.ipynb notebook.

Here is the code to set the directory paths:

    # LOCATION OF DATA TO BE SCORED (TEST DATA)
    taxi_test_file_loc = "wasb://mllibwalkthroughs@cdspsparksamples.blob.core.chinacloudapi.cn/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Test.tsv";

    # SET THE MODEL STORAGE DIRECTORY PATH 
    # NOTE THE TRAILING SLASH IN THIS PATH IS NEEDED
    modelDir = "wasb:///user/remoteuser/NYCTaxi/Models/" 

    # SET SCORED RESULT DIRECTORY PATH
    # NOTE THE TRAILING SLASH IN THIS PATH IS NEEDED
    scoredResultDir = "wasb:///user/remoteuser/NYCTaxi/ScoredResults/"; 

    # FILE LOCATIONS FOR THE MODELS TO BE SCORED
    logisticRegFileLoc = modelDir + "LogisticRegressionWithLBFGS_2016-04-1817_40_35.796789"
    linearRegFileLoc = modelDir + "LinearRegressionWithSGD_2016-04-1817_44_00.993832"
    randomForestClassificationFileLoc = modelDir + "RandomForestClassification_2016-04-1817_42_58.899412"
    randomForestRegFileLoc = modelDir + "RandomForestRegression_2016-04-1817_44_27.204734"
    BoostedTreeClassificationFileLoc = modelDir + "GradientBoostingTreeClassification_2016-04-1817_43_16.354770"
    BoostedTreeRegressionFileLoc = modelDir + "GradientBoostingTreeRegression_2016-04-1817_44_46.206262"

    # RECORD START TIME
    import datetime
    datetime.datetime.now()

OUTPUT:

datetime.datetime(2016, 4, 25, 23, 56, 19, 229403)

Import libraries

Set the Spark context and import the necessary libraries with the following code:

#IMPORT LIBRARIES
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import atexit
from numpy import array
import numpy as np
import datetime

Preset Spark context and PySpark magics

The PySpark kernels that are provided with Jupyter notebooks have a preset context. Therefore, you do not need to set the Spark or Hive contexts explicitly before you start working with the application you are developing. These contexts are available by default:

  • sc - for Spark
  • sqlContext - for Hive

The PySpark kernel provides some predefined "magics", which are special commands that you can call with %%. Two such commands are used in these code samples.

  • %%local Specifies that the code in subsequent lines is executed locally. The code must be valid Python code.
  • %%sql -o <variable name> Executes a Hive query against the sqlContext. If the -o parameter is passed, the result of the query is persisted in the %%local Python context as a Pandas dataframe.

For more information about the kernels for Jupyter notebooks and the predefined "magics" that they provide, see Kernels available for Jupyter notebooks with HDInsight Spark Linux clusters on HDInsight.
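
For illustration, here is a minimal sketch (not part of the original notebook) of how these two magics are typically used together. It assumes the taxi_test temp table that is registered later in this walkthrough; run each magic in its own cell, with the magic on the first line.

%%sql -o tipsByPayment
SELECT payment_type, COUNT(*) AS trips, AVG(tip_amount) AS avg_tip
FROM taxi_test
GROUP BY payment_type

%%local
# tipsByPayment is now available here as a Pandas dataframe
tipsByPayment.head()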

Ingest data and create a cleaned data frame

This section contains the code for a series of tasks required to ingest the data to be scored: read in a joined 0.1% sample of the taxi trip and fare file (stored as a .tsv file), format the data, and create a cleaned data frame.

The taxi trip and fare files were joined based on the procedure provided in the topic The Team Data Science Process in action: Using HDInsight Hadoop clusters.

# INGEST DATA AND CREATE A CLEANED DATA FRAME

# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT FILE FROM PUBLIC BLOB
taxi_test_file = sc.textFile(taxi_test_file_loc)

# GET SCHEMA OF THE FILE FROM HEADER
taxi_header = taxi_test_file.filter(lambda l: "medallion" in l)

# PARSE FIELDS AND CONVERT DATA TYPE FOR SOME FIELDS
taxi_temp = taxi_test_file.subtract(taxi_header).map(lambda k: k.split("\t"))\
        .map(lambda p: (p[0],p[1],p[2],p[3],p[4],p[5],p[6],int(p[7]),int(p[8]),int(p[9]),int(p[10]),
                        float(p[11]),float(p[12]),p[13],p[14],p[15],p[16],p[17],p[18],float(p[19]),
                        float(p[20]),float(p[21]),float(p[22]),float(p[23]),float(p[24]),int(p[25]),int(p[26])))

# GET SCHEMA OF THE FILE FROM HEADER
schema_string = taxi_test_file.first()
fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split('\t')]
fields[7].dataType = IntegerType() #Pickup hour
fields[8].dataType = IntegerType() # Pickup week
fields[9].dataType = IntegerType() # Weekday
fields[10].dataType = IntegerType() # Passenger count
fields[11].dataType = FloatType() # Trip time in secs
fields[12].dataType = FloatType() # Trip distance
fields[19].dataType = FloatType() # Fare amount
fields[20].dataType = FloatType() # Surcharge
fields[21].dataType = FloatType() # Mta_tax
fields[22].dataType = FloatType() # Tip amount
fields[23].dataType = FloatType() # Tolls amount
fields[24].dataType = FloatType() # Total amount
fields[25].dataType = IntegerType() # Tipped or not
fields[26].dataType = IntegerType() # Tip class
taxi_schema = StructType(fields)

# CREATE DATA FRAME
taxi_df_test = sqlContext.createDataFrame(taxi_temp, taxi_schema)

# CREATE A CLEANED DATA-FRAME BY DROPPING SOME UN-NECESSARY COLUMNS & FILTERING FOR UNDESIRED VALUES OR OUTLIERS
taxi_df_test_cleaned = taxi_df_test.drop('medallion').drop('hack_license').drop('store_and_fwd_flag').drop('pickup_datetime')\
    .drop('dropoff_datetime').drop('pickup_longitude').drop('pickup_latitude').drop('dropoff_latitude')\
    .drop('dropoff_longitude').drop('tip_class').drop('total_amount').drop('tolls_amount').drop('mta_tax')\
    .drop('direct_distance').drop('surcharge')\
    .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200" )

# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
taxi_df_test_cleaned.cache()
taxi_df_test_cleaned.count()

# REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
taxi_df_test_cleaned.registerTempTable("taxi_test")

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 46.37 seconds

Prepare data for scoring in Spark

This section shows how to index, encode, and scale categorical features to prepare them for use in MLlib supervised learning algorithms for classification and regression.

Feature transformation: index and encode categorical features for input into models for scoring

This section shows how to index categorical data with a StringIndexer and encode the features with a OneHotEncoder for input into the models.

The StringIndexer encodes a string column of labels to a column of label indices. The indices are ordered by label frequency.

The OneHotEncoder maps a column of label indices to a column of binary vectors with at most a single one-value. This encoding allows algorithms that expect continuous-valued features, such as logistic regression, to be applied to categorical features.

#INDEX AND ONE-HOT ENCODE CATEGORICAL FEATURES

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer

# CREATE FOUR BUCKETS FOR TRAFFIC TIMES
sqlStatement = """
    SELECT *,
    CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night" 
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush" 
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_test 
"""
taxi_df_test_with_newFeatures = sqlContext.sql(sqlStatement)

# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
taxi_df_test_with_newFeatures.cache()
taxi_df_test_with_newFeatures.count()

# INDEX AND ONE-HOT ENCODING
stringIndexer = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex")
model = stringIndexer.fit(taxi_df_test_with_newFeatures) # Input data-frame is the cleaned one from above
indexed = model.transform(taxi_df_test_with_newFeatures)
encoder = OneHotEncoder(dropLast=False, inputCol="vendorIndex", outputCol="vendorVec")
encoded1 = encoder.transform(indexed)

# INDEX AND ENCODE RATE_CODE
stringIndexer = StringIndexer(inputCol="rate_code", outputCol="rateIndex")
model = stringIndexer.fit(encoded1)
indexed = model.transform(encoded1)
encoder = OneHotEncoder(dropLast=False, inputCol="rateIndex", outputCol="rateVec")
encoded2 = encoder.transform(indexed)

# INDEX AND ENCODE PAYMENT_TYPE
stringIndexer = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")
model = stringIndexer.fit(encoded2)
indexed = model.transform(encoded2)
encoder = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec")
encoded3 = encoder.transform(indexed)

# INDEX AND ENCODE TRAFFIC TIME BINS
stringIndexer = StringIndexer(inputCol="TrafficTimeBins", outputCol="TrafficTimeBinsIndex")
model = stringIndexer.fit(encoded3)
indexed = model.transform(encoded3)
encoder = OneHotEncoder(dropLast=False, inputCol="TrafficTimeBinsIndex", outputCol="TrafficTimeBinsVec")
encodedFinal = encoder.transform(indexed)

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 5.37 seconds

Create RDD objects with feature arrays for input into models

This section contains code that shows how to index categorical text data and one-hot encode it so that it can be used to train and test MLlib logistic regression and tree-based models. The indexed data is stored in Resilient Distributed Dataset (RDD) objects. RDDs are the basic abstraction in Spark; an RDD object represents an immutable, partitioned collection of elements that can be operated on in parallel with Spark.

It also contains code that shows how to scale data with the StandardScaler provided by MLlib, for use in linear regression with Stochastic Gradient Descent (SGD), a popular algorithm for training a wide range of machine learning models. The StandardScaler is used to scale the features to unit variance. Feature scaling, also known as data normalization, ensures that features with widely dispersed values are not given excessive weight in the objective function.

# CREATE RDD OBJECTS WITH FEATURE ARRAYS FOR INPUT INTO MODELS

# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT LIBRARIES
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.util import MLUtils
from numpy import array

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingBinary(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex,
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])
    return  features

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LOGISTIC REGRESSION MODELS
def parseRowOneHotBinary(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    return  features

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingRegression(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex, 
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])
    return  features

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO LINEAR REGRESSION MODELS
def parseRowOneHotRegression(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    return  features

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTESTbinary = encodedFinal.map(parseRowIndexingBinary)
oneHotTESTbinary = encodedFinal.map(parseRowOneHotBinary)

# FOR REGRESSION TRAINING AND TESTING
indexedTESTreg = encodedFinal.map(parseRowIndexingRegression)
oneHotTESTreg = encodedFinal.map(parseRowOneHotRegression)

# SCALING FEATURES FOR LINEARREGRESSIONWITHSGD MODEL
scaler = StandardScaler(withMean=False, withStd=True).fit(oneHotTESTreg)
oneHotTESTregScaled = scaler.transform(oneHotTESTreg)

# CACHE RDDS IN MEMORY
indexedTESTbinary.cache();
oneHotTESTbinary.cache();
indexedTESTreg.cache();
oneHotTESTreg.cache();
oneHotTESTregScaled.cache();

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 11.72 seconds

Score with the Logistic Regression Model and save output to blob

The code in this section shows how to load a logistic regression model that has been saved in Azure Blob storage and use it to predict whether or not a tip is paid on a taxi trip, score it with standard classification metrics, and then save and plot the results to blob storage. The scored results are stored in RDD objects.

# SCORE AND EVALUATE LOGISTIC REGRESSION MODEL

# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT LIBRARIES
from pyspark.mllib.classification import LogisticRegressionModel

## LOAD SAVED MODEL
savedModel = LogisticRegressionModel.load(sc, logisticRegFileLoc)
predictions = oneHotTESTbinary.map(lambda features: (float(savedModel.predict(features))))

## SAVE SCORED RESULTS (RDD) TO BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
logisticregressionfilename = "LogisticRegressionWithLBFGS_" + datestamp + ".txt";
dirfilename = scoredResultDir + logisticregressionfilename;
predictions.saveAsTextFile(dirfilename)


# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT:

Time taken to execute above cell: 19.22 seconds

Score a Linear Regression Model

We used LinearRegressionWithSGD to train a linear regression model, with Stochastic Gradient Descent (SGD) for optimization, to predict the amount of tip paid.

The code in this section shows how to load a linear regression model from Azure Blob storage, score with scaled variables, and then save the results back to the blob.

#SCORE LINEAR REGRESSION MODEL

# RECORD START TIME
timestart = datetime.datetime.now()

#LOAD LIBRARIES
from pyspark.mllib.regression import LinearRegressionWithSGD, LinearRegressionModel

# LOAD MODEL AND SCORE USING ** SCALED VARIABLES **
savedModel = LinearRegressionModel.load(sc, linearRegFileLoc)
predictions = oneHotTESTregScaled.map(lambda features: (float(savedModel.predict(features))))

# SAVE RESULTS
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
linearregressionfilename = "LinearRegressionWithSGD_" + datestamp;
dirfilename = scoredResultDir + linearregressionfilename;
predictions.saveAsTextFile(dirfilename)

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 16.63 seconds

Score classification and regression Random Forest Models

The code in this section shows how to load the classification and regression Random Forest models saved in Azure Blob storage, score their performance with standard classifier and regression measures, and then save the results back to blob storage.

Random forests are ensembles of decision trees. They combine many decision trees to reduce the risk of overfitting. Random forests can handle categorical features, extend to the multiclass classification setting, do not require feature scaling, and are able to capture non-linearities and feature interactions. Random forests are among the most successful machine learning models for classification and regression.

spark.mllib supports random forests for binary and multiclass classification and for regression, using both continuous and categorical features.

# SCORE RANDOM FOREST MODELS FOR CLASSIFICATION AND REGRESSION

# RECORD START TIME
timestart = datetime.datetime.now()

#IMPORT MLLIB LIBRARIES    
from pyspark.mllib.tree import RandomForest, RandomForestModel


# CLASSIFICATION: LOAD SAVED MODEL, SCORE AND SAVE RESULTS BACK TO BLOB
savedModel = RandomForestModel.load(sc, randomForestClassificationFileLoc)
predictions = savedModel.predict(indexedTESTbinary)

# SAVE RESULTS
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfclassificationfilename = "RandomForestClassification_" + datestamp + ".txt";
dirfilename = scoredResultDir + rfclassificationfilename;
predictions.saveAsTextFile(dirfilename)


# REGRESSION: LOAD SAVED MODEL, SCORE AND SAVE RESULTS BACK TO BLOB
savedModel = RandomForestModel.load(sc, randomForestRegFileLoc)
predictions = savedModel.predict(indexedTESTreg)

# SAVE RESULTS
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfregressionfilename = "RandomForestRegression_" + datestamp + ".txt";
dirfilename = scoredResultDir + rfregressionfilename;
predictions.saveAsTextFile(dirfilename)

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT:

Time taken to execute above cell: 31.07 seconds

Score classification and regression Gradient Boosting Tree Models

The code in this section shows how to load classification and regression Gradient Boosting Tree models from Azure Blob storage, score their performance with standard classifier and regression measures, and then save the results back to blob storage.

spark.mllib supports GBTs for binary classification and for regression, using both continuous and categorical features.

Gradient Boosting Trees (GBTs) are ensembles of decision trees. GBTs train decision trees iteratively to minimize a loss function. GBTs can handle categorical features, do not require feature scaling, and are able to capture non-linearities and feature interactions. This algorithm can also be used in a multiclass classification setting.

# SCORE GRADIENT BOOSTING TREE MODELS FOR CLASSIFICATION AND REGRESSION

# RECORD START TIME
timestart = datetime.datetime.now()

#IMPORT MLLIB LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

# CLASSIFICATION: LOAD SAVED MODEL, SCORE AND SAVE RESULTS BACK TO BLOB

#LOAD AND SCORE THE MODEL
savedModel = GradientBoostedTreesModel.load(sc, BoostedTreeClassificationFileLoc)
predictions = savedModel.predict(indexedTESTbinary)

# SAVE RESULTS
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btclassificationfilename = "GradientBoostingTreeClassification_" + datestamp + ".txt";
dirfilename = scoredResultDir + btclassificationfilename;
predictions.saveAsTextFile(dirfilename)


# REGRESSION: LOAD SAVED MODEL, SCORE AND SAVE RESULTS BACK TO BLOB

# LOAD AND SCORE MODEL 
savedModel = GradientBoostedTreesModel.load(sc, BoostedTreeRegressionFileLoc)
predictions = savedModel.predict(indexedTESTreg)

# SAVE RESULTS
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btregressionfilename = "GradientBoostingTreeRegression_" + datestamp + ".txt";
dirfilename = scoredResultDir + btregressionfilename;
predictions.saveAsTextFile(dirfilename)


# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 14.6 seconds

Clean up objects from memory and print scored file locations

# UNPERSIST OBJECTS CACHED IN MEMORY
taxi_df_test_cleaned.unpersist()
indexedTESTbinary.unpersist();
oneHotTESTbinary.unpersist();
indexedTESTreg.unpersist();
oneHotTESTreg.unpersist();
oneHotTESTregScaled.unpersist();


# PRINT OUT PATH TO SCORED OUTPUT FILES
print "logisticRegFileLoc: " + logisticregressionfilename;
print "linearRegFileLoc: " + linearregressionfilename;
print "randomForestClassificationFileLoc: " + rfclassificationfilename;
print "randomForestRegFileLoc: " + rfregressionfilename;
print "BoostedTreeClassificationFileLoc: " + btclassificationfilename;
print "BoostedTreeRegressionFileLoc: " + btregressionfilename;

OUTPUT:

logisticRegFileLoc: LogisticRegressionWithLBFGS_2016-05-0317_22_38.953814.txt

linearRegFileLoc: LinearRegressionWithSGD_2016-05-0317_22_58.878949

randomForestClassificationFileLoc: RandomForestClassification_2016-05-0317_23_15.939247.txt

randomForestRegFileLoc: RandomForestRegression_2016-05-0317_23_31.459140.txt

BoostedTreeClassificationFileLoc: GradientBoostingTreeClassification_2016-05-0317_23_49.648334.txt

BoostedTreeRegressionFileLoc: GradientBoostingTreeRegression_2016-05-0317_23_56.860740.txt

Consume Spark models through a web interface

Spark provides a mechanism to remotely submit batch jobs or interactive queries through a REST interface with a component called Livy. Livy is enabled by default on your HDInsight Spark cluster. For more information on Livy, see Submit Spark jobs remotely using Livy.

You can use Livy to remotely submit a job that batch scores a file stored in an Azure blob and then writes the results to another blob. To do this, upload the Python script from GitHub to the blob storage of the Spark cluster. You can use a tool like Microsoft Azure Storage Explorer or AzCopy to copy the script to the cluster blob, as in the sketch after the following note. In our case, we uploaded the script to wasb:///example/python/ConsumeGBNYCReg.py.

Note

The access keys that you need can be found in the portal for the storage account associated with the Spark cluster.
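
As a sketch of the upload step (not part of the original walkthrough), the following code copies the script into the Spark cluster's default container using the azure-storage Python package; the account name, container name, access key, and local file path are placeholders you supply, and Storage Explorer or AzCopy work equally well.

# SKETCH: UPLOAD ConsumeGBNYCReg.py TO THE CLUSTER'S DEFAULT CONTAINER
# Assumes the azure-storage package is installed: pip install azure-storage
from azure.storage.blob import BlockBlobService

# Storage account and access key (from the portal, see the preceding note) for the Spark cluster
blob_service = BlockBlobService(account_name='<storage account name>', account_key='<access key>')

# wasb:///example/python/ConsumeGBNYCReg.py resolves to this blob name in the default container
blob_service.create_blob_from_path('<default container name>',
                                   'example/python/ConsumeGBNYCReg.py',
                                   'ConsumeGBNYCReg.py')  # local copy of the script from GitHub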

Once uploaded to this location, the script runs within the Spark cluster in a distributed context. It loads the model and runs predictions on input files based on the model.

You can invoke this script remotely by making a simple HTTPS/REST request to Livy. Here is a curl command to construct the HTTP request that invokes the Python script remotely. Replace CLUSTERLOGIN, CLUSTERPASSWORD, and CLUSTERNAME with the appropriate values for your Spark cluster.

# CURL COMMAND TO INVOKE PYTHON SCRIPT WITH HTTP REQUEST

curl -k --user "CLUSTERLOGIN:CLUSTERPASSWORD" -X POST --data "{\"file\": \"wasb:///example/python/ConsumeGBNYCReg.py\"}" -H "Content-Type: application/json" https://CLUSTERNAME.azurehdinsight.net/livy/batches

You can use any language on the remote system to invoke the Spark job through Livy by making a simple HTTPS call with Basic Authentication.

Note

It would be convenient to use the Python Requests library when making this HTTP call, but it is not currently installed by default in Azure Functions, so older HTTP libraries are used instead.

Here is the Python code for the HTTP call:

#MAKE AN HTTPS CALL ON LIVY. 

import os

# OLDER HTTP LIBRARIES USED HERE INSTEAD OF THE REQUEST LIBRARY AS THEY ARE AVAILABLE BY DEFAULT
import httplib, urllib, base64

# REPLACE VALUES WITH ONES FOR YOUR SPARK CLUSTER
host = '<spark cluster name>.azurehdinsight.net:443'
username='<username>'
password='<password>'

#AUTHORIZATION
conn = httplib.HTTPSConnection(host)
auth = base64.encodestring('%s:%s' % (username, password)).replace('\n', '')
headers = {'Content-Type': 'application/json', 'Authorization': 'Basic %s' % auth}

# SPECIFY THE PYTHON SCRIPT TO RUN ON THE SPARK CLUSTER
# IN THE FILE PARAMETER OF THE JSON POST REQUEST BODY
r=conn.request("POST", '/livy/batches', '{"file": "wasb:///example/python/ConsumeGBNYCReg.py"}', headers )
response = conn.getresponse().read()
print(response)
conn.close()

You can also add this Python code to Azure Functions to trigger a Spark job submission that scores a blob based on various events, such as a timer or the creation or update of a blob.

If you prefer a code-free client experience, use Azure Logic Apps to invoke the Spark batch scoring by defining an HTTP action in the Logic Apps Designer and setting its parameters.

  • From the Azure portal, create a new Logic App by selecting +New -> Web + Mobile -> Logic App.
  • To bring up the Logic Apps Designer, enter the name of the Logic App and an App Service Plan.
  • Select an HTTP action and enter the parameters shown in the following figure:

(Figure: Logic Apps Designer showing the HTTP action)
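
The original figure is not reproduced here; mirroring the curl request shown earlier, the HTTP action would be configured with values along these lines (the exact field labels in the designer may differ):

    Method:          POST
    Uri:             https://CLUSTERNAME.azurehdinsight.net/livy/batches
    Headers:         Content-Type: application/json
    Body:            {"file": "wasb:///example/python/ConsumeGBNYCReg.py"}
    Authentication:  Basic, with the cluster login and password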

What's next?

Cross-validation and hyperparameter sweeping: See Advanced data exploration and modeling with Spark to learn how models can be trained using cross-validation and hyperparameter sweeping.