Data exploration and modeling with Spark

Learn how to use HDInsight Spark to train machine learning models for taxi fare prediction using Spark MLlib.

This sample showcases the various steps in the Team Data Science Process. A subset of the NYC taxi trip and fare 2013 dataset is used to load, explore, and prepare data. Then, using Spark MLlib, binary classification and regression models are trained to predict whether a tip will be paid for the trip and to estimate the tip amount.

Prerequisites

You need an Azure account and a Spark 1.6 (or Spark 2.0) HDInsight cluster to complete this walkthrough. See the Overview of Data Science using Spark on Azure HDInsight for instructions on how to satisfy these requirements. That topic also contains a description of the NYC 2013 taxi data used here and instructions on how to execute code from a Jupyter notebook on the Spark cluster.

Spark clusters and notebooks

Setup steps and code are provided in this walkthrough for using an HDInsight Spark 1.6 cluster. But Jupyter notebooks are provided for both HDInsight Spark 1.6 and Spark 2.0 clusters. A description of the notebooks and links to them are provided in the Readme.md for the GitHub repository containing them. Moreover, the code here and in the linked notebooks is generic and should work on any Spark cluster. If you are not using HDInsight Spark, the cluster setup and management steps may be slightly different from what is shown here. For convenience, here are the links to the Jupyter notebooks for Spark 1.6 (to be run in the pySpark kernel of the Jupyter Notebook server) and Spark 2.0 (to be run in the pySpark3 kernel of the Jupyter Notebook server):

  • Spark 1.6 notebooks: Provide information on how to perform data exploration, modeling, and scoring with several different algorithms.
  • Spark 2.0 notebooks: Provide information on how to perform regression and classification tasks. Datasets may vary, but the steps and concepts are applicable to various datasets.

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.

Note

The descriptions below are related to using Spark 1.6. For Spark 2.0 versions, use the notebooks described and linked above.

Setup

Spark is able to read from and write to Azure Storage Blobs (also known as WASB). So any of your existing data stored there can be processed using Spark, and the results stored again in WASB.

To save models or files in WASB, the path needs to be specified properly. The default container attached to the Spark cluster can be referenced using a path beginning with "wasb:///". Other locations are referenced by "wasb://".
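For illustration, here is a minimal sketch of the two path forms. The container, account, and file names below are placeholders, not values used elsewhere in this walkthrough:

# ILLUSTRATIVE ONLY: THE TWO WASB PATH FORMS (PLACEHOLDER NAMES)
# File in the default container attached to the cluster
default_container_path = "wasb:///example/data/myfile.tsv"

# File in any other container (note the container@account prefix)
other_container_path = "wasb://mycontainer@myaccount.blob.core.chinacloudapi.cn/example/data/myfile.tsv"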

Set directory paths for storage locations in WASB

The following code sample specifies the location of the data to be read and the path of the model storage directory to which the model output is saved:

# SET PATHS TO FILE LOCATIONS: DATA AND MODEL STORAGE

# LOCATION OF TRAINING DATA
taxi_train_file_loc = "wasb://mllibwalkthroughs@cdspsparksamples.blob.core.chinacloudapi.cn/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Train.tsv";

# SET THE MODEL STORAGE DIRECTORY PATH 
# NOTE THAT THE TRAILING SLASH IN THE PATH IS NEEDED.
modelDir = "wasb:///user/remoteuser/NYCTaxi/Models/" 

Import libraries

Setup also requires importing the necessary libraries. Set the Spark context and import the necessary libraries with the following code:

# IMPORT LIBRARIES
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import atexit
from numpy import array
import numpy as np
import datetime

Preset Spark context and PySpark magics

The PySpark kernels that are provided with Jupyter notebooks have a preset context. So you do not need to set the Spark or Hive contexts explicitly before you start working with the application you are developing. These contexts are available for you by default. These contexts are (a quick check follows the list):

  • sc - for Spark
  • sqlContext - for Hive
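As a quick sanity check (a minimal sketch, assuming you are running in a pySpark notebook cell), you can print the preset objects directly:

# QUICK CHECK THAT THE PRESET CONTEXTS ARE AVAILABLE
print sc.version      # Spark context preset by the pySpark kernel
print sqlContext      # SQL/Hive context preset by the pySpark kernel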

The PySpark kernel provides some predefined "magics", which are special commands that you can call with %%. There are two such commands that are used in these code samples.

  • %%local Specifies that the code in subsequent lines is to be executed locally. Code must be valid Python code.
  • %%sql -o <variable name> Executes a Hive query against the sqlContext. If the -o parameter is passed, the result of the query is persisted in the %%local Python context as a Pandas DataFrame. A short illustration of the two magics together follows this list.
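Here is a minimal sketch of how these two magics are typically combined; the table name my_table is hypothetical and only illustrates the pattern used later in this walkthrough:

# ILLUSTRATIVE ONLY: RUN A HIVE QUERY AND PERSIST THE RESULT AS A LOCAL PANDAS DATAFRAME
%%sql -q -o myResults
SELECT * FROM my_table LIMIT 10

# THEN WORK WITH THE RESULT LOCALLY ON THE JUPYTER SERVER (SEPARATE CELL)
%%local
print myResults.head()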

For more information on Jupyter notebook kernels and the predefined "magics", see Kernels available for Jupyter notebooks with HDInsight Spark Linux clusters on HDInsight.

Load the data

The first step in the data science process is to ingest the data to be analyzed from the sources where it resides into your data exploration and modeling environment. The environment is Spark in this walkthrough. This section contains the code to complete a series of tasks:

  • ingest the data sample to be modeled
  • read in the input dataset (stored as a .tsv file)
  • format and clean the data
  • create and cache objects (RDDs or data-frames) in memory
  • register it as a temp-table in SQL-context

Here is the code for data ingestion.

# INGEST DATA

# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT FILE FROM PUBLIC BLOB
taxi_train_file = sc.textFile(taxi_train_file_loc)

# GET SCHEMA OF THE FILE FROM HEADER
schema_string = taxi_train_file.first()
fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split('\t')]
fields[7].dataType = IntegerType() #Pickup hour
fields[8].dataType = IntegerType() # Pickup week
fields[9].dataType = IntegerType() # Weekday
fields[10].dataType = IntegerType() # Passenger count
fields[11].dataType = FloatType() # Trip time in secs
fields[12].dataType = FloatType() # Trip distance
fields[19].dataType = FloatType() # Fare amount
fields[20].dataType = FloatType() # Surcharge
fields[21].dataType = FloatType() # Mta_tax
fields[22].dataType = FloatType() # Tip amount
fields[23].dataType = FloatType() # Tolls amount
fields[24].dataType = FloatType() # Total amount
fields[25].dataType = IntegerType() # Tipped or not
fields[26].dataType = IntegerType() # Tip class
taxi_schema = StructType(fields)

# PARSE FIELDS AND CONVERT DATA TYPE FOR SOME FIELDS
taxi_header = taxi_train_file.filter(lambda l: "medallion" in l)
taxi_temp = taxi_train_file.subtract(taxi_header).map(lambda k: k.split("\t"))\
        .map(lambda p: (p[0],p[1],p[2],p[3],p[4],p[5],p[6],int(p[7]),int(p[8]),int(p[9]),int(p[10]),
                        float(p[11]),float(p[12]),p[13],p[14],p[15],p[16],p[17],p[18],float(p[19]),
                        float(p[20]),float(p[21]),float(p[22]),float(p[23]),float(p[24]),int(p[25]),int(p[26])))


# CREATE DATA FRAME
taxi_train_df = sqlContext.createDataFrame(taxi_temp, taxi_schema)

# CREATE A CLEANED DATA-FRAME BY DROPPING SOME UN-NECESSARY COLUMNS & FILTERING FOR UNDESIRED VALUES OR OUTLIERS
taxi_df_train_cleaned = taxi_train_df.drop('medallion').drop('hack_license').drop('store_and_fwd_flag').drop('pickup_datetime')\
    .drop('dropoff_datetime').drop('pickup_longitude').drop('pickup_latitude').drop('dropoff_latitude')\
    .drop('dropoff_longitude').drop('tip_class').drop('total_amount').drop('tolls_amount').drop('mta_tax')\
    .drop('direct_distance').drop('surcharge')\
    .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200" )


# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
taxi_df_train_cleaned.cache()
taxi_df_train_cleaned.count()

# REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
taxi_df_train_cleaned.registerTempTable("taxi_train")

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT:

Time taken to execute above cell: 51.72 seconds

Explore the data

Once the data has been brought into Spark, the next step in the data science process is to gain a deeper understanding of the data through exploration and visualization. In this section, we examine the taxi data using SQL queries and plot the target variables and prospective features for visual inspection. Specifically, we plot the frequency of passenger counts in taxi trips, the frequency of tip amounts, and how tips vary by payment amount and type.

Plot a histogram of passenger count frequencies in the sample of taxi trips

This code and subsequent snippets use SQL magic to query the sample and local magic to plot the data.

  • SQL magic (%%sql) The HDInsight PySpark kernel supports easy inline HiveQL queries against the sqlContext. The (-o VARIABLE_NAME) argument persists the output of the SQL query as a Pandas DataFrame on the Jupyter server. This setting makes the output available in local mode.
  • The %%local magic is used to run code locally on the Jupyter server, which is the headnode of the HDInsight cluster. Typically, you use the %%local magic in conjunction with the %%sql magic and its -o parameter: the -o parameter persists the output of the SQL query locally, and the %%local magic then runs the next code snippet locally against that persisted output.

The output is automatically visualized after you run the code.

This query retrieves the trips by passenger count.

# PLOT FREQUENCY OF PASSENGER COUNTS IN TAXI TRIPS

# HIVEQL QUERY AGAINST THE sqlContext
%%sql -q -o sqlResults
SELECT passenger_count, COUNT(*) as trip_counts 
FROM taxi_train 
WHERE passenger_count > 0 and passenger_count < 7 
GROUP BY passenger_count 

This code creates a local data-frame from the query output and plots the data. The %%local magic creates a local data-frame, sqlResults, which can be used for plotting with matplotlib.

Note

This PySpark magic is used multiple times in this walkthrough. If the amount of data is large, you should sample to create a data-frame that can fit in local memory.

#CREATE LOCAL DATA-FRAME AND USE FOR MATPLOTLIB PLOTTING

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# USE THE JUPYTER AUTO-PLOTTING FEATURE TO CREATE INTERACTIVE FIGURES. 
# CLICK ON THE TYPE OF PLOT TO BE GENERATED (E.G. LINE, AREA, BAR ETC.)
sqlResults

Here is the code to plot the trips by passenger count.

# PLOT PASSENGER NUMBER VS. TRIP COUNTS
%%local
import matplotlib.pyplot as plt
%matplotlib inline

x_labels = sqlResults['passenger_count'].values
fig = sqlResults[['trip_counts']].plot(kind='bar', facecolor='lightblue')
fig.set_xticklabels(x_labels)
fig.set_title('Counts of trips by passenger count')
fig.set_xlabel('Passenger count in trips')
fig.set_ylabel('Trip counts')
plt.show()

OUTPUT:

Frequency of trips by passenger count

You can select among several different types of visualizations (Table, Pie, Line, Area, or Bar) by using the Type menu buttons in the notebook. The Bar plot is shown here.

Plot a histogram of tip amounts and how the tip amount varies by passenger count and fare amount

Use a SQL query to sample the data.

# PLOT HISTOGRAM OF TIP AMOUNTS AND VARIATION BY PASSENGER COUNT AND PAYMENT TYPE

# HIVEQL QUERY AGAINST THE sqlContext
%%sql -q -o sqlResults
SELECT fare_amount, passenger_count, tip_amount, tipped 
FROM taxi_train 
WHERE passenger_count > 0 
AND passenger_count < 7 
AND fare_amount > 0 
AND fare_amount < 200 
AND payment_type in ('CSH', 'CRD') 
AND tip_amount > 0 
AND tip_amount < 25

This code cell uses the results of the SQL query to create three plots of the data.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# HISTOGRAM OF TIP AMOUNTS AND PASSENGER COUNT
ax1 = sqlResults[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount ($)')
ax1.set_ylabel('Counts')
plt.suptitle('')
plt.show()

# TIP BY PASSENGER COUNT
ax2 = sqlResults.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount by Passenger count')
ax2.set_xlabel('Passenger count')
ax2.set_ylabel('Tip Amount ($)')
plt.suptitle('')
plt.show()

# TIP AMOUNT BY FARE AMOUNT, POINTS ARE SCALED BY PASSENGER COUNT
ax = sqlResults.plot(kind='scatter', x= 'fare_amount', y = 'tip_amount', c='blue', alpha = 0.10, s=5*(sqlResults.passenger_count))
ax.set_title('Tip amount by Fare amount')
ax.set_xlabel('Fare Amount ($)')
ax.set_ylabel('Tip Amount ($)')
plt.axis([-2, 100, -2, 20])
plt.show()

OUTPUT:

Tip amount distribution

Tip amount by passenger count

Tip amount by fare amount

Prepare the data

This section describes and provides the code for procedures used to prepare data for use in ML modeling. It shows how to do the following tasks:

  • Create a new feature by binning hours into traffic time buckets
  • Index and encode categorical features
  • Create labeled point objects for input into ML functions
  • Create a random subsampling of the data and split it into training and testing sets
  • Feature scaling
  • Cache objects in memory

Create a new feature by binning hours into traffic time buckets

This code shows how to create a new feature by binning hours into traffic time buckets and then how to cache the resulting data frame in memory. Where Resilient Distributed Datasets (RDDs) and data-frames are used repeatedly, caching leads to improved execution times. Accordingly, we cache RDDs and data-frames at several stages in the walkthrough.

# CREATE FOUR BUCKETS FOR TRAFFIC TIMES
sqlStatement = """
    SELECT *,
    CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night" 
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush" 
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_train 
"""
taxi_df_train_with_newFeatures = sqlContext.sql(sqlStatement)

# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
# THE .COUNT() GOES THROUGH THE ENTIRE DATA-FRAME,
# MATERIALIZES IT IN MEMORY, AND GIVES THE COUNT OF ROWS.
taxi_df_train_with_newFeatures.cache()
taxi_df_train_with_newFeatures.count()

OUTPUT:

126050

Index and encode categorical features for input into modeling functions

This section shows how to index or encode categorical features for input into the modeling functions. The modeling and predict functions of MLlib require features with categorical input data to be indexed or encoded prior to use. Depending on the model, you need to index or encode them in different ways:

  • Tree-based modeling requires categories to be encoded as numerical values (for example, a feature with three categories may be encoded with 0, 1, 2). This algorithm is provided by MLlib's StringIndexer function. This function encodes a string column of labels to a column of label indices that are ordered by label frequencies. Although indexed with numerical values for input and data handling, the tree-based algorithms can be specified to treat them appropriately as categories.
  • Logistic and linear regression models require one-hot encoding, where, for example, a feature with three categories can be expanded into three feature columns, with each containing 0 or 1 depending on the category of an observation. MLlib provides the OneHotEncoder function to do one-hot encoding. This encoder maps a column of label indices to a column of binary vectors, with at most a single one-value. This encoding allows algorithms that expect numerical valued features, such as logistic regression, to be applied to categorical features.

Here is the code to index and encode categorical features:

# INDEX AND ENCODE CATEGORICAL FEATURES

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES    
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer

# INDEX AND ENCODE VENDOR_ID
stringIndexer = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex")
model = stringIndexer.fit(taxi_df_train_with_newFeatures) # Input data-frame is the cleaned one from above
indexed = model.transform(taxi_df_train_with_newFeatures)
encoder = OneHotEncoder(dropLast=False, inputCol="vendorIndex", outputCol="vendorVec")
encoded1 = encoder.transform(indexed)

# INDEX AND ENCODE RATE_CODE
stringIndexer = StringIndexer(inputCol="rate_code", outputCol="rateIndex")
model = stringIndexer.fit(encoded1)
indexed = model.transform(encoded1)
encoder = OneHotEncoder(dropLast=False, inputCol="rateIndex", outputCol="rateVec")
encoded2 = encoder.transform(indexed)

# INDEX AND ENCODE PAYMENT_TYPE
stringIndexer = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")
model = stringIndexer.fit(encoded2)
indexed = model.transform(encoded2)
encoder = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec")
encoded3 = encoder.transform(indexed)

# INDEX AND ENCODE TRAFFIC TIME BINS
stringIndexer = StringIndexer(inputCol="TrafficTimeBins", outputCol="TrafficTimeBinsIndex")
model = stringIndexer.fit(encoded3)
indexed = model.transform(encoded3)
encoder = OneHotEncoder(dropLast=False, inputCol="TrafficTimeBinsIndex", outputCol="TrafficTimeBinsVec")
encodedFinal = encoder.transform(indexed)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 1.28 seconds

Create labeled point objects for input into ML functions

This section contains code that shows how to index categorical text data as a labeled point data type and encode it so that it can be used to train and test MLlib logistic regression and other classification models. Labeled point objects are Resilient Distributed Datasets (RDDs) consisting of a label (target/response variable) and a feature vector. A labeled point is a local vector, either dense or sparse, associated with a label/response, and this format is needed as input by most ML algorithms in MLlib.

Here is the code to index and encode text features for binary classification.

# FUNCTIONS FOR BINARY CLASSIFICATION

# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from numpy import array

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingBinary(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex,
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])
    labPt = LabeledPoint(line.tipped, features)
    return  labPt

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LOGISTIC REGRESSION MODELS
def parseRowOneHotBinary(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tipped, features)
    return  labPt

Here is the code to index and encode categorical text features for regression analysis.

# FUNCTIONS FOR REGRESSION WITH TIP AMOUNT AS TARGET VARIABLE

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingRegression(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex, 
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])

    labPt = LabeledPoint(line.tip_amount, features)
    return  labPt

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LINEAR REGRESSION MODELS
def parseRowOneHotRegression(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tip_amount, features)
    return  labPt

Create a random subsampling of the data and split it into training and testing sets

This code creates a random sampling of the data (25% is used here). Although it is not required for this example due to the size of the dataset, we demonstrate how you can sample here so you know how to use it for your own problem when needed. When samples are large, sampling can save significant time while training models. Next we split the sample into a training part (75% here) and a testing part (25% here) to use in classification and regression modeling.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.sql.functions import rand

# SPECIFY SAMPLING AND SPLITTING FRACTIONS
samplingFraction = 0.25;
trainingFraction = 0.75; testingFraction = (1-trainingFraction);
seed = 1234;
encodedFinalSampled = encodedFinal.sample(False, samplingFraction, seed=seed)

# SPLIT SAMPLED DATA-FRAME INTO TRAIN/TEST
# INCLUDE RAND COLUMN FOR CREATING CROSS-VALIDATION FOLDS (FOR USE LATER IN AN ADVANCED TOPIC)
dfTmpRand = encodedFinalSampled.select("*", rand(0).alias("rand"));
trainData, testData = dfTmpRand.randomSplit([trainingFraction, testingFraction], seed=seed);

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary = trainData.map(parseRowIndexingBinary)
indexedTESTbinary = testData.map(parseRowIndexingBinary)
oneHotTRAINbinary = trainData.map(parseRowOneHotBinary)
oneHotTESTbinary = testData.map(parseRowOneHotBinary)

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg = trainData.map(parseRowIndexingRegression)
indexedTESTreg = testData.map(parseRowIndexingRegression)
oneHotTRAINreg = trainData.map(parseRowOneHotRegression)
oneHotTESTreg = testData.map(parseRowOneHotRegression)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 0.24 seconds

Feature scaling

Feature scaling, also known as data normalization, ensures that features with widely dispersed values are not given excessive weight in the objective function. The code for feature scaling uses the StandardScaler to scale the features to unit variance. It is provided by MLlib for use in linear regression with Stochastic Gradient Descent (SGD), a popular algorithm for training a wide range of other machine learning models such as regularized regressions or support vector machines (SVM).

Note

We have found the LinearRegressionWithSGD algorithm to be sensitive to feature scaling.

Here is the code to scale variables for use with the regularized linear SGD algorithm.

# FEATURE SCALING

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.util import MLUtils

# SCALE VARIABLES FOR REGULARIZED LINEAR SGD ALGORITHM
label = oneHotTRAINreg.map(lambda x: x.label)
features = oneHotTRAINreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTRAINregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

label = oneHotTESTreg.map(lambda x: x.label)
features = oneHotTESTreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTESTregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 13.17 seconds

Cache objects in memory

The time taken for training and testing ML algorithms can be reduced by caching the input data frame objects used for classification, regression, and scaled features.

# RECORD START TIME
timestart = datetime.datetime.now()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.cache()
indexedTESTbinary.cache()
oneHotTRAINbinary.cache()
oneHotTESTbinary.cache()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.cache()
indexedTESTreg.cache()
oneHotTRAINreg.cache()
oneHotTESTreg.cache()

# SCALED FEATURES
oneHotTRAINregScaled.cache()
oneHotTESTregScaled.cache()

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Time taken to execute above cell: 0.15 seconds

Train a binary classification model

This section shows how to use three models for the binary classification task of predicting whether or not a tip is paid for a taxi trip. The models presented are:

  • Regularized logistic regression
  • Random forest model
  • Gradient Boosting Trees

Each model-building code section is split into these steps:

  1. Train the model with one parameter set
  2. Evaluate the model on a test data set with metrics
  3. Save the model in blob for future consumption

Classification using logistic regression

The code in this section shows how to train, evaluate, and save a logistic regression model with LBFGS that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset.

Train the logistic regression model using CV and hyperparameter sweeping

# LOGISTIC REGRESSION CLASSIFICATION WITH CV AND HYPERPARAMETER SWEEPING

# GET ACCURACY FOR HYPERPARAMETERS BASED ON CROSS-VALIDATION IN TRAINING DATA-SET

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD LIBRARIES
from pyspark.mllib.classification import LogisticRegressionWithLBFGS 
from sklearn.metrics import roc_curve,auc
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics


# CREATE MODEL WITH ONE SET OF PARAMETERS
logitModel = LogisticRegressionWithLBFGS.train(oneHotTRAINbinary, iterations=20, initialWeights=None, 
                                               regParam=0.01, regType='l2', intercept=True, corrections=10, 
                                               tolerance=0.0001, validateData=True, numClasses=2)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features, 
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(logitModel.weights))
print("Intercept: " + str(logitModel.intercept))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Coefficients: [0.0082065285375, -0.0223675576104, -0.0183812028036, -3.48124578069e-05, -0.00247646947233, -0.00165897881503, 0.0675394837328, -0.111823113101, -0.324609912762, -0.204549780032, -1.36499216354, 0.591088507921, -0.664263411392, -1.00439726852, 3.46567827545, -3.51025855172, -0.0471341112232, -0.043521833294, 0.000243375810385, 0.054518719222]

Intercept: -0.0111216486893

Time taken to execute above cell: 14.43 seconds
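The cell above trains the model with a single parameter set. As a rough illustration of the cross-validation and hyperparameter sweeping mentioned in the heading (covered in depth in the advanced notebook), here is a minimal sketch that holds out part of the training RDD for validation and sweeps the regularization parameter; the split fractions and candidate values are illustrative assumptions:

# ILLUSTRATIVE SKETCH: SIMPLE VALIDATION-BASED SWEEP OVER THE REGULARIZATION PARAMETER
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# HOLD OUT PART OF THE TRAINING RDD FOR VALIDATION (ASSUMES oneHotTRAINbinary FROM ABOVE)
cvTrain, cvValidate = oneHotTRAINbinary.randomSplit([0.8, 0.2], seed=1234)

bestAUC = -1.0; bestReg = None
for reg in [0.001, 0.01, 0.1]:
    # TRAIN A CANDIDATE MODEL WITH THE CURRENT REGULARIZATION VALUE
    candidate = LogisticRegressionWithLBFGS.train(cvTrain, iterations=20, regParam=reg,
                                                  regType='l2', intercept=True, numClasses=2)
    # EVALUATE ON THE HELD-OUT VALIDATION SET
    scoresAndLabels = cvValidate.map(lambda lp: (float(candidate.predict(lp.features)), lp.label))
    valAUC = BinaryClassificationMetrics(scoresAndLabels).areaUnderROC
    print "regParam = %s, validation AUC = %s" % (reg, valAUC)
    if valAUC > bestAUC:
        bestAUC = valAUC; bestReg = reg

print "Best regParam based on validation AUC: %s" % bestReg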

Evaluate the binary classification model with standard metrics

#EVALUATE LOGISTIC REGRESSION MODEL WITH LBFGS

# RECORD START TIME
timestart = datetime.datetime.now()

# PREDICT ON TEST DATA WITH MODEL
predictionAndLabels = oneHotTESTbinary.map(lambda lp: (float(logitModel.predict(lp.features)), lp.label))

# INSTANTIATE METRICS OBJECT
metrics = BinaryClassificationMetrics(predictionAndLabels)

# AREA UNDER PRECISION-RECALL CURVE
print("Area under PR = %s" % metrics.areaUnderPR)

# AREA UNDER ROC CURVE
print("Area under ROC = %s" % metrics.areaUnderROC)
metrics = MulticlassMetrics(predictionAndLabels)

# OVERALL STATISTICS
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)


## SAVE MODEL WITH DATE-STAMP
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
logisticregressionfilename = "LogisticRegressionWithLBFGS_" + datestamp;
dirfilename = modelDir + logisticregressionfilename;
logitModel.save(sc, dirfilename);

# OUTPUT PROBABILITIES AND REGISTER TEMP TABLE
logitModel.clearThreshold(); # This clears threshold for classification (0.5) and outputs probabilities
predictionAndLabelsDF = predictionAndLabels.toDF()
predictionAndLabelsDF.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT:

Area under PR = 0.985297691373

Area under ROC = 0.983714670256

Summary Stats

Precision = 0.984304060189

Recall = 0.984304060189

F1 Score = 0.984304060189

Time taken to execute above cell: 57.61 seconds

Plot the ROC curve.

The predictionAndLabelsDF is registered as a table, tmp_results, in the previous cell. tmp_results can be used to run queries and output the results into the sqlResults data-frame for plotting. Here is the code.

# QUERY RESULTS                              
%%sql -q -o sqlResults
SELECT * from tmp_results

Here is the code to make predictions and plot the ROC curve.

# MAKE PREDICTIONS AND PLOT ROC-CURVE

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline
from sklearn.metrics import roc_curve,auc

# MAKE PREDICTIONS
predictions_pddf = sqlResults.rename(columns={'_1': 'probability', '_2': 'label'})
prob = predictions_pddf["probability"] 
fpr, tpr, thresholds = roc_curve(predictions_pddf['label'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

# PLOT ROC CURVE
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

OUTPUT:

Logistic regression ROC curve

Random forest classification

The code in this section shows how to train, evaluate, and save a random forest model that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset.

#PREDICT WHETHER A TIP IS PAID OR NOT USING RANDOM FOREST

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #2 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# TRAIN RANDOMFOREST MODEL
rfModel = RandomForest.trainClassifier(indexedTRAINbinary, numClasses=2, 
                                       categoricalFeaturesInfo=categoricalFeaturesInfo,
                                       numTrees=25, featureSubsetStrategy="auto",
                                       impurity='gini', maxDepth=5, maxBins=32)
## UN-COMMENT IF YOU WANT TO PRINT TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = rfModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfclassificationfilename = "RandomForestClassification_" + datestamp;
dirfilename = modelDir + rfclassificationfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Area under ROC = 0.985297691373

Time taken to execute above cell: 31.09 seconds

Gradient boosting trees classification

The code in this section shows how to train, evaluate, and save a gradient boosting trees model that predicts whether or not a tip is paid for a trip in the NYC taxi trip and fare dataset.

#PREDICT WHETHER A TIP IS PAID OR NOT USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #2 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

gbtModel = GradientBoostedTrees.trainClassifier(indexedTRAINbinary, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=5)
## UNCOMMENT IF YOU WANT TO PRINT TREE DETAILS
#print('Learned classification GBT model:')
#print(gbtModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = gbtModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN A BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btclassificationfilename = "GradientBoostingTreeClassification_" + datestamp;
dirfilename = modelDir + btclassificationfilename;

gbtModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Area under ROC = 0.985297691373

Time taken to execute above cell: 19.76 seconds

Train a regression model

This section shows how to use three models for the regression task of predicting the amount of the tip paid for a taxi trip based on other features. The models presented are:

  • Regularized linear regression
  • Random forest
  • Gradient Boosting Trees

These models were described in the introduction. Each model-building code section is split into these steps:

  1. Train the model with one parameter set
  2. Evaluate the model on a test data set with metrics
  3. Save the model in blob for future consumption

Linear regression with SGD

The code in this section shows how to use scaled features to train a linear regression that uses stochastic gradient descent (SGD) for optimization, and how to score, evaluate, and save the model in Azure Blob Storage (WASB).

Tip

In our experience, there can be issues with the convergence of LinearRegressionWithSGD models, and the parameters need to be changed/optimized carefully to obtain a valid model. Scaling of variables significantly helps with convergence.

#PREDICT TIP AMOUNTS USING LINEAR REGRESSION WITH SGD

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics
from scipy import stats

# USE SCALED FEATURES TO TRAIN MODEL
linearModel = LinearRegressionWithSGD.train(oneHotTRAINregScaled, iterations=100, step = 0.1, regType='l2', regParam=0.1, intercept = True)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features, 
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(linearModel.weights))
print("Intercept: " + str(linearModel.intercept))

# SCORE ON SCALED TEST DATA-SET & EVALUATE
predictionAndLabels = oneHotTESTregScaled.map(lambda lp: (float(linearModel.predict(lp.features)), lp.label))
testMetrics = RegressionMetrics(predictionAndLabels)

# PRINT TEST METRICS
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL WITH DATE-STAMP IN THE DEFAULT BLOB FOR THE CLUSTER
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
linearregressionfilename = "LinearRegressionWithSGD_" + datestamp;
dirfilename = modelDir + linearregressionfilename;

linearModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

Coefficients: [0.00457675809917, -0.0226314167349, -0.0191910355236, 0.246793409578, 0.312047890459, 0.359634405999, 0.00928692253981, -0.000987181489428, -0.0888306617845, 0.0569376211553, 0.115519551711, 0.149250164995, -0.00990211159703, -0.00637410344522, 0.545083566179, -0.536756072402, 0.0105762393099, -0.0130117577055, 0.0129304737772, -0.00171065945959]

Intercept: 0.853872718283

RMSE = 1.24190115863

R-sqr = 0.608017146081

Time taken to execute above cell: 58.42 seconds

Random forest regression

The code in this section shows how to train, evaluate, and save a random forest regression model that predicts the tip amount for the NYC taxi trip data.

#PREDICT TIP AMOUNTS USING RANDOM FOREST

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics


## TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                    numTrees=25, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=10, maxBins=32)
## UN-COMMENT IF YOU WANT TO PRINT TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

## PREDICT AND EVALUATE ON TEST DATA-SET
predictions = rfModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = oneHotTESTreg.map(lambda lp: lp.label).zip(predictions)

# TEST METRICS
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfregressionfilename = "RandomForestRegression_" + datestamp;
dirfilename = modelDir + rfregressionfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

RMSE = 0.891209218139

R-sqr = 0.759661334921

Time taken to execute above cell: 49.21 seconds

Gradient boosting trees regression

The code in this section shows how to train, evaluate, and save a gradient boosting trees model that predicts the tip amount for the NYC taxi trip data.

Train and evaluate

#PREDICT TIP AMOUNTS USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils

## TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
gbtModel = GradientBoostedTrees.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo, 
                                                numIterations=10, maxBins=32, maxDepth = 4, learningRate=0.1)

## EVALUATE A TEST DATA-SET
predictions = gbtModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

# TEST METRICS
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btregressionfilename = "GradientBoostingTreeRegression_" + datestamp;
dirfilename = modelDir + btregressionfilename;
gbtModel.save(sc, dirfilename)

# CONVERT RESULTS TO DF AND REGISTER TEMP TABLE
test_predictions = sqlContext.createDataFrame(predictionAndLabels)
test_predictions.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

OUTPUT:

RMSE = 0.908473148639

R-sqr = 0.753835096681

Time taken to execute above cell: 34.52 seconds

Plot

tmp_results is registered as a Hive table in the previous cell. Results from the table are output into the sqlResults data-frame for plotting. Here is the code.

# PLOT SCATTER-PLOT BETWEEN ACTUAL AND PREDICTED TIP VALUES

# SELECT RESULTS
%%sql -q -o sqlResults
SELECT * from tmp_results

Here is the code to plot the data using the Jupyter server.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline
import numpy as np

# PLOT 
ax = sqlResults.plot(kind='scatter', figsize = (6,6), x='_1', y='_2', color='blue', alpha = 0.25, label='Actual vs. predicted');
fit = np.polyfit(sqlResults['_1'], sqlResults['_2'], deg=1)
ax.set_title('Actual vs. Predicted Tip Amounts ($)')
ax.set_xlabel("Actual")
ax.set_ylabel("Predicted")
ax.plot(sqlResults['_1'], fit[0] * sqlResults['_1'] + fit[1], color='magenta')
plt.axis([-1, 20, -1, 20])
plt.show(ax)

OUTPUT:

Actual vs. predicted tip amounts

Clean up objects from memory

Use unpersist() to delete objects cached in memory.

# REMOVE ORIGINAL DFs
taxi_df_train_cleaned.unpersist()
taxi_df_train_with_newFeatures.unpersist()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.unpersist()
indexedTESTbinary.unpersist()
oneHotTRAINbinary.unpersist()
oneHotTESTbinary.unpersist()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.unpersist()
indexedTESTreg.unpersist()
oneHotTRAINreg.unpersist()
oneHotTESTreg.unpersist()

# SCALED FEATURES
oneHotTRAINregScaled.unpersist()
oneHotTESTregScaled.unpersist()

Save the models

To consume and score an independent dataset as described in the Score and evaluate Spark-built machine learning models topic, you need to copy and paste the file names of the saved models created here into the Consumption Jupyter notebook. Here is the code to print out the paths to the model files you need there.

# MODEL FILE LOCATIONS FOR CONSUMPTION
print "logisticRegFileLoc = modelDir + \"" + logisticregressionfilename + "\"";
print "linearRegFileLoc = modelDir + \"" + linearregressionfilename + "\"";
print "randomForestClassificationFileLoc = modelDir + \"" + rfclassificationfilename + "\"";
print "randomForestRegFileLoc = modelDir + \"" + rfregressionfilename + "\"";
print "BoostedTreeClassificationFileLoc = modelDir + \"" + btclassificationfilename + "\"";
print "BoostedTreeRegressionFileLoc = modelDir + \"" + btregressionfilename + "\"";

OUTPUT

logisticRegFileLoc = modelDir + "LogisticRegressionWithLBFGS_2016-05-0317_03_23.516568"

linearRegFileLoc = modelDir + "LinearRegressionWithSGD_2016-05-0317_05_21.577773"

randomForestClassificationFileLoc = modelDir + "RandomForestClassification_2016-05-0317_04_11.950206"

randomForestRegFileLoc = modelDir + "RandomForestRegression_2016-05-0317_06_08.723736"

BoostedTreeClassificationFileLoc = modelDir + "GradientBoostingTreeClassification_2016-05-0317_04_36.346583"

BoostedTreeRegressionFileLoc = modelDir + "GradientBoostingTreeRegression_2016-05-0317_06_51.737282"

What's next?

Now that you have created regression and classification models with Spark MLlib, you are ready to learn how to score and evaluate these models. The advanced data exploration and modeling notebook dives deeper into cross-validation, hyperparameter sweeping, and model evaluation.

Model consumption: To learn how to score and evaluate the classification and regression models created in this topic, see Score and evaluate Spark-built machine learning models.

Cross-validation and hyperparameter sweeping: See Advanced data exploration and modeling with Spark to learn how models can be trained using cross-validation and hyperparameter sweeping.