Advanced data exploration and modeling with Spark

This walkthrough uses HDInsight Spark to do data exploration and to train binary classification and regression models using cross-validation and hyperparameter optimization on a sample of the NYC taxi trip and fare 2013 dataset. It walks you through the steps of the Data Science Process, end-to-end, using an HDInsight Spark cluster for processing and Azure blobs to store the data and the models. The process explores and visualizes data brought in from an Azure Storage Blob and then prepares the data to build predictive models. Python has been used to code the solution and to show the relevant plots. These models are built using the Spark MLlib toolkit to do binary classification and regression modeling tasks.

  • The binary classification task is to predict whether or not a tip is paid for the trip.
  • The regression task is to predict the amount of the tip based on other tip features.

The modeling steps also contain code showing how to train, evaluate, and save each type of model. The topic covers some of the same ground as the Data exploration and modeling with Spark topic. But it is more "advanced" in that it also uses cross-validation with hyperparameter sweeping to train optimally accurate classification and regression models.

Cross-validation (CV) is a technique that assesses how well a model trained on a known set of data generalizes to predicting on data on which it has not been trained. A common implementation used here is to divide a dataset into K folds and then train the model in a round-robin fashion on all but one of the folds. The model's ability to predict accurately is then assessed by testing it against the independent data in the fold that was not used for training.
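To make the mechanics concrete, here is a minimal sketch of K-fold CV in plain Python/NumPy (not Spark); train_and_score is a hypothetical stand-in for whatever routine trains a model on one index set and returns its metric on another:

# MINIMAL K-FOLD CROSS-VALIDATION SKETCH (PLAIN NUMPY, NOT SPARK)
# train_and_score(train_idx, test_idx) is assumed to train on the rows in train_idx
# and return an accuracy metric computed on the rows in test_idx.
import numpy as np

def k_fold_cv(n_rows, k, train_and_score, seed=1234):
    rng = np.random.RandomState(seed)
    indices = rng.permutation(n_rows)        # shuffle the row indices once
    folds = np.array_split(indices, k)       # K roughly equal folds
    scores = []
    for i in range(k):
        test_idx = folds[i]                                # held-out fold
        train_idx = np.hstack(folds[:i] + folds[i+1:])     # remaining K-1 folds
        scores.append(train_and_score(train_idx, test_idx))
    return np.mean(scores)                   # average metric over the K folds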

Hyperparameter optimization is the problem of choosing a set of hyperparameters for a learning algorithm, usually with the goal of optimizing a measure of the algorithm's performance on an independent data set. Hyperparameters are values that must be specified outside of the model training procedure. Assumptions about these values can affect the flexibility and accuracy of the models. For example, decision trees have hyperparameters such as the desired depth and number of leaves in the tree, and Support Vector Machines (SVMs) require setting a misclassification penalty term.

A common way to perform hyperparameter optimization, and the one used here, is a grid search, or a parameter sweep. This search goes through a subset of the hyperparameter space for a learning algorithm. Cross-validation can supply a performance metric used to rank the candidate parameter settings produced by the grid search. CV used with hyperparameter sweeping helps limit problems like overfitting a model to the training data, so that the model retains the capacity to apply to the general set of data from which the training data was extracted.
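For illustration, a parameter grid can be enumerated with the standard library alone; the grid values below are only examples, and cv_score is a hypothetical function that returns the cross-validated metric (higher is better) for one hyperparameter combination:

# MINIMAL GRID-SEARCH (PARAMETER SWEEP) SKETCH; THE GRID VALUES ARE ILLUSTRATIVE ONLY
import itertools

param_grid = {
    "regParam":   [0.01, 0.1],
    "iterations": [5, 10],
    "regType":    ["l1", "l2"],
}
keys = sorted(param_grid)
candidates = [dict(zip(keys, values))
              for values in itertools.product(*(param_grid[k] for k in keys))]

def grid_search(cv_score):
    # cv_score(params) returns the cross-validated metric for one combination;
    # the best-scoring combination is returned.
    return max(candidates, key=cv_score)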

The models we use include logistic and linear regression, random forests, and gradient boosted trees:

  • Linear regression with SGD is a linear regression model that uses a Stochastic Gradient Descent (SGD) method for optimization, together with feature scaling, to predict the tip amounts paid.
  • Logistic regression with LBFGS, or "logit" regression, is a regression model that can be used when the dependent variable is categorical to do data classification. LBFGS is a quasi-Newton optimization algorithm that approximates the Broyden–Fletcher–Goldfarb–Shanno (BFGS) algorithm using a limited amount of computer memory and that is widely used in machine learning.
  • Random forests are ensembles of decision trees. They combine many decision trees to reduce the risk of overfitting. Random forests are used for regression and classification, can handle categorical features, and can be extended to the multiclass classification setting. They do not require feature scaling and are able to capture non-linearities and feature interactions. Random forests are one of the most successful machine learning models for classification and regression.
  • Gradient boosted trees (GBTS) are ensembles of decision trees. GBTS train decision trees iteratively to minimize a loss function. GBTS are used for regression and classification, can handle categorical features, do not require feature scaling, and are able to capture non-linearities and feature interactions. They can also be used in a multiclass-classification setting.

Modeling examples using CV and hyperparameter sweeping are shown for the binary classification problem. Simpler examples (without parameter sweeps) are presented in the main topic for the regression tasks. But in the appendix, validation using elastic net for linear regression and CV with parameter sweeping for random forest regression are also presented. The elastic net is a regularized regression method for fitting linear regression models that linearly combines the L1 and L2 penalties of the lasso and ridge methods.
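As a point of reference, the elastic net penalty (written here in the convention used by Spark ML's elasticNetParam, which appears later in this walkthrough) combines the two regularization terms as

λ ( α ‖w‖₁ + (1 − α) / 2 ‖w‖₂² )

so that α = 1 corresponds to the pure lasso (L1) penalty and α = 0 to the pure ridge (L2) penalty.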

Note

Although the Spark MLlib toolkit is designed to work on large datasets, a relatively small sample (~30 MB, about 170K rows, roughly 0.1% of the original NYC dataset) is used here for convenience. The exercise given here runs efficiently (in about 10 minutes) on an HDInsight cluster with 2 worker nodes. The same code, with minor modifications, can be used to process larger datasets, with appropriate modifications for caching data in memory and changing the cluster size.

Setup: Spark clusters and notebooks

Setup steps and code are provided in this walkthrough for using HDInsight Spark 1.6. But Jupyter notebooks are provided for both HDInsight Spark 1.6 and Spark 2.0 clusters. A description of the notebooks and links to them are provided in the Readme.md for the GitHub repository containing them. Moreover, the code here and in the linked notebooks is generic and should work on any Spark cluster. If you are not using HDInsight Spark, the cluster setup and management steps may be slightly different from what is shown here. For convenience, here are the links to the Jupyter notebooks for Spark 1.6 and 2.0 to be run in the pyspark kernel of the Jupyter Notebook server:

Spark 1.6 notebooks

pySpark-machine-learning-data-science-spark-advanced-data-exploration-modeling.ipynb: Includes topics in notebook #1, and model development using hyperparameter tuning and cross-validation.

Spark 2.0 notebooks

Spark2.0-pySpark3-machine-learning-data-science-spark-advanced-data-exploration-modeling.ipynb: This file provides information on how to perform data exploration, modeling, and scoring in Spark 2.0 clusters.

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.

Setup: storage locations, libraries, and the preset Spark context

Spark is able to read and write to Azure Storage Blob (also known as WASB). So any of your existing data stored there can be processed using Spark and the results stored again in WASB.

To save models or files in WASB, the path needs to be specified properly. The default container attached to the Spark cluster can be referenced using a path beginning with "wasb:///". Other locations are referenced by "wasb://".

Set directory paths for storage locations in WASB

The following code sample specifies the location of the data to be read and the path for the model storage directory to which the model output is saved:

# SET PATHS TO FILE LOCATIONS: DATA AND MODEL STORAGE

# LOCATION OF TRAINING DATA
taxi_train_file_loc = "wasb://mllibwalkthroughs@cdspsparksamples.blob.core.chinacloudapi.cn/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Train.tsv";


# SET THE MODEL STORAGE DIRECTORY PATH 
# NOTE THAT THE FINAL BACKSLASH IN THE PATH IS NEEDED.
modelDir = "wasb:///user/remoteuser/NYCTaxi/Models/";

# PRINT START TIME
import datetime
datetime.datetime.now()

OUTPUT

datetime.datetime(2016, 4, 18, 17, 36, 27, 832799)

Import libraries

Import necessary libraries with the following code:

# LOAD PYSPARK LIBRARIES
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import atexit
from numpy import array
import numpy as np
import datetime

Preset Spark context and PySpark magics

The PySpark kernels that are provided with Jupyter notebooks have a preset context. So you do not need to set the Spark or Hive contexts explicitly before you start working with the application you are developing. These contexts are available for you by default; a quick check that they are present follows the list below. The contexts are:

  • sc - for Spark
  • sqlContext - for Hive
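A minimal sketch of such a check, run in a notebook cell on the cluster (the exact output depends on your cluster and the tables it contains):

# QUICK CHECK THAT THE PRESET CONTEXTS ARE AVAILABLE
print sc.version                        # Spark version reported by the preset SparkContext
sqlContext.sql("SHOW TABLES").show()    # list Hive tables through the preset sqlContext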

The PySpark kernel provides some predefined "magics", which are special commands that you can call with %%. There are two such commands that are used in these code samples.

  • %%local Specifies that the code in subsequent lines is to be executed locally. Code must be valid Python code.
  • %%sql -o <variable name> Executes a Hive query against the sqlContext. If the -o parameter is passed, the result of the query is persisted in the %%local Python context as a Pandas DataFrame.

For more information on the kernels for Jupyter notebooks and the predefined "magics" that they provide, see Kernels available for Jupyter notebooks with HDInsight Spark Linux clusters on HDInsight.

Data ingestion from public blob

The first step in the data science process is to ingest the data to be analyzed from sources where it resides into your data exploration and modeling environment. This environment is Spark in this walkthrough. This section contains the code to complete a series of tasks:

  • ingest the data sample to be modeled
  • read in the input dataset (stored as a .tsv file)
  • format and clean the data
  • create and cache objects (RDDs or data-frames) in memory
  • register it as a temp-table in SQL-context.

Here is the code for data ingestion.

# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT FILE FROM PUBLIC BLOB
taxi_train_file = sc.textFile(taxi_train_file_loc)

# GET SCHEMA OF THE FILE FROM HEADER
schema_string = taxi_train_file.first()
fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split('\t')]
fields[7].dataType = IntegerType() #Pickup hour
fields[8].dataType = IntegerType() # Pickup week
fields[9].dataType = IntegerType() # Weekday
fields[10].dataType = IntegerType() # Passenger count
fields[11].dataType = FloatType() # Trip time in secs
fields[12].dataType = FloatType() # Trip distance
fields[19].dataType = FloatType() # Fare amount
fields[20].dataType = FloatType() # Surcharge
fields[21].dataType = FloatType() # Mta_tax
fields[22].dataType = FloatType() # Tip amount
fields[23].dataType = FloatType() # Tolls amount
fields[24].dataType = FloatType() # Total amount
fields[25].dataType = IntegerType() # Tipped or not
fields[26].dataType = IntegerType() # Tip class
taxi_schema = StructType(fields)

# PARSE FIELDS AND CONVERT DATA TYPE FOR SOME FIELDS
taxi_header = taxi_train_file.filter(lambda l: "medallion" in l)
taxi_temp = taxi_train_file.subtract(taxi_header).map(lambda k: k.split("\t"))\
            .map(lambda p: (p[0],p[1],p[2],p[3],p[4],p[5],p[6],int(p[7]),int(p[8]),int(p[9]),int(p[10]),
                        float(p[11]),float(p[12]),p[13],p[14],p[15],p[16],p[17],p[18],float(p[19]),
                        float(p[20]),float(p[21]),float(p[22]),float(p[23]),float(p[24]),int(p[25]),int(p[26])))


# CREATE DATA FRAME
taxi_train_df = sqlContext.createDataFrame(taxi_temp, taxi_schema)

# CREATE A CLEANED DATA-FRAME BY DROPPING SOME UN-NECESSARY COLUMNS & FILTERING FOR UNDESIRED VALUES OR OUTLIERS
taxi_df_train_cleaned = taxi_train_df.drop('medallion').drop('hack_license').drop('store_and_fwd_flag').drop('pickup_datetime')\
    .drop('dropoff_datetime').drop('pickup_longitude').drop('pickup_latitude').drop('dropoff_latitude')\
    .drop('dropoff_longitude').drop('tip_class').drop('total_amount').drop('tolls_amount').drop('mta_tax')\
    .drop('direct_distance').drop('surcharge')\
    .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200" )

# CACHE & MATERIALIZE DATA-FRAME IN MEMORY. GOING THROUGH AND COUNTING NUMBER OF ROWS MATERIALIZES THE DATA-FRAME IN MEMORY
taxi_df_train_cleaned.cache()
taxi_df_train_cleaned.count()

# REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
taxi_df_train_cleaned.registerTempTable("taxi_train")

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Time taken to execute above cell: 276.62 seconds

Data exploration & visualization

Once the data has been brought into Spark, the next step in the data science process is to gain a deeper understanding of the data through exploration and visualization. In this section, we examine the taxi data using SQL queries and plot the target variables and prospective features for visual inspection. Specifically, we plot the frequency of passenger counts in taxi trips, the frequency of tip amounts, and how tips vary by payment amount and type.

Plot a histogram of passenger count frequencies in the sample of taxi trips

This code and subsequent snippets use SQL magic to query the sample and local magic to plot the data.

  • SQL magic (%%sql) The HDInsight PySpark kernel supports easy inline HiveQL queries against the sqlContext. The (-o VARIABLE_NAME) argument persists the output of the SQL query as a Pandas DataFrame on the Jupyter server. This means it is available in local mode.
  • The %%local magic is used to run code locally on the Jupyter server, which is the headnode of the HDInsight cluster. Typically, you use the %%local magic after the %%sql -o magic is used to run a query. The -o parameter persists the output of the SQL query locally. Then the %%local magic triggers the next set of code snippets to run locally against the output of the SQL queries that has been persisted locally. The output is automatically visualized after you run the code.

This query retrieves the trips by passenger count.

# PLOT FREQUENCY OF PASSENGER COUNTS IN TAXI TRIPS

# SQL QUERY
%%sql -q -o sqlResults
SELECT passenger_count, COUNT(*) as trip_counts FROM taxi_train WHERE passenger_count > 0 and passenger_count < 7 GROUP BY passenger_count

This code creates a local data-frame from the query output and plots the data. The %%local magic creates a local data-frame, sqlResults, which can be used for plotting with matplotlib.

Note

This PySpark magic is used multiple times in this walkthrough. If the amount of data is large, you should sample to create a data-frame that can fit in local memory.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# USE THE JUPYTER AUTO-PLOTTING FEATURE TO CREATE INTERACTIVE FIGURES. 
# CLICK ON THE TYPE OF PLOT TO BE GENERATED (E.G. LINE, AREA, BAR ETC.)
sqlResults

Here is the code to plot the trips by passenger count:

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
import matplotlib.pyplot as plt
%matplotlib inline

# PLOT PASSENGER NUMBER VS TRIP COUNTS
x_labels = sqlResults['passenger_count'].values
fig = sqlResults[['trip_counts']].plot(kind='bar', facecolor='lightblue')
fig.set_xticklabels(x_labels)
fig.set_title('Counts of trips by passenger count')
fig.set_xlabel('Passenger count in trips')
fig.set_ylabel('Trip counts')
plt.show()

OUTPUT

Frequency of trips by passenger count

You can select among several different types of visualizations (Table, Pie, Line, Area, or Bar) by using the Type menu buttons in the notebook. The Bar plot is shown here.

Plot a histogram of tip amounts and how the tip amount varies by passenger count and fare amount

Use a SQL query to sample the data.

# SQL QUERY
%%sql -q -o sqlResults
    SELECT fare_amount, passenger_count, tip_amount, tipped
    FROM taxi_train 
    WHERE passenger_count > 0 
    AND passenger_count < 7
    AND fare_amount > 0 
    AND fare_amount < 200
    AND payment_type in ('CSH', 'CRD')
    AND tip_amount > 0 
    AND tip_amount < 25

This code cell uses the SQL query results to create three plots of the data.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline

# TIP BY PAYMENT TYPE AND PASSENGER COUNT
ax1 = sqlResults[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount ($)')
ax1.set_ylabel('Counts')
plt.suptitle('')
plt.show()

# TIP BY PASSENGER COUNT
ax2 = sqlResults.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount ($) by Passenger count')
ax2.set_xlabel('Passenger count')
ax2.set_ylabel('Tip Amount ($)')
plt.suptitle('')
plt.show()

# TIP AMOUNT BY FARE AMOUNT, POINTS ARE SCALED BY PASSENGER COUNT
ax = sqlResults.plot(kind='scatter', x= 'fare_amount', y = 'tip_amount', c='blue', alpha = 0.10, s=5*(sqlResults.passenger_count))
ax.set_title('Tip amount by Fare amount ($)')
ax.set_xlabel('Fare Amount')
ax.set_ylabel('Tip Amount')
plt.axis([-2, 120, -2, 30])
plt.show()

OUTPUT:

Tip amount distribution

Tip amount by passenger count

Tip amount by fare amount

Feature engineering, transformation, and data preparation for modeling

This section describes and provides the code for procedures used to prepare data for use in ML modeling. It shows how to do the following tasks:

  • Create a new feature by partitioning hours into traffic time bins
  • Index and one-hot encode categorical features
  • Create labeled point objects for input into ML functions
  • Create a random subsampling of the data and split it into training and testing sets
  • Feature scaling
  • Cache objects in memory

Create a new feature by partitioning traffic times into bins

This code shows how to create a new feature by partitioning traffic times into bins and then how to cache the resulting data frame in memory. Caching leads to improved execution time where Resilient Distributed Datasets (RDDs) and data-frames are used repeatedly. So, we cache RDDs and data-frames at several stages in this walkthrough.

# CREATE FOUR BUCKETS FOR TRAFFIC TIMES
sqlStatement = """
    SELECT *,
    CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night" 
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush" 
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_train 
"""
taxi_df_train_with_newFeatures = sqlContext.sql(sqlStatement)
# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
# THE .COUNT() GOES THROUGH THE ENTIRE DATA-FRAME,
# MATERIALIZES IT IN MEMORY, AND GIVES THE COUNT OF ROWS.
taxi_df_train_with_newFeatures.cache()
taxi_df_train_with_newFeatures.count()

OUTPUT

126050

Index and one-hot encode categorical features

This section shows how to index or encode categorical features for input into the modeling functions. The modeling and predict functions of MLlib require that features with categorical input data be indexed or encoded prior to use.

Depending on the model, you need to index or encode the features in different ways. For example, logistic and linear regression models require one-hot encoding, where, for example, a feature with three categories can be expanded into three feature columns, each containing 0 or 1 depending on the category of an observation. MLlib provides the OneHotEncoder function to do one-hot encoding. This encoder maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms that expect numerically valued features, such as logistic regression, to be applied to categorical features.
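As a small illustration of that expansion, here is a toy example (hypothetical data, separate from the walkthrough's dataset) that indexes a three-category string column and one-hot encodes it into a three-element indicator vector:

# TOY EXAMPLE (HYPOTHETICAL DATA): EXPAND A THREE-CATEGORY FEATURE INTO THREE INDICATOR COLUMNS
from pyspark.ml.feature import StringIndexer, OneHotEncoder

toyDF = sqlContext.createDataFrame(
    [(0, "CSH"), (1, "CRD"), (2, "UNK")], ["id", "payment_type"])
indexedToy = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")\
    .fit(toyDF).transform(toyDF)
encodedToy = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec")\
    .transform(indexedToy)
encodedToy.select("payment_type", "paymentIndex", "paymentVec").show()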

Here is the code to index and encode categorical features:

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer

# INDEX AND ENCODE VENDOR_ID
stringIndexer = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex")
model = stringIndexer.fit(taxi_df_train_with_newFeatures) # Input data-frame is the cleaned one from above
indexed = model.transform(taxi_df_train_with_newFeatures)
encoder = OneHotEncoder(dropLast=False, inputCol="vendorIndex", outputCol="vendorVec")
encoded1 = encoder.transform(indexed)

# INDEX AND ENCODE RATE_CODE
stringIndexer = StringIndexer(inputCol="rate_code", outputCol="rateIndex")
model = stringIndexer.fit(encoded1)
indexed = model.transform(encoded1)
encoder = OneHotEncoder(dropLast=False, inputCol="rateIndex", outputCol="rateVec")
encoded2 = encoder.transform(indexed)

# INDEX AND ENCODE PAYMENT_TYPE
stringIndexer = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")
model = stringIndexer.fit(encoded2)
indexed = model.transform(encoded2)
encoder = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec")
encoded3 = encoder.transform(indexed)

# INDEX AND ENCODE TRAFFIC TIME BINS
stringIndexer = StringIndexer(inputCol="TrafficTimeBins", outputCol="TrafficTimeBinsIndex")
model = stringIndexer.fit(encoded3)
indexed = model.transform(encoded3)
encoder = OneHotEncoder(dropLast=False, inputCol="TrafficTimeBinsIndex", outputCol="TrafficTimeBinsVec")
encodedFinal = encoder.transform(indexed)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Time taken to execute above cell: 3.14 seconds

Create labeled point objects for input into ML functions

This section contains code that shows how to index categorical text data as a labeled point data type and how to encode it. This transformation prepares the text data to be used to train and test MLlib logistic regression and other classification models. Labeled point objects are Resilient Distributed Datasets (RDDs) formatted in the way that most ML algorithms in MLlib require as input data. A labeled point is a local vector, either dense or sparse, associated with a label/response.
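For illustration, a labeled point can be constructed directly from a label and either a dense or a sparse feature vector (the values below are arbitrary placeholders):

# ILLUSTRATION: CONSTRUCTING LABELED POINTS (ARBITRARY VALUES)
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import SparseVector

densePoint  = LabeledPoint(1.0, [12.5, 2.0, 0.0, 7.0])                 # label 1.0 with a dense feature vector
sparsePoint = LabeledPoint(0.0, SparseVector(4, [0, 3], [12.5, 7.0]))  # same length, only non-zero entries stored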

Here is the code to index and encode text features for binary classification.

# FUNCTIONS FOR BINARY CLASSIFICATION

# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from numpy import array

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingBinary(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.pickup_hour, line.weekday,
                         line.passenger_count, line.trip_time_in_secs, line.trip_distance, line.fare_amount])
    labPt = LabeledPoint(line.tipped, features)
    return  labPt

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LOGISTIC REGRESSION MODELS
def parseRowOneHotBinary(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                               line.vendorVec.toArray(), line.rateVec.toArray(), line.paymentVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tipped, features)
    return  labPt

Here is the code to encode and index categorical text features for linear regression analysis.

# FUNCTIONS FOR REGRESSION WITH TIP AMOUNT AS TARGET VARIABLE

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingRegression(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex, 
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])
    labPt = LabeledPoint(line.tip_amount, features)
    return  labPt

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO LINEAR REGRESSION MODELS
def parseRowOneHotRegression(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tip_amount, features)
    return  labPt

Create a random subsampling of the data and split it into training and testing sets

This code creates a random sampling of the data (25% is used here). Although it is not required for this example due to the size of the dataset, we demonstrate how you can sample the data here. Then you know how to use it for your own problem if needed. When samples are large, sampling can save significant time while training models. Next we split the sample into a training part (75% here) and a testing part (25% here) to use in classification and regression modeling.

# RECORD START TIME
timestart = datetime.datetime.now()

# SPECIFY SAMPLING AND SPLITTING FRACTIONS
from pyspark.sql.functions import rand

samplingFraction = 0.25;
trainingFraction = 0.75; testingFraction = (1-trainingFraction);
seed = 1234;
encodedFinalSampled = encodedFinal.sample(False, samplingFraction, seed=seed)

# SPLIT SAMPLED DATA-FRAME INTO TRAIN/TEST, WITH A RANDOM COLUMN ADDED FOR DOING CV (SHOWN LATER)
# INCLUDE RAND COLUMN FOR CREATING CROSS-VALIDATION FOLDS
dfTmpRand = encodedFinalSampled.select("*", rand(0).alias("rand"));
trainData, testData = dfTmpRand.randomSplit([trainingFraction, testingFraction], seed=seed);

# CACHE TRAIN AND TEST DATA
trainData.cache()
testData.cache()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary = trainData.map(parseRowIndexingBinary)
indexedTESTbinary = testData.map(parseRowIndexingBinary)
oneHotTRAINbinary = trainData.map(parseRowOneHotBinary)
oneHotTESTbinary = testData.map(parseRowOneHotBinary)

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg = trainData.map(parseRowIndexingRegression)
indexedTESTreg = testData.map(parseRowIndexingRegression)
oneHotTRAINreg = trainData.map(parseRowOneHotRegression)
oneHotTESTreg = testData.map(parseRowOneHotRegression)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Time taken to execute above cell: 0.31 seconds

Feature scaling

Feature scaling, also known as data normalization, ensures that features with widely dispersed values are not given excessive weight in the objective function. The code for feature scaling uses the StandardScaler to scale the features to unit variance. It is provided by MLlib for use in linear regression with Stochastic Gradient Descent (SGD). SGD is a popular algorithm for training a wide range of other machine learning models such as regularized regressions or support vector machines (SVMs).

Tip

We have found the LinearRegressionWithSGD algorithm to be sensitive to feature scaling.

Here is the code to scale variables for use with the regularized linear SGD algorithm.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.util import MLUtils

# SCALE VARIABLES FOR REGULARIZED LINEAR SGD ALGORITHM
label = oneHotTRAINreg.map(lambda x: x.label)
features = oneHotTRAINreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTRAINregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

label = oneHotTESTreg.map(lambda x: x.label)
features = oneHotTESTreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTESTregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Time taken to execute above cell: 11.67 seconds

Cache objects in memory

The time taken for training and testing of ML algorithms can be reduced by caching the input data frame objects used for classification, regression, and scaled features.

# RECORD START TIME
timestart = datetime.datetime.now()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.cache()
indexedTESTbinary.cache()
oneHotTRAINbinary.cache()
oneHotTESTbinary.cache()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.cache()
indexedTESTreg.cache()
oneHotTRAINreg.cache()
oneHotTESTreg.cache()

# SCALED FEATURES
oneHotTRAINregScaled.cache()
oneHotTESTregScaled.cache()

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Time taken to execute above cell: 0.13 seconds

Predict whether or not a tip is paid with binary classification models

This section shows how to use three models for the binary classification task of predicting whether or not a tip is paid for a taxi trip. The models presented are:

  • Logistic regression
  • Random forest
  • Gradient boosting trees

Each model-building code section is split into steps:

  1. Model training on the data with one parameter set
  2. Model evaluation on a test data set with metrics
  3. Saving the model in a blob for future consumption

We show how to do cross-validation (CV) with parameter sweeping in two ways:

  1. Using generic custom code that can be applied to any algorithm in MLlib and to any parameter sets in an algorithm.

  2. Using the pySpark CrossValidator pipeline function. CrossValidator has a few limitations for Spark 1.5.0:

    • Pipeline models cannot be saved or persisted for future consumption.
    • It cannot be used for every parameter in a model.
    • It cannot be used for every MLlib algorithm.

Generic cross-validation and hyperparameter sweeping used with the logistic regression algorithm for binary classification

The code in this section shows how to train, evaluate, and save a logistic regression model with LBFGS that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset. The model is trained using cross-validation (CV) and hyperparameter sweeping implemented with custom code that can be applied to any of the learning algorithms in MLlib.

Note

The execution of this custom CV code can take several minutes.

Train the logistic regression model using CV and hyperparameter sweeping

# LOGISTIC REGRESSION CLASSIFICATION WITH CV AND HYPERPARAMETER SWEEPING

# GET ACCURACY FOR HYPERPARAMETERS BASED ON CROSS-VALIDATION IN TRAINING DATA-SET

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD LIBRARIES
from pyspark.mllib.classification import LogisticRegressionWithLBFGS 
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# CREATE PARAMETER GRID FOR LOGISTIC REGRESSION PARAMETER SWEEP
from sklearn.grid_search import ParameterGrid
grid = [{'regParam': [0.01, 0.1], 'iterations': [5, 10], 'regType': ["l1", "l2"], 'tolerance': [1e-3, 1e-4]}]
paramGrid = list(ParameterGrid(grid))
numModels = len(paramGrid)

# SET NUM FOLDS AND NUM PARAMETER SETS TO SWEEP ON
nFolds = 3;
h = 1.0 / nFolds;
metricSum = np.zeros(numModels);

# BEGIN CV WITH PARAMETER SWEEP
for i in range(nFolds):
    # Create training and x-validation sets
    validateLB = i * h
    validateUB = (i + 1) * h
    condition = (trainData["rand"] >= validateLB) & (trainData["rand"] < validateUB)
    validation = trainData.filter(condition)
    # Create LabeledPoints from data-frames
    if i > 0:
        trainCVLabPt.unpersist()
        validationLabPt.unpersist()
    trainCV = trainData.filter(~condition)
    trainCVLabPt = trainCV.map(parseRowOneHotBinary)
    trainCVLabPt.cache()
    validationLabPt = validation.map(parseRowOneHotBinary)
    validationLabPt.cache()
    # For parameter sets compute metrics from x-validation
    for j in range(numModels):
        regt = paramGrid[j]['regType']
        regp = paramGrid[j]['regParam']
        iters = paramGrid[j]['iterations']
        tol = paramGrid[j]['tolerance']
        # Train logistic regression model with hyperparameter set
        model = LogisticRegressionWithLBFGS.train(trainCVLabPt, regType=regt, iterations=iters,  
                                                  regParam=regp, tolerance = tol, intercept=True)
        predictionAndLabels = validationLabPt.map(lambda lp: (float(model.predict(lp.features)), lp.label))
        # Use ROC-AUC as accuracy metrics
        validMetrics = BinaryClassificationMetrics(predictionAndLabels)
        metric = validMetrics.areaUnderROC
        metricSum[j] += metric

avgAcc = metricSum / nFolds;
bestParam = paramGrid[np.argmax(avgAcc)];

# UNPERSIST OBJECTS
trainCVLabPt.unpersist()
validationLabPt.unpersist()

# TRAIN ON FULL TRAINING SET USING BEST PARAMETERS FROM CV/PARAMETER SWEEP
logitBest = LogisticRegressionWithLBFGS.train(oneHotTRAINbinary, regType=bestParam['regType'], 
                                              iterations=bestParam['iterations'], 
                                              regParam=bestParam['regParam'], tolerance = bestParam['tolerance'], 
                                              intercept=True)


# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features, 
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(logitBest.weights))
print("Intercept: " + str(logitBest.intercept))

# PRINT ELAPSED TIME    
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Coefficients: [0.0082065285375, -0.0223675576104, -0.0183812028036, -3.48124578069e-05, -0.00247646947233, -0.00165897881503, 0.0675394837328, -0.111823113101, -0.324609912762, -0.204549780032, -1.36499216354, 0.591088507921, -0.664263411392, -1.00439726852, 3.46567827545, -3.51025855172, -0.0471341112232, -0.043521833294, 0.000243375810385, 0.054518719222]

Intercept: -0.0111216486893

Time taken to execute above cell: 14.43 seconds

Evaluate the binary classification model with standard metrics

The code in this section shows how to evaluate a logistic regression model against a test data-set, including a plot of the ROC curve.

# RECORD START TIME
timestart = datetime.datetime.now()

#IMPORT LIBRARIES
from sklearn.metrics import roc_curve,auc
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# PREDICT ON TEST DATA WITH BEST/FINAL MODEL
predictionAndLabels = oneHotTESTbinary.map(lambda lp: (float(logitBest.predict(lp.features)), lp.label))

# INSTANTIATE METRICS OBJECT
metrics = BinaryClassificationMetrics(predictionAndLabels)

# AREA UNDER PRECISION-RECALL CURVE
print("Area under PR = %s" % metrics.areaUnderPR)

# AREA UNDER ROC CURVE
print("Area under ROC = %s" % metrics.areaUnderROC)
metrics = MulticlassMetrics(predictionAndLabels)

# OVERALL STATISTICS
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# OUTPUT PROBABILITIES AND REGISTER TEMP TABLE
logitBest.clearThreshold(); # This clears threshold for classification (0.5) and outputs probabilities
predictionAndLabelsDF = predictionAndLabels.toDF()
predictionAndLabelsDF.registerTempTable("tmp_results");

# PRINT ELAPSED TIME    
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Area under PR = 0.985336538462

Area under ROC = 0.983383274312

Summary Stats

Precision = 0.984174341679

Recall = 0.984174341679

F1 Score = 0.984174341679

Time taken to execute above cell: 2.67 seconds

Plot the ROC curve.

The predictionAndLabelsDF is registered as a table, tmp_results, in the previous cell. tmp_results can be used to do queries and output the results into the sqlResults data-frame for plotting. Here is the code.

# QUERY RESULTS                              
%%sql -q -o sqlResults
SELECT * from tmp_results

Here is the code to make predictions and plot the ROC curve.

# MAKE PREDICTIONS AND PLOT ROC-CURVE

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES                              
%%local
%matplotlib inline
from sklearn.metrics import roc_curve,auc

#PREDICTIONS
predictions_pddf = sqlResults.rename(columns={'_1': 'probability', '_2': 'label'})
prob = predictions_pddf["probability"] 
fpr, tpr, thresholds = roc_curve(predictions_pddf['label'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

# PLOT ROC CURVES
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

OUTPUT

ROC curve for logistic regression with the generic approach

Persist model in a blob for future consumption

The code in this section shows how to save the logistic regression model for consumption.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.classification import LogisticRegressionModel

# PERSIST MODEL
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
logisticregressionfilename = "LogisticRegressionWithLBFGS_" + datestamp;
dirfilename = modelDir + logisticregressionfilename;

logitBest.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT

Time taken to execute above cell: 34.57 seconds

Use MLlib's CrossValidator pipeline function with the logistic regression (elastic net) model

The code in this section shows how to train, evaluate, and save a logistic regression model with LBFGS that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset. The model is trained using cross-validation (CV) and hyperparameter sweeping implemented with the MLlib CrossValidator pipeline function for CV with parameter sweeping.

Note

The execution of this MLlib CV code can take several minutes.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import roc_curve,auc

# DEFINE ALGORITHM / MODEL
lr = LogisticRegression()

# DEFINE GRID PARAMETERS
paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
                              .addGrid(lr.maxIter, (5, 10))\
                              .addGrid(lr.tol, (1e-4, 1e-5))\
                              .addGrid(lr.elasticNetParam, (0.25,0.75))\
                              .build()

# DEFINE CV WITH PARAMETER SWEEP
cv = CrossValidator(estimator= lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)

# CONVERT TO DATA-FRAME: THIS DOES NOT RUN ON RDDs
trainDataFrame = sqlContext.createDataFrame(oneHotTRAINbinary, ["features", "label"])

# TRAIN WITH CROSS-VALIDATION
cv_model = cv.fit(trainDataFrame)


## PREDICT AND EVALUATE ON TEST DATA-SET

# USE TEST DATASET FOR PREDICTION
testDataFrame = sqlContext.createDataFrame(oneHotTESTbinary, ["features", "label"])
test_predictions = cv_model.transform(testDataFrame)

# REGISTER THE PREDICTIONS AS A TEMP-TABLE; IT IS QUERIED BELOW TO PLOT THE ROC CURVE
test_predictions.registerTempTable("tmp_results")

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT

Time taken to execute above cell: 107.98 seconds

Plot the ROC curve.

The test predictions are registered as a table, tmp_results, in the previous cell. tmp_results can be used to do queries and output the results into the sqlResults data-frame for plotting. Here is the code.

# QUERY RESULTS
%%sql -q -o sqlResults
SELECT label, prediction, probability from tmp_results

Here is the code to plot the ROC curve.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES 
%%local
from sklearn.metrics import roc_curve,auc

# ROC CURVE
prob = [x["values"][1] for x in sqlResults["probability"]]
fpr, tpr, thresholds = roc_curve(sqlResults['label'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

#PLOT
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

OUTPUT

ROC curve for logistic regression using MLlib's CrossValidator

Random forest classification

The code in this section shows how to train, evaluate, and save a random forest classification model that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #1 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# TRAIN RANDOMFOREST MODEL
rfModel = RandomForest.trainClassifier(indexedTRAINbinary, numClasses=2, 
                                       categoricalFeaturesInfo=categoricalFeaturesInfo,
                                       numTrees=25, featureSubsetStrategy="auto",
                                       impurity='gini', maxDepth=5, maxBins=32)
## UN-COMMENT IF YOU WANT TO PRINT TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = rfModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfclassificationfilename = "RandomForestClassification_" + datestamp;
dirfilename = modelDir + rfclassificationfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Area under ROC = 0.985336538462

Time taken to execute above cell: 26.72 seconds

Gradient boosting trees classification

The code in this section shows how to train, evaluate, and save a gradient boosting trees model that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #1 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

gbtModel = GradientBoostedTrees.trainClassifier(indexedTRAINbinary, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                                numIterations=10)
## UNCOMMENT IF YOU WANT TO PRINT TREE DETAILS
#print('Learned classification GBT model:')
#print(gbtModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = gbtModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# Area under ROC curve
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN A BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btclassificationfilename = "GradientBoostingTreeClassification_" + datestamp;
dirfilename = modelDir + btclassificationfilename;

gbtModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Area under ROC = 0.985336538462

Time taken to execute above cell: 28.13 seconds

Predict tip amount with regression models (not using CV)

This section shows how to use three models for the regression task: predict the tip amount paid for a taxi trip based on other tip features. The models presented are:

  • Regularized linear regression
  • Random forest
  • Gradient boosting trees

These models were described in the introduction. Each model-building code section is split into steps:

  1. Model training on the data with one parameter set
  2. Model evaluation on a test data set with metrics
  3. Saving the model in a blob for future consumption

Note

Cross-validation is not used with the three regression models in this section, since this was shown in detail for the logistic regression models. An example showing how to use CV with elastic net for linear regression is provided in the appendix of this topic.

Note

In our experience, there can be issues with the convergence of LinearRegressionWithSGD models, and parameters need to be changed/optimized carefully to obtain a valid model. Scaling of variables significantly helps with convergence. Elastic net regression, shown in the appendix to this topic, can also be used instead of LinearRegressionWithSGD.

Linear regression with SGD

The code in this section shows how to use scaled features to train a linear regression that uses stochastic gradient descent (SGD) for optimization, and how to score, evaluate, and save the model in Azure Blob storage (WASB).

Tip

In our experience, there can be issues with the convergence of LinearRegressionWithSGD models, and parameters need to be changed/optimized carefully to obtain a valid model. Scaling of variables significantly helps with convergence.

Here is the code to scale variables for use with the regularized linear SGD algorithm.

# LINEAR REGRESSION WITH SGD 

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics
from scipy import stats

# USE SCALED FEATURES TO TRAIN MODEL
linearModel = LinearRegressionWithSGD.train(oneHotTRAINregScaled, iterations=100, step = 0.1, regType='l2', regParam=0.1, intercept = True)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features, 
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(linearModel.weights))
print("Intercept: " + str(linearModel.intercept))

# SCORE ON SCALED TEST DATA-SET & EVALUATE
predictionAndLabels = oneHotTESTregScaled.map(lambda lp: (float(linearModel.predict(lp.features)), lp.label))
testMetrics = RegressionMetrics(predictionAndLabels)

print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
linearregressionfilename = "LinearRegressionWithSGD_" + datestamp;
dirfilename = modelDir + linearregressionfilename;

linearModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Coefficients: [0.0141707753435, -0.0252930927087, -0.0231442517137, 0.247070902996, 0.312544147152, 0.360296120645, 0.0122079566092, -0.00456498588241, -0.0898228505177, 0.0714046248793, 0.102171263868, 0.100022455632, -0.00289545676449, -0.00791124681938, 0.54396316518, -0.536293513569, 0.0119076553369, -0.0173039244582, 0.0119632796147, 0.00146764882502]

Intercept: 0.854507624459

RMSE = 1.23485131376

R-sqr = 0.597963951127

Time taken to execute above cell: 38.62 seconds
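
Besides RMSE and R-sqr, the RegressionMetrics object computed above exposes additional error measures. The optional lines below (assuming the testMetrics object from the preceding cell) print two of them.

# OPTIONAL: ADDITIONAL ERROR MEASURES FROM THE SAME RegressionMetrics OBJECT (testMetrics from the cell above)
print("MAE = %s" % testMetrics.meanAbsoluteError)
print("Explained variance = %s" % testMetrics.explainedVariance)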

Random forest regression

The code in this section shows how to train, evaluate, and save a random forest model that predicts tip amount for the NYC taxi trip data.

Note

Cross-validation with parameter sweeping using custom code is provided in the appendix.

#PREDICT TIP AMOUNTS USING RANDOM FOREST

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics


# TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                    numTrees=25, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=10, maxBins=32)
# UN-COMMENT IF YOU WANT TO PRINT TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

# PREDICT AND EVALUATE ON TEST DATA-SET
predictions = rfModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = oneHotTESTreg.map(lambda lp: lp.label).zip(predictions)

testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfregressionfilename = "RandomForestRegression_" + datestamp;
dirfilename = modelDir + rfregressionfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

RMSE = 0.931981967875

R-sqr = 0.733445485802

Time taken to execute above cell: 25.98 seconds
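
To sanity-check the size of the fitted ensemble, the trained model can be inspected directly; a small optional snippet using the rfModel object from the cell above is shown below.

# OPTIONAL: INSPECT THE SIZE OF THE TRAINED FOREST (rfModel from the cell above)
print("Number of trees: %s" % rfModel.numTrees())
print("Total number of nodes: %s" % rfModel.totalNumNodes())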

Gradient boosting trees regression

The code in this section shows how to train, evaluate, and save a gradient boosting trees model that predicts tip amount for the NYC taxi trip data.

Train and evaluate

#PREDICT TIP AMOUNTS USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils

# TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
gbtModel = GradientBoostedTrees.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo, 
                                                numIterations=10, maxBins=32, maxDepth = 4, learningRate=0.1)

# EVALUATE A TEST DATA-SET
predictions = gbtModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# STORE PREDICTIONS WITH LABELS AND REGISTER THEM AS A TABLE FOR PLOTTING
test_predictions = sqlContext.createDataFrame(predictionAndLabels)
test_predictions.registerTempTable("tmp_results")
test_predictions_pddf = test_predictions.toPandas()

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btregressionfilename = "GradientBoostingTreeRegression_" + datestamp;
dirfilename = modelDir + btregressionfilename;
gbtModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

RMSE = 0.928172197114

R-sqr = 0.732680354389

Time taken to execute above cell: 20.9 seconds

Plot

tmp_results is registered as a Hive table in the previous cell. Results from the table are output into the sqlResults data-frame for plotting. Here is the code:

# PLOT SCATTER-PLOT BETWEEN ACTUAL AND PREDICTED TIP VALUES

# SELECT RESULTS
%%sql -q -o sqlResults
SELECT * from tmp_results

Here is the code to plot the data using the Jupyter server.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
import numpy as np
import matplotlib.pyplot as plt

# PLOT
ax = sqlResults.plot(kind='scatter', figsize = (6,6), x='_1', y='_2', color='blue', alpha = 0.25, label='Actual vs. predicted');
fit = np.polyfit(sqlResults['_1'], sqlResults['_2'], deg=1)
ax.set_title('Actual vs. Predicted Tip Amounts ($)')
ax.set_xlabel("Actual")
ax.set_ylabel("Predicted")
ax.plot(sqlResults['_1'], fit[0] * sqlResults['_1'] + fit[1], color='magenta')
plt.axis([-1, 15, -1, 15])
plt.show(ax)

Figure: Actual vs. predicted tip amounts

Appendix: Additional regression tasks using cross validation with parameter sweeps

This appendix contains code showing how to do CV using Elastic net for linear regression and how to do CV with parameter sweep using custom code for random forest regression.

Cross validation using Elastic net for linear regression

The code in this section shows how to do cross validation using Elastic net for linear regression and how to evaluate the model against test data.

###  CV USING ELASTIC NET FOR LINEAR REGRESSION

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# DEFINE ALGORITHM/MODEL
lr = LinearRegression()

# DEFINE GRID PARAMETERS
paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
                              .addGrid(lr.maxIter, (5, 10))\
                              .addGrid(lr.tol, (1e-4, 1e-5))\
                              .addGrid(lr.elasticNetParam, (0.25,0.75))\
                              .build() 

# DEFINE PIPELINE 
# THE PIPELINE HERE CONTAINS ONLY THE MODEL, WITHOUT ADDITIONAL TRANSFORMATIONS
pipeline = Pipeline(stages=[lr])

# DEFINE CV WITH PARAMETER SWEEP
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(),
                    numFolds=3)

# CONVERT TO DATA FRAME, AS CROSSVALIDATOR WON'T RUN ON RDDS
trainDataFrame = sqlContext.createDataFrame(oneHotTRAINreg, ["features", "label"])

# TRAIN WITH CROSS-VALIDATION
cv_model = cv.fit(trainDataFrame)


# EVALUATE MODEL ON TEST SET
testDataFrame = sqlContext.createDataFrame(oneHotTESTreg, ["features", "label"])

# MAKE PREDICTIONS ON THE TEST SET
# cv_model applies the best model found during the parameter sweep
predictionAndLabels = cv_model.transform(testDataFrame)

# REGISTER THE PREDICTIONS DATA FRAME AS A TEMP TABLE
predictionAndLabels.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

Time taken to execute above cell: 161.21 seconds

Evaluate with the R-sqr metric

tmp_results is registered as a Hive table in the previous cell. Results from the table are output into the sqlResults data-frame for plotting. Here is the code:

# SELECT RESULTS
%%sql -q -o sqlResults
SELECT label,prediction from tmp_results

Here is the code to calculate R-sqr.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
from scipy import stats

#R-SQR TEST METRIC
corstats = stats.linregress(sqlResults['label'],sqlResults['prediction'])
r2 = (corstats[2]*corstats[2])
print("R-sqr = %s" % r2)

OUTPUT

R-sqr = 0.619184907088
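
The fitted CrossValidatorModel also keeps the model trained with the best-performing parameter combination. As a hedged sketch (attribute names follow the Spark 1.x pyspark.ml API used in this walkthrough; newer versions expose coefficients instead of weights), the selected model can be inspected as follows.

# HEDGED SKETCH: INSPECT THE MODEL SELECTED BY THE CROSS-VALIDATOR (cv_model from the CV cell above)
bestModel = cv_model.bestModel
print("Best model intercept: %s" % str(bestModel.intercept))
print("Best model weights: %s" % str(bestModel.weights))   # use bestModel.coefficients on newer Spark versions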

Cross validation with parameter sweep using custom code for random forest regression

The code in this section shows how to do cross validation with parameter sweep using custom code for random forest regression and how to evaluate the model against test data.

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD LIBRARIES
# GET ACCURACY FOR HYPERPARAMETERS BASED ON CROSS-VALIDATION IN TRAINING DATA-SET
import numpy as np
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics
from sklearn.grid_search import ParameterGrid

## CREATE PARAMETER GRID
grid = [{'maxDepth': [5,10], 'numTrees': [25,50]}]
paramGrid = list(ParameterGrid(grid))

## SPECIFY LEVELS OF CATEGORICAL VARIABLES
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# SPECIFY NUMFOLDS AND ARRAY TO HOLD METRICS
nFolds = 3;
numModels = len(paramGrid)
h = 1.0 / nFolds;
metricSum = np.zeros(numModels);

for i in range(nFolds):
    # Create training and x-validation sets
    validateLB = i * h
    validateUB = (i + 1) * h
    condition = (trainData["rand"] >= validateLB) & (trainData["rand"] < validateUB)
    validation = trainData.filter(condition)
    # Create labeled points from data-frames
    if i > 0:
        trainCVLabPt.unpersist()
        validationLabPt.unpersist()
    trainCV = trainData.filter(~condition)
    trainCVLabPt = trainCV.map(parseRowIndexingRegression)
    trainCVLabPt.cache()
    validationLabPt = validation.map(parseRowIndexingRegression)
    validationLabPt.cache()
    # For parameter sets compute metrics from x-validation
    for j in range(numModels):
        maxD = paramGrid[j]['maxDepth']
        numT = paramGrid[j]['numTrees']
        # Train random forest regression model with hyperparameter set
        rfModel = RandomForest.trainRegressor(trainCVLabPt, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                    numTrees=numT, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=maxD, maxBins=32)
        predictions = rfModel.predict(validationLabPt.map(lambda x: x.features))
        predictionAndLabels = validationLabPt.map(lambda lp: lp.label).zip(predictions)
        # Use RMSE as the accuracy metric
        validMetrics = RegressionMetrics(predictionAndLabels)
        metric = validMetrics.rootMeanSquaredError
        metricSum[j] += metric

avgAcc = metricSum/nFolds;
bestParam = paramGrid[np.argmin(avgAcc)];

# UNPERSIST OBJECTS
trainCVLabPt.unpersist()
validationLabPt.unpersist()

## TRAIN FINAL MODEL WITH BEST PARAMETERS
rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                    numTrees=bestParam['numTrees'], featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=bestParam['maxDepth'], maxBins=32)

# EVALUATE MODEL ON TEST DATA
predictions = rfModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

#PRINT TEST METRICS
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT

RMSE = 0.906972198262

R-sqr = 0.740751197012

Time taken to execute above cell: 69.17 seconds
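
To see which parameter combination the custom sweep selected, the avgAcc and bestParam variables from the cell above can be printed as an optional check (the metric stored in avgAcc is the average RMSE per parameter set).

# OPTIONAL: PRINT THE RESULTS OF THE CUSTOM PARAMETER SWEEP (avgAcc and bestParam from the cell above)
print("Average CV RMSE per parameter set: %s" % str(avgAcc))
print("Best parameter set: %s" % str(bestParam))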

Clean up objects from memory and print model locations

Use unpersist() to delete objects cached in memory.

# UNPERSIST OBJECTS CACHED IN MEMORY

# REMOVE ORIGINAL DFs
taxi_df_train_cleaned.unpersist()
taxi_df_train_with_newFeatures.unpersist()
trainData.unpersist()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.unpersist()
indexedTESTbinary.unpersist()
oneHotTRAINbinary.unpersist()
oneHotTESTbinary.unpersist()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.unpersist()
indexedTESTreg.unpersist()
oneHotTRAINreg.unpersist()
oneHotTESTreg.unpersist()

# SCALED FEATURES
oneHotTRAINregScaled.unpersist()
oneHotTESTregScaled.unpersist()

OUTPUT

PythonRDD[122] at RDD at PythonRDD.scala:43

Output the path to the model files to be used in the consumption notebook. To consume and score an independent data-set, you need to copy and paste these file names into the "Consumption notebook".

# PRINT MODEL FILE LOCATIONS FOR CONSUMPTION
print "logisticRegFileLoc = modelDir + \"" + logisticregressionfilename + "\"";
print "linearRegFileLoc = modelDir + \"" + linearregressionfilename + "\"";
print "randomForestClassificationFileLoc = modelDir + \"" + rfclassificationfilename + "\"";
print "randomForestRegFileLoc = modelDir + \"" + rfregressionfilename + "\"";
print "BoostedTreeClassificationFileLoc = modelDir + \"" + btclassificationfilename + "\"";
print "BoostedTreeRegressionFileLoc = modelDir + \"" + btregressionfilename + "\"";

OUTPUT

logisticRegFileLoc = modelDir + "LogisticRegressionWithLBFGS_2016-05-0316_47_30.096528"

linearRegFileLoc = modelDir + "LinearRegressionWithSGD_2016-05-0316_51_28.433670"

randomForestClassificationFileLoc = modelDir + "RandomForestClassification_2016-05-0316_50_17.454440"

randomForestRegFileLoc = modelDir + "RandomForestRegression_2016-05-0316_51_57.331730"

BoostedTreeClassificationFileLoc = modelDir + "GradientBoostingTreeClassification_2016-05-0316_50_40.138809"

BoostedTreeRegressionFileLoc = modelDir + "GradientBoostingTreeRegression_2016-05-0316_52_18.827237"
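
As noted above, these paths are meant to be pasted into the consumption notebook. As a hedged sketch of what that notebook does (the newFeatures RDD of feature vectors is a placeholder for your own scoring data, and the file name is taken from the output above), a saved MLlib model can be reloaded from its path and used to score new records:

# HEDGED SKETCH: RELOAD A SAVED MODEL AND SCORE NEW DATA (newFeatures is a placeholder RDD of feature vectors)
from pyspark.mllib.tree import RandomForestModel

randomForestRegFileLoc = modelDir + "RandomForestRegression_2016-05-0316_51_57.331730"
savedRfModel = RandomForestModel.load(sc, randomForestRegFileLoc)
predictedTips = savedRfModel.predict(newFeatures)
print(predictedTips.take(5))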

What's next?

Now that you have created regression and classification models with Spark MLlib, you are ready to learn how to score and evaluate these models.

Model consumption: To learn how to score and evaluate the classification and regression models created in this topic, see Score and evaluate Spark-built machine learning models.