# Advanced data exploration and modeling with Spark

• The binary classification task is to predict whether or not a tip is paid for the trip.
• The regression task is to predict the amount of the tip based on other trip features.

• Linear regression with SGD is a linear regression model that uses a Stochastic Gradient Descent (SGD) method for optimization, together with feature scaling, to predict the tip amounts paid.
• Logistic regression with LBFGS, or "logit" regression, is a regression model that can be used for data classification when the dependent variable is categorical. LBFGS is a quasi-Newton optimization algorithm that approximates the Broyden–Fletcher–Goldfarb–Shanno (BFGS) algorithm using a limited amount of computer memory, and it is widely used in machine learning.
• Random forests are ensembles of decision trees. They combine many decision trees to reduce the risk of overfitting. Random forests are used for regression and classification, can handle categorical features, and can be extended to the multiclass classification setting. They do not require feature scaling and are able to capture non-linearities and feature interactions. Random forests are one of the most successful machine learning models for classification and regression.
• Gradient boosted trees (GBTs) are ensembles of decision trees. GBTs train decision trees iteratively to minimize a loss function. GBTs are used for regression and classification, can handle categorical features, do not require feature scaling, and are able to capture non-linearities and feature interactions. They can also be used in a multiclass classification setting.
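To make the SGD bullet above concrete, here is a minimal pure-NumPy sketch of stochastic gradient descent on a linear model. This is illustrative only, not the MLlib implementation; the data, learning rate, and variable names are our own.

```python
import numpy as np

# Minimal SGD sketch for a linear model y = w*x + b (illustrative, not MLlib).
# Each sample triggers one gradient step on the squared-error loss.
rng = np.random.default_rng(0)
x = rng.normal(size=200)
y = 3.0 * x + 1.0                 # noiseless data with true w=3, b=1

w, b, lr = 0.0, 0.0, 0.1
for xi, yi in zip(x, y):
    err = (w * xi + b) - yi       # prediction error for this sample
    w -= lr * err * xi            # step along the negative gradient for the weight
    b -= lr * err                 # step for the intercept
```

With standardized inputs, the same step size behaves uniformly across features, which is why feature scaling is paired with SGD later in this walkthrough.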

## Setup: Spark clusters and notebooks

### Spark 1.6 notebooks

pySpark-machine-learning-data-science-spark-advanced-data-exploration-modeling.ipynb: Includes the topics in notebook #1, plus model development using hyperparameter tuning and cross-validation.

### Spark 2.0 notebooks

Spark2.0-pySpark3-machine-learning-data-science-spark-advanced-data-exploration-modeling.ipynb: This file provides information on how to perform data exploration, modeling, and scoring in Spark 2.0 clusters.

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it.

## Setup: storage locations, libraries, and the preset Spark context

Spark is able to read from and write to Azure Blob storage (also known as WASB), so any of your existing data stored there can be processed with Spark and the results stored again in WASB.

### Set directory paths for storage locations in WASB

```python
# SET PATHS TO FILE LOCATIONS: DATA AND MODEL STORAGE

# LOCATION OF TRAINING DATA
taxi_train_file_loc = "wasb://mllibwalkthroughs@cdspsparksamples.blob.core.chinacloudapi.cn/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Train.tsv";

# SET THE MODEL STORAGE DIRECTORY PATH
# NOTE THAT THE FINAL SLASH IN THE PATH IS NEEDED.
modelDir = "wasb:///user/remoteuser/NYCTaxi/Models/";

# PRINT START TIME
import datetime
datetime.datetime.now()
```

OUTPUT

datetime.datetime(2016, 4, 18, 17, 36, 27, 832799)

### Import libraries

```python
# LOAD PYSPARK LIBRARIES
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import atexit
from numpy import array
import numpy as np
import datetime
```

### Preset Spark context and PySpark magics

• sc - for Spark
• sqlContext - for Hive

The PySpark kernel provides some predefined "magics", which are special commands that you can call with %%. Two such commands are used in these code samples.

• %%local specifies that the code in subsequent lines is to be executed locally. The code must be valid Python code.
• %%sql -o <variable name> executes a Hive query against the sqlContext. If the -o parameter is passed, the result of the query is persisted in the %%local Python context as a Pandas DataFrame.

## Data ingestion from a public blob

• Ingest the data sample to be modeled.
• Read in the input dataset (stored as a .tsv file).
• Format and clean the data.
• Create and cache objects (RDDs or data-frames) in memory.
• Register the cleaned data as a temp-table in the SQL context.

```python
# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT FILE FROM PUBLIC BLOB
taxi_train_file = sc.textFile(taxi_train_file_loc)

# GET SCHEMA OF THE FILE FROM HEADER
schema_string = taxi_train_file.first()
fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split('\t')]
fields[7].dataType = IntegerType() # Pickup hour
fields[8].dataType = IntegerType() # Pickup week
fields[9].dataType = IntegerType() # Weekday
fields[10].dataType = IntegerType() # Passenger count
fields[11].dataType = FloatType() # Trip time in secs
fields[12].dataType = FloatType() # Trip distance
fields[19].dataType = FloatType() # Fare amount
fields[20].dataType = FloatType() # Surcharge
fields[21].dataType = FloatType() # Mta_tax
fields[22].dataType = FloatType() # Tip amount
fields[23].dataType = FloatType() # Tolls amount
fields[24].dataType = FloatType() # Total amount
fields[25].dataType = IntegerType() # Tipped or not
fields[26].dataType = IntegerType() # Tip class
taxi_schema = StructType(fields)

# PARSE FIELDS AND CONVERT DATA TYPE FOR SOME FIELDS
# (Remove the header row and split each line on tabs before converting types.)
taxi_header = taxi_train_file.filter(lambda l: "medallion" in l)
taxi_temp = taxi_train_file.subtract(taxi_header).map(lambda k: k.split("\t"))\
    .map(lambda p: (p[0],p[1],p[2],p[3],p[4],p[5],p[6],int(p[7]),int(p[8]),int(p[9]),int(p[10]),
                    float(p[11]),float(p[12]),p[13],p[14],p[15],p[16],p[17],p[18],float(p[19]),
                    float(p[20]),float(p[21]),float(p[22]),float(p[23]),float(p[24]),int(p[25]),int(p[26])))

# CREATE DATA FRAME
taxi_train_df = sqlContext.createDataFrame(taxi_temp, taxi_schema)

# CREATE A CLEANED DATA-FRAME BY DROPPING UNNECESSARY COLUMNS AND FILTERING OUT UNDESIRED VALUES OR OUTLIERS
taxi_df_train_cleaned = taxi_train_df\
    .drop('dropoff_datetime').drop('pickup_longitude').drop('pickup_latitude').drop('dropoff_latitude')\
    .drop('dropoff_longitude').drop('tip_class').drop('total_amount').drop('tolls_amount').drop('mta_tax')\
    .drop('direct_distance').drop('surcharge')\
    .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200")

# CACHE AND MATERIALIZE THE DATA-FRAME IN MEMORY. COUNTING THE ROWS MATERIALIZES THE DATA-FRAME IN MEMORY
taxi_df_train_cleaned.cache()
taxi_df_train_cleaned.count()

# REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
taxi_df_train_cleaned.registerTempTable("taxi_train")

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

OUTPUT

## Data exploration & visualization

### Plot a histogram of passenger count frequencies in the sample of taxi trips

• SQL magic (`%%sql`): The HDInsight PySpark kernel supports easy inline HiveQL queries against the sqlContext. The (-o VARIABLE_NAME) argument persists the output of the SQL query as a Pandas DataFrame on the Jupyter server, which makes it available in local mode.
• The `%%local` magic is used to run code locally on the Jupyter server, which is the headnode of the HDInsight cluster. Typically, you use the `%%local` magic after a query has been run with the `%%sql -o` magic: the -o parameter persists the output of the SQL query locally, and the `%%local` magic then triggers the next set of code snippets to run locally against that persisted output. The output is automatically visualized after you run the code.

```
# PLOT FREQUENCY OF PASSENGER COUNTS IN TAXI TRIPS

# SQL QUERY
%%sql -q -o sqlResults
SELECT passenger_count, COUNT(*) as trip_counts FROM taxi_train WHERE passenger_count > 0 and passenger_count < 7 GROUP BY passenger_count
```

```
# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# USE THE JUPYTER AUTO-PLOTTING FEATURE TO CREATE INTERACTIVE FIGURES.
# CLICK ON THE TYPE OF PLOT TO BE GENERATED (E.G. LINE, AREA, BAR, ETC.)
sqlResults
```

```
# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
import matplotlib.pyplot as plt
%matplotlib inline

# PLOT PASSENGER NUMBER VS TRIP COUNTS
x_labels = sqlResults['passenger_count'].values
fig = sqlResults[['trip_counts']].plot(kind='bar', facecolor='lightblue')
fig.set_xticklabels(x_labels)
fig.set_title('Counts of trips by passenger count')
fig.set_xlabel('Passenger count in trips')
fig.set_ylabel('Trip counts')
plt.show()
```

OUTPUT

### Plot a histogram of tip amounts, and how the tip amount varies by passenger count and fare amount

```
# SQL QUERY
%%sql -q -o sqlResults
SELECT fare_amount, passenger_count, tip_amount, tipped
FROM taxi_train
WHERE passenger_count > 0
AND passenger_count < 7
AND fare_amount > 0
AND fare_amount < 200
AND payment_type in ('CSH', 'CRD')
AND tip_amount > 0
AND tip_amount < 25
```

```
# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
import matplotlib.pyplot as plt
%matplotlib inline

# TIP BY PAYMENT TYPE AND PASSENGER COUNT
ax1 = sqlResults[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount (\$)')
ax1.set_ylabel('Counts')
plt.suptitle('')
plt.show()

# TIP BY PASSENGER COUNT
ax2 = sqlResults.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount (\$) by Passenger count')
ax2.set_xlabel('Passenger count')
ax2.set_ylabel('Tip Amount (\$)')
plt.suptitle('')
plt.show()

# TIP AMOUNT BY FARE AMOUNT, POINTS ARE SCALED BY PASSENGER COUNT
ax = sqlResults.plot(kind='scatter', x='fare_amount', y='tip_amount', c='blue', alpha=0.10, s=5*(sqlResults.passenger_count))
ax.set_title('Tip amount by Fare amount (\$)')
ax.set_xlabel('Fare Amount')
ax.set_ylabel('Tip Amount')
plt.axis([-2, 120, -2, 30])
plt.show()
```

## Feature engineering, transformation, and data preparation for modeling

• Create a new feature by partitioning hours into traffic time bins.
• Index and one-hot encode categorical features.
• Create labeled point objects for input into ML functions.
• Create a random subsampling of the data and split it into training and testing sets.
• Feature scaling.
• Cache objects in memory.

### Create a new feature by partitioning traffic times into bins

```python
# CREATE FOUR BUCKETS FOR TRAFFIC TIMES
sqlStatement = """
    SELECT *,
    CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night"
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush"
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_train
"""
taxi_df_train_with_newFeatures = sqlContext.sql(sqlStatement)
```
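The hour-to-bin mapping in the CASE expression can be sketched as a small pure-Python function, which is handy for checking the bin boundaries outside Spark. The function name `traffic_time_bin` is ours, not part of the notebook:

```python
# Pure-Python sketch of the CASE expression's hour-to-bin mapping.
def traffic_time_bin(pickup_hour):
    if pickup_hour <= 6 or pickup_hour >= 20:
        return "Night"
    if 7 <= pickup_hour <= 10:
        return "AMRush"
    if 11 <= pickup_hour <= 15:
        return "Afternoon"
    return "PMRush"   # the remaining hours, 16-19

print(traffic_time_bin(8), traffic_time_bin(23))   # → AMRush Night
```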
```python
# CACHE AND MATERIALIZE THE DATA-FRAME IN MEMORY
# THE .COUNT() GOES THROUGH THE ENTIRE DATA-FRAME,
# MATERIALIZES IT IN MEMORY, AND GIVES THE COUNT OF ROWS.
taxi_df_train_with_newFeatures.cache()
taxi_df_train_with_newFeatures.count()
```

OUTPUT

126050

### Index and one-hot encode categorical features

```python
# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer

# INDEX AND ENCODE VENDOR_ID
stringIndexer = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex")
model = stringIndexer.fit(taxi_df_train_with_newFeatures) # Input data-frame is the cleaned one from above
indexed = model.transform(taxi_df_train_with_newFeatures)
encoder = OneHotEncoder(dropLast=False, inputCol="vendorIndex", outputCol="vendorVec")
encoded1 = encoder.transform(indexed)

# INDEX AND ENCODE RATE_CODE
stringIndexer = StringIndexer(inputCol="rate_code", outputCol="rateIndex")
model = stringIndexer.fit(encoded1)
indexed = model.transform(encoded1)
encoder = OneHotEncoder(dropLast=False, inputCol="rateIndex", outputCol="rateVec")
encoded2 = encoder.transform(indexed)

# INDEX AND ENCODE PAYMENT_TYPE
stringIndexer = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")
model = stringIndexer.fit(encoded2)
indexed = model.transform(encoded2)
encoder = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec")
encoded3 = encoder.transform(indexed)

# INDEX AND ENCODE TRAFFIC TIME BINS
stringIndexer = StringIndexer(inputCol="TrafficTimeBins", outputCol="TrafficTimeBinsIndex")
model = stringIndexer.fit(encoded3)
indexed = model.transform(encoded3)
encoder = OneHotEncoder(dropLast=False, inputCol="TrafficTimeBinsIndex", outputCol="TrafficTimeBinsVec")
encodedFinal = encoder.transform(indexed)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```
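Conceptually, StringIndexer assigns indices to labels by descending frequency, and OneHotEncoder (with dropLast=False) expands each index into a full one-hot vector. A pure-Python sketch of that idea follows; the helper names `string_index` and `one_hot` are ours, and this is an illustration rather than the MLlib implementation:

```python
from collections import Counter

# Index labels by descending frequency (the StringIndexer convention).
def string_index(values):
    order = [v for v, _ in Counter(values).most_common()]
    return {v: i for i, v in enumerate(order)}

# Expand an index into a one-hot vector; dropLast=False keeps every category.
def one_hot(index, size):
    vec = [0.0] * size
    vec[index] = 1.0
    return vec

payment_types = ["CRD", "CSH", "CRD", "CRD", "CSH"]
idx = string_index(payment_types)          # most frequent label → index 0
print(one_hot(idx["CSH"], len(idx)))       # → [0.0, 1.0]
```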

OUTPUT

### Create labeled point objects for input into ML functions

```python
# FUNCTIONS FOR BINARY CLASSIFICATION

from pyspark.mllib.regression import LabeledPoint
from numpy import array

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingBinary(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.pickup_hour, line.weekday,
                         line.passenger_count, line.trip_time_in_secs, line.trip_distance, line.fare_amount])
    labPt = LabeledPoint(line.tipped, features)
    return labPt

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LOGISTIC REGRESSION MODELS
def parseRowOneHotBinary(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                         line.trip_time_in_secs, line.trip_distance, line.fare_amount]),
                               line.vendorVec.toArray(), line.rateVec.toArray(), line.paymentVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tipped, features)
    return labPt
```

```python
# FUNCTIONS FOR REGRESSION WITH TIP AMOUNT AS TARGET VARIABLE

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingRegression(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex,
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs,
                         line.trip_distance, line.fare_amount])
    labPt = LabeledPoint(line.tip_amount, features)
    return labPt

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LINEAR REGRESSION MODELS
def parseRowOneHotRegression(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                         line.trip_time_in_secs, line.trip_distance, line.fare_amount]),
                               line.vendorVec.toArray(), line.rateVec.toArray(),
                               line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tip_amount, features)
    return labPt
```

### Create a random subsampling of the data and split it into training and testing sets

```python
# RECORD START TIME
timestart = datetime.datetime.now()

# SPECIFY SAMPLING AND SPLITTING FRACTIONS
from pyspark.sql.functions import rand

samplingFraction = 0.25;
trainingFraction = 0.75; testingFraction = (1-trainingFraction);
seed = 1234;
encodedFinalSampled = encodedFinal.sample(False, samplingFraction, seed=seed)

# SPLIT SAMPLED DATA-FRAME INTO TRAIN/TEST, WITH A RANDOM COLUMN ADDED FOR DOING CV (SHOWN LATER)
# INCLUDE RAND COLUMN FOR CREATING CROSS-VALIDATION FOLDS
dfTmpRand = encodedFinalSampled.select("*", rand(0).alias("rand"));
trainData, testData = dfTmpRand.randomSplit([trainingFraction, testingFraction], seed=seed);

# CACHE TRAIN AND TEST DATA
trainData.cache()
testData.cache()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary = trainData.map(parseRowIndexingBinary)
indexedTESTbinary = testData.map(parseRowIndexingBinary)
oneHotTRAINbinary = trainData.map(parseRowOneHotBinary)
oneHotTESTbinary = testData.map(parseRowOneHotBinary)

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg = trainData.map(parseRowIndexingRegression)
indexedTESTreg = testData.map(parseRowIndexingRegression)
oneHotTRAINreg = trainData.map(parseRowOneHotRegression)
oneHotTESTreg = testData.map(parseRowOneHotRegression)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```
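The sampling and splitting fractions compose multiplicatively: roughly 25% of the rows are kept, and of those about 75% land in the training set. A small pure-Python sketch (illustrative, using synthetic row ids rather than the taxi data) checks that arithmetic:

```python
import random

# Illustrative check of the sampling arithmetic: keep ~25% of rows,
# then split the sample ~75/25 into train and test.
random.seed(1234)
rows = range(100000)
sampled = [r for r in rows if random.random() < 0.25]
train = [r for r in sampled if random.random() < 0.75]

print(len(sampled) / 100000.0)             # close to 0.25
print(len(train) / float(len(sampled)))    # close to 0.75
```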

OUTPUT

### Feature scaling

```python
# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.util import MLUtils

# SCALE VARIABLES FOR REGULARIZED LINEAR SGD ALGORITHM
label = oneHotTRAINreg.map(lambda x: x.label)
features = oneHotTRAINreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTRAINregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

label = oneHotTESTreg.map(lambda x: x.label)
features = oneHotTESTreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTESTregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```
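What StandardScaler(withMean=False, withStd=True) does can be sketched with NumPy: each column is divided by its standard deviation but not mean-centered, so sparse one-hot columns keep their zeros. The sketch below assumes the corrected sample standard deviation (ddof=1); it is an illustration of the idea, not the MLlib code:

```python
import numpy as np

# Divide each column by its sample standard deviation, without centering.
X = np.array([[1.0, 10.0],
              [2.0, 20.0],
              [3.0, 30.0]])
std = X.std(axis=0, ddof=1)       # per-column sample std: [1.0, 10.0]
X_scaled = X / std

print(X_scaled.std(axis=0, ddof=1))   # → [1. 1.]  (each column now has unit std)
```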

OUTPUT

### Cache objects in memory

```python
# RECORD START TIME
timestart = datetime.datetime.now()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.cache()
indexedTESTbinary.cache()
oneHotTRAINbinary.cache()
oneHotTESTbinary.cache()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.cache()
indexedTESTreg.cache()
oneHotTRAINreg.cache()
oneHotTESTreg.cache()

# SCALED FEATURES
oneHotTRAINregScaled.cache()
oneHotTESTregScaled.cache()

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

## Predict whether or not a tip is paid with binary classification models

• Logistic regression
• Random forest

1. Model training on data with one parameter set
2. Model evaluation on a test data set with metrics
3. Saving the model in blob storage for future consumption

1. Using generic custom code that can be applied to any algorithm in MLlib and to any parameter set in an algorithm.

2. Using the pySpark CrossValidator pipeline function. CrossValidator has a few limitations for Spark 1.5.0:

• Pipeline models cannot be saved or persisted for future consumption.
• It cannot be used for every parameter in a model.
• It cannot be used for every MLlib algorithm.
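For the custom approach, sklearn's ParameterGrid simply expands the cross product of the value lists into one parameter dictionary per model. An equivalent sketch with itertools (the grid values mirror the sweep used in this section; the expansion order here is by sorted key name, which may differ from sklearn's):

```python
from itertools import product

# Expand a dict of value lists into one dict per hyperparameter combination,
# the way sklearn's ParameterGrid does.
grid = {'regParam': [0.01, 0.1], 'iterations': [5, 10],
        'regType': ["l1", "l2"], 'tolerance': [1e-3, 1e-4]}

keys = sorted(grid)
paramGrid = [dict(zip(keys, vals)) for vals in product(*(grid[k] for k in keys))]

print(len(paramGrid))   # → 16 parameter sets, each evaluated on every CV fold
```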

### Generic cross-validation and hyperparameter sweeping used with the logistic regression algorithm for binary classification

```python
# LOGISTIC REGRESSION CLASSIFICATION WITH CV AND HYPERPARAMETER SWEEPING

# GET ACCURACY FOR HYPERPARAMETERS BASED ON CROSS-VALIDATION IN TRAINING DATA-SET

# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# CREATE PARAMETER GRID FOR LOGISTIC REGRESSION PARAMETER SWEEP
from sklearn.grid_search import ParameterGrid
grid = [{'regParam': [0.01, 0.1], 'iterations': [5, 10], 'regType': ["l1", "l2"], 'tolerance': [1e-3, 1e-4]}]
paramGrid = list(ParameterGrid(grid))
numModels = len(paramGrid)

# SET NUM FOLDS AND NUM PARAMETER SETS TO SWEEP ON
nFolds = 3;
h = 1.0 / nFolds;
metricSum = np.zeros(numModels);

# BEGIN CV WITH PARAMETER SWEEP
for i in range(nFolds):
    # Create training and x-validation sets
    validateLB = i * h
    validateUB = (i + 1) * h
    condition = (trainData["rand"] >= validateLB) & (trainData["rand"] < validateUB)
    validation = trainData.filter(condition)
    # Create LabeledPoints from data-frames
    if i > 0:
        trainCVLabPt.unpersist()
        validationLabPt.unpersist()
    trainCV = trainData.filter(~condition)
    trainCVLabPt = trainCV.map(parseRowOneHotBinary)
    trainCVLabPt.cache()
    validationLabPt = validation.map(parseRowOneHotBinary)
    validationLabPt.cache()
    # For each parameter set, compute metrics from x-validation
    for j in range(numModels):
        regt = paramGrid[j]['regType']
        regp = paramGrid[j]['regParam']
        iters = paramGrid[j]['iterations']
        tol = paramGrid[j]['tolerance']
        # Train logistic regression model with hyperparameter set
        model = LogisticRegressionWithLBFGS.train(trainCVLabPt, regType=regt, iterations=iters,
                                                  regParam=regp, tolerance=tol, intercept=True)
        predictionAndLabels = validationLabPt.map(lambda lp: (float(model.predict(lp.features)), lp.label))
        # Use ROC-AUC as the accuracy metric
        validMetrics = BinaryClassificationMetrics(predictionAndLabels)
        metric = validMetrics.areaUnderROC
        metricSum[j] += metric

avgAcc = metricSum / nFolds;
bestParam = paramGrid[np.argmax(avgAcc)];

# UNPERSIST OBJECTS
trainCVLabPt.unpersist()
validationLabPt.unpersist()

# TRAIN ON FULL TRAINING SET USING BEST PARAMETERS FROM CV/PARAMETER SWEEP
logitBest = LogisticRegressionWithLBFGS.train(oneHotTRAINbinary, regType=bestParam['regType'],
                                              iterations=bestParam['iterations'],
                                              regParam=bestParam['regParam'], tolerance=bestParam['tolerance'],
                                              intercept=True)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features,
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(logitBest.weights))
print("Intercept: " + str(logitBest.intercept))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

```python
# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT LIBRARIES
from sklearn.metrics import roc_curve,auc
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# PREDICT ON TEST DATA WITH BEST/FINAL MODEL
predictionAndLabels = oneHotTESTbinary.map(lambda lp: (float(logitBest.predict(lp.features)), lp.label))

# INSTANTIATE METRICS OBJECT
metrics = BinaryClassificationMetrics(predictionAndLabels)

# AREA UNDER PRECISION-RECALL CURVE
print("Area under PR = %s" % metrics.areaUnderPR)

# AREA UNDER ROC CURVE
print("Area under ROC = %s" % metrics.areaUnderROC)
metrics = MulticlassMetrics(predictionAndLabels)

# OVERALL STATISTICS
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)

# OUTPUT PROBABILITIES AND REGISTER TEMP TABLE
logitBest.clearThreshold(); # This clears the classification threshold (0.5) so that probabilities are output
predictionAndLabelsDF = predictionAndLabels.toDF()
predictionAndLabelsDF.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

Area under PR = 0.985336538462

Area under ROC = 0.983383274312

F1 Score = 0.984174341679
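The summary statistics above are derived from the confusion counts. As a quick reference for how precision, recall, and F1 relate, here is a small sketch with illustrative counts (not from this dataset; the helper name `prf` is ours):

```python
# Precision, recall, and F1 from true-positive / false-positive /
# false-negative counts.
def prf(tp, fp, fn):
    precision = tp / float(tp + fp)
    recall = tp / float(tp + fn)
    f1 = 2 * precision * recall / (precision + recall)   # harmonic mean
    return precision, recall, f1

p, r, f1 = prf(tp=90, fp=10, fn=5)
print(round(p, 3), round(r, 3), round(f1, 3))   # → 0.9 0.947 0.923
```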

```
# QUERY RESULTS
%%sql -q -o sqlResults
SELECT * from tmp_results
```

```
# MAKE PREDICTIONS AND PLOT ROC-CURVE

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve,auc

# PREDICTIONS
predictions_pddf = sqlResults.rename(columns={'_1': 'probability', '_2': 'label'})
prob = predictions_pddf["probability"]
fpr, tpr, thresholds = roc_curve(predictions_pddf['label'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

# PLOT ROC CURVE
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()
```

OUTPUT

```python
# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.mllib.classification import LogisticRegressionModel

# PERSIST MODEL
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
logisticregressionfilename = "LogisticRegressionWithLBFGS_" + datestamp;
dirfilename = modelDir + logisticregressionfilename;

logitBest.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

### Use MLlib's CrossValidator pipeline function with the logistic regression (elastic-net) model

```python
# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from sklearn.metrics import roc_curve,auc

# DEFINE ALGORITHM / MODEL
lr = LogisticRegression()

# DEFINE GRID PARAMETERS
# NOTE: illustrative sweep values; adjust to the parameters you want to tune
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.01, 0.1])\
    .addGrid(lr.maxIter, [5, 10])\
    .addGrid(lr.elasticNetParam, [0.25, 0.75])\
    .build()

# DEFINE CV WITH PARAMETER SWEEP
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=3)

# CONVERT TO DATA-FRAME: THIS DOES NOT RUN ON RDDs
trainDataFrame = sqlContext.createDataFrame(oneHotTRAINbinary, ["features", "label"])

# TRAIN WITH CROSS-VALIDATION
cv_model = cv.fit(trainDataFrame)

## PREDICT AND EVALUATE ON TEST DATA-SET

# USE TEST DATASET FOR PREDICTION
testDataFrame = sqlContext.createDataFrame(oneHotTESTbinary, ["features", "label"])
test_predictions = cv_model.transform(testDataFrame)

# REGISTER PREDICTIONS AS A TEMP TABLE SO THEY CAN BE QUERIED WITH THE %%sql MAGIC
test_predictions.registerTempTable("tmp_results")

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

```
# QUERY RESULTS
%%sql -q -o sqlResults
SELECT label, prediction, probability from tmp_results
```

```
# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve,auc

# ROC CURVE
prob = [x["values"][1] for x in sqlResults["probability"]]
fpr, tpr, thresholds = roc_curve(sqlResults['label'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

# PLOT
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()
```

OUTPUT

### Random forest classification

```python
# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #1 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# TRAIN RANDOMFOREST MODEL
rfModel = RandomForest.trainClassifier(indexedTRAINbinary, numClasses=2,
                                       categoricalFeaturesInfo=categoricalFeaturesInfo,
                                       numTrees=25, featureSubsetStrategy="auto",
                                       impurity='gini', maxDepth=5, maxBins=32)
## UNCOMMENT IF YOU WANT TO PRINT THE TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = rfModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfclassificationfilename = "RandomForestClassification_" + datestamp;
dirfilename = modelDir + rfclassificationfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

Area under ROC = 0.985336538462

### Gradient-boosted trees classification

```python
# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #1 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# TRAIN GRADIENT-BOOSTED-TREES MODEL
gbtModel = GradientBoostedTrees.trainClassifier(indexedTRAINbinary,
                                                categoricalFeaturesInfo=categoricalFeaturesInfo,
                                                numIterations=10)
## UNCOMMENT IF YOU WANT TO PRINT TREE DETAILS
#print('Learned classification GBT model:')
#print(gbtModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = gbtModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN A BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btclassificationfilename = "GradientBoostingTreeClassification_" + datestamp;
dirfilename = modelDir + btclassificationfilename;

gbtModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
```

OUTPUT

Area under ROC = 0.985336538462

## Predict tip amount with regression models (not using CV)

• Regularized linear regression
• Random forest

1. Model training data with one parameter set
2. Model evaluation on a test data set with metrics
3. Saving model in blob for future consumption

### Linear regression with SGD

``````# LINEAR REGRESSION WITH SGD

# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics
from scipy import stats

# USE SCALED FEATURES TO TRAIN MODEL
linearModel = LinearRegressionWithSGD.train(oneHotTRAINregScaled, iterations=100, step = 0.1, regType='l2', regParam=0.1, intercept = True)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features,
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(linearModel.weights))
print("Intercept: " + str(linearModel.intercept))

# SCORE ON SCALED TEST DATA-SET & EVALUATE
predictionAndLabels = oneHotTESTregScaled.map(lambda lp: (float(linearModel.predict(lp.features)), lp.label))
testMetrics = RegressionMetrics(predictionAndLabels)

print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
linearregressionfilename = "LinearRegressionWithSGD_" + datestamp;
dirfilename = modelDir + linearregressionfilename;

linearModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
``````

OUTPUT

RMSE = 1.23485131376

R-sqr = 0.597963951127
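The per-sample update behind SGD-based linear regression can be illustrated in a few lines of plain Python. This is a toy sketch with made-up data (one feature plus intercept), not the MLlib implementation, but it shows the gradient step that `LinearRegressionWithSGD` repeats over the training set:

```python
def sgd_fit(xs, ys, step=0.1, epochs=200):
    """Per-sample stochastic gradient descent for least-squares
    linear regression with an intercept term."""
    w, b = 0.0, 0.0
    for _ in range(epochs):
        for x, y in zip(xs, ys):
            err = (w * x + b) - y     # prediction error on this sample
            w -= step * err * x       # gradient of 0.5*err**2 w.r.t. w
            b -= step * err           # gradient w.r.t. b
    return w, b

xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]             # noiseless y = 2x + 1
w, b = sgd_fit(xs, ys)
print(w, b)                           # converges toward w = 2, b = 1
```

The real model additionally applies L2 regularization (`regType='l2'`) and feature scaling, which this sketch omits.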

### Random forest regression

``````#PREDICT TIP AMOUNTS USING RANDOM FOREST

# RECORD START TIME
timestart= datetime.datetime.now()

from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics

# TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                      numTrees=25, featureSubsetStrategy="auto",
                                      impurity='variance', maxDepth=10, maxBins=32)
# UN-COMMENT IF YOU WANT TO PRINT THE TREES
#print('Learned regression forest model:')
#print(rfModel.toDebugString())

# PREDICT AND EVALUATE ON TEST DATA-SET
predictions = rfModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfregressionfilename = "RandomForestRegression_" + datestamp;
dirfilename = modelDir + rfregressionfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
``````

OUTPUT

RMSE = 0.931981967875

R-sqr = 0.733445485802
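Before the gradient-boosted-tree cell, it may help to see the boosting idea in miniature: each stage fits the residuals of the running ensemble and is added with a learning rate. Here is a toy sketch with constant "stumps" in place of real trees (an illustration only, not MLlib's algorithm):

```python
def boost(y, n_stages=10, learning_rate=0.1):
    """Gradient boosting for squared loss with each 'tree' replaced by
    a constant predictor (the mean residual). Each stage fits the
    residuals of the current ensemble, shrunk by the learning rate."""
    pred = [0.0] * len(y)
    for _ in range(n_stages):
        residuals = [yi - pi for yi, pi in zip(y, pred)]
        step = sum(residuals) / len(residuals)  # the constant "stump"
        pred = [pi + learning_rate * step for pi in pred]
    return pred

# With enough stages the ensemble approaches the minimizer of squared
# loss for a constant model, i.e. the target mean (2.0 here)
print(boost([1.0, 2.0, 3.0], n_stages=200))
```

Real GBT replaces the constant with a depth-limited regression tree at every stage, which is what lets it capture non-linearities and feature interactions.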

``````#PREDICT TIP AMOUNTS USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart= datetime.datetime.now()

from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils

# TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
gbtModel = GradientBoostedTrees.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                               numIterations=10, maxBins=32, maxDepth = 4, learningRate=0.1)

# EVALUATE A TEST DATA-SET
predictions = gbtModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# CONVERT PREDICTIONS TO A DATA FRAME AND REGISTER AS A TABLE FOR PLOTTING
test_predictions = sqlContext.createDataFrame(predictionAndLabels)
test_predictions_pddf = test_predictions.toPandas()
test_predictions.registerTempTable("tmp_results");  # consumed by the %%sql plotting cell below

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btregressionfilename = "GradientBoostingTreeRegression_" + datestamp;  # filename pattern follows the other model saves
dirfilename = modelDir + btregressionfilename;
gbtModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
``````

OUTPUT

RMSE = 0.928172197114

R-sqr = 0.732680354389

Plot

tmp_results is registered as a Hive table in the previous cell. Results from the table are output into the sqlResults data-frame for plotting. Here is the code:

``````%%sql -q -o sqlResults
-- SELECT RESULTS TO PLOT ACTUAL VS. PREDICTED TIP VALUES
SELECT * from tmp_results
``````

``````%%local
# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
import numpy as np
import matplotlib.pyplot as plt

# PLOT
ax = sqlResults.plot(kind='scatter', figsize = (6,6), x='_1', y='_2', color='blue', alpha = 0.25, label='Actual vs. predicted');
fit = np.polyfit(sqlResults['_1'], sqlResults['_2'], deg=1)
ax.set_title('Actual vs. Predicted Tip Amounts (\$)')
ax.set_xlabel("Actual")
ax.set_ylabel("Predicted")
ax.plot(sqlResults['_1'], fit[0] * sqlResults['_1'] + fit[1], color='magenta')
plt.axis([-1, 15, -1, 15])
plt.show(ax)
``````

### Cross validation using Elastic net for linear regression

``````###  CV USING ELASTIC NET FOR LINEAR REGRESSION

# RECORD START TIME
timestart = datetime.datetime.now()

from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# DEFINE ALGORITHM/MODEL
lr = LinearRegression()

# DEFINE GRID PARAMETERS (example values for the sweep; tune as needed)
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1]) \
    .addGrid(lr.elasticNetParam, [0.25, 0.75]) \
    .build()

# DEFINE PIPELINE
# JUST THE MODEL HERE, WITHOUT TRANSFORMATIONS
pipeline = Pipeline(stages=[lr])

# DEFINE CV WITH PARAMETER SWEEP
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=RegressionEvaluator(),
                    numFolds=3)

# CONVERT TO DATA FRAME, AS CROSSVALIDATOR WON'T RUN ON RDDS
trainDataFrame = sqlContext.createDataFrame(oneHotTRAINreg, ["features", "label"])

# TRAIN WITH CROSS-VALIDATION
cv_model = cv.fit(trainDataFrame)

# EVALUATE MODEL ON TEST SET
testDataFrame = sqlContext.createDataFrame(oneHotTESTreg, ["features", "label"])

# MAKE PREDICTIONS ON TEST DOCUMENTS
# cvModel uses the best model found (lrModel).
predictionAndLabels = cv_model.transform(testDataFrame)

# CONVERT TO DF AND SAVE REGISTER DF AS TABLE
predictionAndLabels.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
``````

OUTPUT

tmp_results is registered as a Hive table in the previous cell. Results from the table are output into the sqlResults data-frame for plotting. Here is the code:

``````%%sql -q -o sqlResults
-- SELECT RESULTS
SELECT label, prediction from tmp_results
``````

``````%%local
# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
from scipy import stats

#R-SQR TEST METRIC
corstats = stats.linregress(sqlResults['label'],sqlResults['prediction'])
r2 = (corstats[2]*corstats[2])
print("R-sqr = %s" % r2)
``````

OUTPUT

R-sqr = 0.619184907088

### Cross validation with parameter sweep using custom code for random forest regression

``````# RECORD START TIME
timestart= datetime.datetime.now()

# GET ACCURACY FOR HYPERPARAMETERS BASED ON CROSS-VALIDATION IN TRAINING DATA-SET
import numpy as np
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics
from sklearn.grid_search import ParameterGrid

## CREATE PARAMETER GRID
grid = [{'maxDepth': [5,10], 'numTrees': [25,50]}]
paramGrid = list(ParameterGrid(grid))

## SPECIFY LEVELS OF CATEGORICAL VARIABLES
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# SPECIFY NUMFOLDS AND ARRAY TO HOLD METRICS
nFolds = 3;
numModels = len(paramGrid)
h = 1.0 / nFolds;
metricSum = np.zeros(numModels);

for i in range(nFolds):
    # Create training and x-validation sets
    validateLB = i * h
    validateUB = (i + 1) * h
    condition = (trainData["rand"] >= validateLB) & (trainData["rand"] < validateUB)
    validation = trainData.filter(condition)
    # Create labeled points from data-frames
    if i > 0:
        trainCVLabPt.unpersist()
        validationLabPt.unpersist()
    trainCV = trainData.filter(~condition)
    trainCVLabPt = trainCV.map(parseRowIndexingRegression)
    trainCVLabPt.cache()
    validationLabPt = validation.map(parseRowIndexingRegression)
    validationLabPt.cache()
    # For each parameter set, compute metrics from x-validation
    for j in range(numModels):
        maxD = paramGrid[j]['maxDepth']
        numT = paramGrid[j]['numTrees']
        # Train random forest model with this hyperparameter set
        rfModel = RandomForest.trainRegressor(trainCVLabPt, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                              numTrees=numT, featureSubsetStrategy="auto",
                                              impurity='variance', maxDepth=maxD, maxBins=32)
        predictions = rfModel.predict(validationLabPt.map(lambda x: x.features))
        predictionAndLabels = validationLabPt.map(lambda lp: lp.label).zip(predictions)
        # Use RMSE as the validation metric
        validMetrics = RegressionMetrics(predictionAndLabels)
        metric = validMetrics.rootMeanSquaredError
        metricSum[j] += metric

avgAcc = metricSum/nFolds;
bestParam = paramGrid[np.argmin(avgAcc)];

# UNPERSIST OBJECTS
trainCVLabPt.unpersist()
validationLabPt.unpersist()

## TRAIN FINAL MODEL WITH BEST PARAMETERS
rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                      numTrees=bestParam['numTrees'], featureSubsetStrategy="auto",
                                      impurity='variance', maxDepth=bestParam['maxDepth'], maxBins=32)

# EVALUATE MODEL ON TEST DATA
predictions = rfModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

#PRINT TEST METRICS
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print "Time taken to execute above cell: " + str(timedelta) + " seconds";
``````

OUTPUT

RMSE = 0.906972198262

R-sqr = 0.740751197012
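The custom sweep above assigns each training row to a fold by comparing a uniform-random `rand` column against the bounds `validateLB`/`validateUB`. The fold-membership logic can be illustrated in plain Python (the `rand` draws here are hypothetical):

```python
def fold_of(rand_value, n_folds=3):
    """Map a uniform(0,1) draw to a fold index, mirroring the
    validateLB <= rand < validateUB bounds used in the sweep."""
    h = 1.0 / n_folds
    for i in range(n_folds):
        if i * h <= rand_value < (i + 1) * h:
            return i
    return n_folds - 1  # rand_value == 1.0 edge case

print([fold_of(r) for r in [0.05, 0.4, 0.95]])  # [0, 1, 2]
```

On each iteration the rows whose `rand` falls in the current fold serve as the validation set and all other rows are the training set, so every row is validated against exactly once.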

### Clean up objects from memory and print model locations

``````# UNPERSIST OBJECTS CACHED IN MEMORY

# REMOVE ORIGINAL DFs
taxi_df_train_cleaned.unpersist()
taxi_df_train_with_newFeatures.unpersist()
trainData.unpersist()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.unpersist()
indexedTESTbinary.unpersist()
oneHotTRAINbinary.unpersist()
oneHotTESTbinary.unpersist()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.unpersist()
indexedTESTreg.unpersist()
oneHotTRAINreg.unpersist()
oneHotTESTreg.unpersist()

# SCALED FEATURES
oneHotTRAINregScaled.unpersist()
oneHotTESTregScaled.unpersist()
``````

OUTPUT

PythonRDD[122] at RDD at PythonRDD.scala:43

``````# PRINT MODEL FILE LOCATIONS FOR CONSUMPTION
print "logisticRegFileLoc = modelDir + \"" + logisticregressionfilename + "\"";
print "linearRegFileLoc = modelDir + \"" + linearregressionfilename + "\"";
print "randomForestClassificationFileLoc = modelDir + \"" + rfclassificationfilename + "\"";
print "randomForestRegFileLoc = modelDir + \"" + rfregressionfilename + "\"";
print "BoostedTreeClassificationFileLoc = modelDir + \"" + btclassificationfilename + "\"";
print "BoostedTreeRegressionFileLoc = modelDir + \"" + btregressionfilename + "\"";
``````

OUTPUT

logisticRegFileLoc = modelDir + "LogisticRegressionWithLBFGS_2016-05-0316_47_30.096528"

linearRegFileLoc = modelDir + "LinearRegressionWithSGD_2016-05-0316_51_28.433670"

randomForestClassificationFileLoc = modelDir + "RandomForestClassification_2016-05-0316_50_17.454440"

randomForestRegFileLoc = modelDir + "RandomForestRegression_2016-05-0316_51_57.331730"