Data Science using Scala and Spark on Azure

This article shows you how to use Scala for supervised machine learning tasks with the Spark scalable MLlib and Spark ML packages on an Azure HDInsight Spark cluster. It walks you through the tasks that constitute the Data Science process: data ingestion and exploration, visualization, feature engineering, modeling, and model consumption. The models in the article include logistic and linear regression, random forests, and gradient-boosted trees (GBTs), in addition to two common supervised machine learning tasks:

  • Regression problem: Prediction of the tip amount ($) for a taxi trip
  • Binary classification: Prediction of tip or no tip (1/0) for a taxi trip

The modeling process requires training and evaluation on a test data set and relevant accuracy metrics. In this article, you can learn how to store these models in Azure Blob storage and how to score and evaluate their predictive performance. This article also covers the more advanced topics of how to optimize models by using cross-validation and hyper-parameter sweeping. The data used is a sample of the 2013 NYC taxi trip and fare data set available on GitHub.

Scala, a language based on the Java virtual machine, integrates object-oriented and functional language concepts. It's a scalable language that is well suited to distributed processing in the cloud, and it runs on Azure Spark clusters.

Spark is an open-source parallel-processing framework that supports in-memory processing to boost the performance of big data analytics applications. The Spark processing engine is built for speed, ease of use, and sophisticated analytics. Spark's in-memory distributed computation capabilities make it a good choice for iterative algorithms in machine learning and graph computations. The spark.ml package provides a uniform set of high-level APIs built on top of data frames that can help you create and tune practical machine learning pipelines. MLlib is Spark's scalable machine learning library, which brings modeling capabilities to this distributed environment.

HDInsight Spark is the Azure-hosted offering of open-source Spark. It also includes support for Jupyter Scala notebooks on the Spark cluster, and it can run Spark SQL interactive queries to transform, filter, and visualize data stored in Azure Blob storage. The Scala code snippets in this article provide the solutions and show the relevant plots to visualize the data; they run in Jupyter notebooks installed on the Spark clusters. The modeling steps in these topics include code that shows you how to train, evaluate, save, and consume each type of model.

The setup steps and code in this article are for Azure HDInsight 3.4 Spark 1.6. However, the code in this article and in the Scala Jupyter Notebook is generic and should work on any Spark cluster. The cluster setup and management steps might be slightly different from what is shown in this article if you are not using HDInsight Spark.

Note

For a topic that shows you how to use Python rather than Scala to complete tasks for an end-to-end Data Science process, see Data Science using Spark on Azure HDInsight.

Prerequisites

  • You must have an Azure subscription.
  • You need an Azure HDInsight 3.4 Spark 1.6 cluster to complete the following procedures. To create a cluster, see the instructions in Get started: Create Apache Spark on Azure HDInsight. Set the cluster type and version on the Select Cluster Type menu.

HDInsight cluster type configuration

Warning

Billing for HDInsight clusters is prorated per minute, whether you use them or not. Be sure to delete your cluster after you finish using it. See how to delete an HDInsight cluster.

For a description of the NYC taxi trip data and instructions on how to execute code from a Jupyter notebook on the Spark cluster, see the relevant sections in Overview of Data Science using Spark on Azure HDInsight.

Execute Scala code from a Jupyter notebook on the Spark cluster

You can launch a Jupyter notebook from the Azure portal. Find the Spark cluster on your dashboard, and then click it to enter the management page for your cluster. Next, click Cluster Dashboards, and then click Jupyter Notebook to open the notebook associated with the Spark cluster.

Cluster dashboards and Jupyter notebook

You also can access Jupyter notebooks at https://<clustername>.azurehdinsight.net/jupyter. Replace clustername with the name of your cluster. You need the password for your administrator account to access the Jupyter notebooks.

Go to the Jupyter notebook by using the cluster name

Select Scala to see a directory that has a few examples of prepackaged notebooks that use the PySpark API. The Exploration Modeling and Scoring using Scala.ipynb notebook that contains the code samples for this suite of Spark topics is available on GitHub.

You can upload the notebook directly from GitHub to the Jupyter Notebook server on your Spark cluster. On your Jupyter home page, click the Upload button. In the file explorer, paste the GitHub (raw content) URL of the Scala notebook, and then click Open. The Scala notebook is available at the following URL:

Exploration-Modeling-and-Scoring-using-Scala.ipynb

Setup: Preset Spark and Hive contexts, Spark magics, and Spark libraries

Preset Spark and Hive contexts

# SET THE START TIME
import java.util.Calendar
val beginningTime = Calendar.getInstance().getTime()

The Spark kernels that are provided with Jupyter notebooks have preset contexts. You don't need to explicitly set the Spark or Hive contexts before you start working with the application you are developing. The preset contexts are:

  • sc for SparkContext
  • sqlContext for HiveContext
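
As a quick, optional check (a minimal sketch that assumes the standard HDInsight Spark kernel; it is not part of the original notebook), you can call the preset contexts directly in a cell:

// CONFIRM THAT THE PRESET CONTEXTS RESPOND (ILLUSTRATIVE ONLY)
println("Spark version: " + sc.version)
println("Application ID: " + sc.applicationId)
sqlContext.sql("SHOW TABLES").show()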

Spark magics

The Spark kernel provides some predefined "magics," which are special commands that you can call with %%. Two of these commands are used in the following code samples.

  • %%local specifies that the code in subsequent lines will be executed locally. The code must be valid Python code, which is what the %%local plotting cells later in this article use.
  • %%sql -o <variable name> executes a Hive query against sqlContext. If the -o parameter is passed, the result of the query is persisted on the Jupyter server in the %%local context as a Pandas data frame.

For more information about the kernels for Jupyter notebooks and their predefined "magics" that you call with %% (for example, %%local), see Kernels available for Jupyter notebooks with HDInsight Spark Linux clusters on HDInsight.

Import libraries

Import the Spark, MLlib, and other libraries you'll need by using the following code.

# IMPORT SPARK AND JAVA LIBRARIES
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import java.text.SimpleDateFormat
import java.util.Calendar
import sqlContext.implicits._
import org.apache.spark.sql.Row

# IMPORT SPARK SQL FUNCTIONS
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType, DoubleType}
import org.apache.spark.sql.functions.rand

# IMPORT SPARK ML FUNCTIONS
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder, VectorIndexer, Binarizer}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit, CrossValidator}
import org.apache.spark.ml.regression.{LinearRegression, LinearRegressionModel, RandomForestRegressor, RandomForestRegressionModel, GBTRegressor, GBTRegressionModel}
import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel, RandomForestClassifier, RandomForestClassificationModel, GBTClassifier, GBTClassificationModel}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, RegressionEvaluator, MulticlassClassificationEvaluator}

# IMPORT SPARK MLLIB FUNCTIONS
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.{LogisticRegressionWithLBFGS, LogisticRegressionModel}
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel}
import org.apache.spark.mllib.tree.{GradientBoostedTrees, RandomForest}
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, RandomForestModel, Predict}
import org.apache.spark.mllib.evaluation.{BinaryClassificationMetrics, MulticlassMetrics, RegressionMetrics}

# SPECIFY SQLCONTEXT
val sqlContext = new SQLContext(sc)

Data ingestion

The first step in the Data Science process is to ingest the data that you want to analyze. You bring the data from external sources or systems where it resides into your data exploration and modeling environment. In this article, the data you ingest is a joined 0.1% sample of the taxi trip and fare file (stored as a .tsv file). The data exploration and modeling environment is Spark. This section contains the code to complete the following series of tasks:

  1. Set directory paths for data and model storage.
  2. Read in the input data set (stored as a .tsv file).
  3. Define a schema for the data and clean the data.
  4. Create a cleaned data frame and cache it in memory.
  5. Register the data as a temporary table in SQLContext.
  6. Query the table and import the results into a data frame.

Set directory paths for storage locations in Azure Blob storage

Spark can read and write to Azure Blob storage. You can use Spark to process any of your existing data, and then store the results again in Blob storage.

To save models or files in Blob storage, you need to properly specify the path. Reference the default container attached to the Spark cluster by using a path that begins with wasb:///. Reference other locations by using wasb://.
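
For illustration only (the container, account, and folder names below are placeholders, not values used elsewhere in this walkthrough), the two path styles look like this:

// PATH INTO THE DEFAULT CONTAINER ATTACHED TO THE CLUSTER (PLACEHOLDER FOLDER)
val defaultContainerPath = "wasb:///user/remoteuser/SomeFolder/"

// FULLY QUALIFIED PATH TO ANOTHER CONTAINER OR STORAGE ACCOUNT (PLACEHOLDER NAMES; USE THE BLOB ENDPOINT FOR YOUR AZURE ENVIRONMENT)
val otherLocationPath = "wasb://mycontainer@mystorageaccount.blob.core.windows.net/SomeFolder/"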

The following code sample specifies the location of the input data to be read and the path to Blob storage that is attached to the Spark cluster where the model will be saved.

# SET PATHS TO DATA AND MODEL FILE LOCATIONS
# INGEST DATA AND SPECIFY HEADERS FOR COLUMNS
val taxi_train_file = sc.textFile("wasb://mllibwalkthroughs@cdspsparksamples.blob.core.chinacloudapi.cn/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Train.tsv")
val header = taxi_train_file.first;

# SET THE MODEL STORAGE DIRECTORY PATH
# NOTE THAT THE TRAILING FORWARD SLASH IN THE PATH IS REQUIRED.
val modelDir = "wasb:///user/remoteuser/NYCTaxi/Models/";

Import data, create an RDD, and define a data frame according to the schema

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# DEFINE THE SCHEMA BASED ON THE HEADER OF THE FILE
val sqlContext = new SQLContext(sc)
val taxi_schema = StructType(
    Array(
        StructField("medallion", StringType, true),
        StructField("hack_license", StringType, true),
        StructField("vendor_id", StringType, true),
        StructField("rate_code", DoubleType, true),
        StructField("store_and_fwd_flag", StringType, true),
        StructField("pickup_datetime", StringType, true),
        StructField("dropoff_datetime", StringType, true),
        StructField("pickup_hour", DoubleType, true),
        StructField("pickup_week", DoubleType, true),
        StructField("weekday", DoubleType, true),
        StructField("passenger_count", DoubleType, true),
        StructField("trip_time_in_secs", DoubleType, true),
        StructField("trip_distance", DoubleType, true),
        StructField("pickup_longitude", DoubleType, true),
        StructField("pickup_latitude", DoubleType, true),
        StructField("dropoff_longitude", DoubleType, true),
        StructField("dropoff_latitude", DoubleType, true),
        StructField("direct_distance", StringType, true),
        StructField("payment_type", StringType, true),
        StructField("fare_amount", DoubleType, true),
        StructField("surcharge", DoubleType, true),
        StructField("mta_tax", DoubleType, true),
        StructField("tip_amount", DoubleType, true),
        StructField("tolls_amount", DoubleType, true),
        StructField("total_amount", DoubleType, true),
        StructField("tipped", DoubleType, true),
        StructField("tip_class", DoubleType, true)
        )
    )

# CAST VARIABLES ACCORDING TO THE SCHEMA
val taxi_temp = (taxi_train_file.map(_.split("\t"))
                        .filter((r) => r(0) != "medallion")
                         .map(p => Row(p(0), p(1), p(2),
                            p(3).toDouble, p(4), p(5), p(6), p(7).toDouble, p(8).toDouble, p(9).toDouble, p(10).toDouble,
                            p(11).toDouble, p(12).toDouble, p(13).toDouble, p(14).toDouble, p(15).toDouble, p(16).toDouble,
                            p(17), p(18), p(19).toDouble, p(20).toDouble, p(21).toDouble, p(22).toDouble,
                            p(23).toDouble, p(24).toDouble, p(25).toDouble, p(26).toDouble)))


# CREATE AN INITIAL DATA FRAME AND DROP COLUMNS, AND THEN CREATE A CLEANED DATA FRAME BY FILTERING FOR UNWANTED VALUES OR OUTLIERS
val taxi_train_df = sqlContext.createDataFrame(taxi_temp, taxi_schema)

val taxi_df_train_cleaned = (taxi_train_df.drop(taxi_train_df.col("medallion"))
        .drop(taxi_train_df.col("hack_license")).drop(taxi_train_df.col("store_and_fwd_flag"))
        .drop(taxi_train_df.col("pickup_datetime")).drop(taxi_train_df.col("dropoff_datetime"))
        .drop(taxi_train_df.col("pickup_longitude")).drop(taxi_train_df.col("pickup_latitude"))
        .drop(taxi_train_df.col("dropoff_longitude")).drop(taxi_train_df.col("dropoff_latitude"))
        .drop(taxi_train_df.col("surcharge")).drop(taxi_train_df.col("mta_tax"))
        .drop(taxi_train_df.col("direct_distance")).drop(taxi_train_df.col("tolls_amount"))
        .drop(taxi_train_df.col("total_amount")).drop(taxi_train_df.col("tip_class"))
        .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200"));

# CACHE AND MATERIALIZE THE CLEANED DATA FRAME IN MEMORY
taxi_df_train_cleaned.cache()
taxi_df_train_cleaned.count()

# REGISTER THE DATA FRAME AS A TEMPORARY TABLE IN SQLCONTEXT
taxi_df_train_cleaned.registerTempTable("taxi_train")

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

Output:

Time to run the cell: 8 seconds.

Query the table and import results in a data frame

Next, query the table for fare, passenger, and tip data; filter out corrupt and outlying data; and print several rows.

# QUERY THE DATA
val sqlStatement = """
    SELECT fare_amount, passenger_count, tip_amount, tipped
    FROM taxi_train
    WHERE passenger_count > 0 AND passenger_count < 7
    AND fare_amount > 0 AND fare_amount < 200
    AND payment_type in ('CSH', 'CRD')
    AND tip_amount > 0 AND tip_amount < 25
"""
val sqlResultsDF = sqlContext.sql(sqlStatement)

# SHOW ONLY THE TOP THREE ROWS
sqlResultsDF.show(3)

Output:

fare_amount  passenger_count  tip_amount  tipped
13.5         1.0              2.9         1.0
16.0         2.0              3.4         1.0
10.5         2.0              1.0         1.0

Data exploration and visualization

After you bring the data into Spark, the next step in the Data Science process is to gain a deeper understanding of the data through exploration and visualization. In this section, you examine the taxi data by using SQL queries. Then, you import the results into a data frame to plot the target variables and prospective features for visual inspection by using the automatic visualization feature of Jupyter.

Use local and SQL magic to plot data

By default, the output of any code snippet that you run from a Jupyter notebook is available within the context of the session that is persisted on the worker nodes. If you want to save a trip to the worker nodes for every computation, and if all the data that you need for your computation is available locally on the Jupyter server node (which is the head node), you can use the %%local magic to run the code snippet on the Jupyter server.

  • SQL magic (%%sql). The HDInsight Spark kernel supports easy inline HiveQL queries against sqlContext. The (-o VARIABLE_NAME) argument persists the output of the SQL query as a Pandas data frame on the Jupyter server, which makes the output available in local mode.
  • %%local magic. The %%local magic runs the code locally on the Jupyter server, which is the head node of the HDInsight cluster. Typically, you use the %%local magic together with the %%sql magic and its -o parameter. The -o parameter persists the output of the SQL query locally, and the %%local magic then runs the next code snippet locally against that persisted output.

Query the data by using SQL

This query retrieves the taxi trips by fare amount, passenger count, and tip amount.

# RUN THE SQL QUERY
%%sql -q -o sqlResults
SELECT fare_amount, passenger_count, tip_amount, tipped FROM taxi_train WHERE passenger_count > 0 AND passenger_count < 7 AND fare_amount > 0 AND fare_amount < 200 AND payment_type in ('CSH', 'CRD') AND tip_amount > 0 AND tip_amount < 25

In the following code, the %%local magic creates a local data frame, sqlResults. You can use sqlResults to plot by using matplotlib.

Tip

Local magic is used multiple times in this article. If your data set is large, sample it to create a data frame that can fit in local memory.
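
One way to do that (a sketch that assumes you down-sample on the cluster before running %%sql -o; the 1% fraction and the table name taxi_train_sampled are illustrative choices, not part of the original notebook) is:

// DOWN-SAMPLE ON THE CLUSTER SO THE LOCALLY PERSISTED PANDAS DATA FRAME STAYS SMALL
val taxiSampledDF = taxi_df_train_cleaned.sample(withReplacement = false, fraction = 0.01, seed = 123)
taxiSampledDF.registerTempTable("taxi_train_sampled")

A %%sql -o query against taxi_train_sampled then brings only the sampled rows to the Jupyter server.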

Plot the data

You can plot by using Python code after the data frame is in local context as a Pandas data frame.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# USE THE JUPYTER AUTO-PLOTTING FEATURE TO CREATE INTERACTIVE FIGURES.
# CLICK THE TYPE OF PLOT TO GENERATE (LINE, AREA, BAR, ETC.)
sqlResults

The Spark kernel automatically visualizes the output of SQL (HiveQL) queries after you run the code. You can choose between several types of visualizations:

  • Table
  • Pie
  • Line
  • Area
  • Bar

Here's the code to plot the data:

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
import matplotlib.pyplot as plt
%matplotlib inline

# PLOT TIP BY PAYMENT TYPE AND PASSENGER COUNT
ax1 = sqlResults[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount ($)')
ax1.set_ylabel('Counts')
plt.suptitle('')
plt.show()

# PLOT TIP BY PASSENGER COUNT
ax2 = sqlResults.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount by Passenger count')
ax2.set_xlabel('Passenger count')
ax2.set_ylabel('Tip Amount ($)')
plt.suptitle('')
plt.show()

# PLOT TIP AMOUNT BY FARE AMOUNT; SCALE POINTS BY PASSENGER COUNT
ax = sqlResults.plot(kind='scatter', x= 'fare_amount', y = 'tip_amount', c='blue', alpha = 0.10, s=5*(sqlResults.passenger_count))
ax.set_title('Tip amount by Fare amount')
ax.set_xlabel('Fare Amount ($)')
ax.set_ylabel('Tip Amount ($)')
plt.axis([-2, 80, -2, 20])
plt.show()

Output:

Tip amount distribution histogram

Tip amount by passenger count

Tip amount by fare amount

Create features and transform features, and then prep data for input into modeling functions

For tree-based modeling functions from Spark ML and MLlib, you have to prepare target and features by using a variety of techniques, such as binning, indexing, one-hot encoding, and vectorization. Here are the procedures to follow in this section:

  1. Create a new feature by binning hours into traffic time buckets.
  2. Apply indexing and one-hot encoding to categorical features.
  3. Sample and split the data set into training and test fractions.
  4. Specify the training variable and features, and then create indexed or one-hot encoded training and testing input labeled point resilient distributed datasets (RDDs) or data frames.
  5. Automatically categorize and vectorize features and targets to use as inputs for machine learning models.

Create a new feature by binning hours into traffic time buckets

This code shows you how to create a new feature by binning hours into traffic time buckets and how to cache the resulting data frame in memory. Where RDDs and data frames are used repeatedly, caching leads to improved execution times. Accordingly, you'll cache RDDs and data frames at several stages in the following procedures.

# CREATE FOUR BUCKETS FOR TRAFFIC TIMES
val sqlStatement = """
    SELECT *,
    CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night"
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush"
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_train
"""
val taxi_df_train_with_newFeatures = sqlContext.sql(sqlStatement)

# CACHE THE DATA FRAME IN MEMORY AND MATERIALIZE THE DATA FRAME IN MEMORY
taxi_df_train_with_newFeatures.cache()
taxi_df_train_with_newFeatures.count()

Indexing and one-hot encoding of categorical features

The modeling and predict functions of MLlib require features with categorical input data to be indexed or encoded prior to use. This section shows you how to index or encode categorical features for input into the modeling functions.

You need to index or encode your features in different ways, depending on the model. For example, logistic and linear regression models require one-hot encoding, in which a feature with three categories can be expanded into three feature columns, each containing 0 or 1 depending on the category of an observation. MLlib provides the OneHotEncoder function for one-hot encoding. This encoder maps a column of label indices to a column of binary vectors with at most a single one-value. With this encoding, algorithms that expect numerically valued features, such as logistic regression, can be applied to categorical features.
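
To make the expansion concrete, here is a small, self-contained sketch on toy data (not the taxi data set); setDropLast(false) is used so that each of the three categories gets its own indicator position, matching the three-column description above:

// TOY EXAMPLE: INDEX AND ONE-HOT ENCODE A STRING COLUMN WITH THREE CATEGORIES
val toyDF = sqlContext.createDataFrame(Seq((0, "CRD"), (1, "CSH"), (2, "NOC"), (3, "CRD"))).toDF("id", "payment_type")
val toyIndexer = new StringIndexer().setInputCol("payment_type").setOutputCol("paymentIndex").fit(toyDF)
val toyIndexed = toyIndexer.transform(toyDF)
val toyEncoder = new OneHotEncoder().setInputCol("paymentIndex").setOutputCol("paymentVec").setDropLast(false)
toyEncoder.transform(toyIndexed).show()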

Here you transform only four variables to show examples, which are character strings. You also can index other variables, such as weekday, represented by numerical values, as categorical variables.

For indexing, use StringIndexer(), and for one-hot encoding, use the OneHotEncoder() function from MLlib. Here is the code to index and encode categorical features:

# CREATE INDEXES AND ONE-HOT ENCODED VECTORS FOR SEVERAL CATEGORICAL FEATURES

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# INDEX AND ENCODE VENDOR_ID
val stringIndexer = new StringIndexer().setInputCol("vendor_id").setOutputCol("vendorIndex").fit(taxi_df_train_with_newFeatures)
val indexed = stringIndexer.transform(taxi_df_train_with_newFeatures)
val encoder = new OneHotEncoder().setInputCol("vendorIndex").setOutputCol("vendorVec")
val encoded1 = encoder.transform(indexed)

# INDEX AND ENCODE RATE_CODE
val stringIndexer = new StringIndexer().setInputCol("rate_code").setOutputCol("rateIndex").fit(encoded1)
val indexed = stringIndexer.transform(encoded1)
val encoder = new OneHotEncoder().setInputCol("rateIndex").setOutputCol("rateVec")
val encoded2 = encoder.transform(indexed)

# INDEX AND ENCODE PAYMENT_TYPE
val stringIndexer = new StringIndexer().setInputCol("payment_type").setOutputCol("paymentIndex").fit(encoded2)
val indexed = stringIndexer.transform(encoded2)
val encoder = new OneHotEncoder().setInputCol("paymentIndex").setOutputCol("paymentVec")
val encoded3 = encoder.transform(indexed)

# INDEX AND ENCODE TRAFFIC TIME BINS
val stringIndexer = new StringIndexer().setInputCol("TrafficTimeBins").setOutputCol("TrafficTimeBinsIndex").fit(encoded3)
val indexed = stringIndexer.transform(encoded3)
val encoder = new OneHotEncoder().setInputCol("TrafficTimeBinsIndex").setOutputCol("TrafficTimeBinsVec")
val encodedFinal = encoder.transform(indexed)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

Output:

Time to run the cell: 4 seconds.

Sample and split the data set into training and test fractions

This code creates a random sampling of the data (25%, in this example). Although sampling is not required for this example due to the size of the data set, the article shows you how you can sample so that you know how to use it for your own problems when needed. When samples are large, this can save significant time while you train models. Next, split the sample into a training part (75%, in this example) and a testing part (25%, in this example) to use in classification and regression modeling.

Add a random number (between 0 and 1) to each row (in a "rand" column) that can be used to select cross-validation folds during training.

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# SPECIFY SAMPLING AND SPLITTING FRACTIONS
val samplingFraction = 0.25;
val trainingFraction = 0.75;
val testingFraction = (1-trainingFraction);
val seed = 1234;
val encodedFinalSampledTmp = encodedFinal.sample(withReplacement = false, fraction = samplingFraction, seed = seed)
val sampledDFcount = encodedFinalSampledTmp.count().toInt

val generateRandomDouble = udf(() => {
    scala.util.Random.nextDouble
})

# ADD A RANDOM NUMBER FOR CROSS-VALIDATION
val encodedFinalSampled = encodedFinalSampledTmp.withColumn("rand", generateRandomDouble());

# SPLIT THE SAMPLED DATA FRAME INTO TRAIN AND TEST, WITH A RANDOM COLUMN ADDED FOR DOING CROSS-VALIDATION (SHOWN LATER)
# INCLUDE A RANDOM COLUMN FOR CREATING CROSS-VALIDATION FOLDS
val splits = encodedFinalSampled.randomSplit(Array(trainingFraction, testingFraction), seed = seed)
val trainData = splits(0)
val testData = splits(1)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

Output:

Time to run the cell: 2 seconds.

Specify training variable and features, and then create indexed or one-hot encoded training and testing input labeled point RDDs or data frames

This section contains code that shows you how to index categorical text data as a labeled point data type, and encode it so you can use it to train and test MLlib logistic regression and other classification models. Labeled point objects are RDDs that are formatted in a way that is needed as input data by most of the machine learning algorithms in MLlib. A labeled point is a local vector, either dense or sparse, associated with a label/response.
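
As a small illustration (toy values, not taxi data), a labeled point pairs a label with either a dense or a sparse feature vector:

// TOY EXAMPLES OF LABELED POINTS (LABEL FIRST, THEN THE FEATURE VECTOR)
val densePoint = LabeledPoint(1.0, Vectors.dense(12.5, 2.0, 640.0))
val sparsePoint = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(12.5, 640.0)))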

In this code, you specify the target (dependent) variable and the features to use to train models. Then, you create indexed or one-hot encoded training and testing input labeled point RDDs or data frames.

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# MAP NAMES OF FEATURES AND TARGETS FOR CLASSIFICATION AND REGRESSION PROBLEMS
val featuresIndOneHot = List("paymentVec", "vendorVec", "rateVec", "TrafficTimeBinsVec", "pickup_hour", "weekday", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount").map(encodedFinalSampled.columns.indexOf(_))
val featuresIndIndex = List("paymentIndex", "vendorIndex", "rateIndex", "TrafficTimeBinsIndex", "pickup_hour", "weekday", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount").map(encodedFinalSampled.columns.indexOf(_))

# SPECIFY THE TARGET FOR CLASSIFICATION ('tipped') AND REGRESSION ('tip_amount') PROBLEMS
val targetIndBinary = List("tipped").map(encodedFinalSampled.columns.indexOf(_))
val targetIndRegression = List("tip_amount").map(encodedFinalSampled.columns.indexOf(_))

# CREATE INDEXED LABELED POINT RDD OBJECTS
val indexedTRAINbinary = trainData.rdd.map(r => LabeledPoint(r.getDouble(targetIndBinary(0).toInt), Vectors.dense(featuresIndIndex.map(r.getDouble(_)).toArray)))
val indexedTESTbinary = testData.rdd.map(r => LabeledPoint(r.getDouble(targetIndBinary(0).toInt), Vectors.dense(featuresIndIndex.map(r.getDouble(_)).toArray)))
val indexedTRAINreg = trainData.rdd.map(r => LabeledPoint(r.getDouble(targetIndRegression(0).toInt), Vectors.dense(featuresIndIndex.map(r.getDouble(_)).toArray)))
val indexedTESTreg = testData.rdd.map(r => LabeledPoint(r.getDouble(targetIndRegression(0).toInt), Vectors.dense(featuresIndIndex.map(r.getDouble(_)).toArray)))

# CREATE INDEXED DATA FRAMES THAT YOU CAN USE TO TRAIN BY USING SPARK ML FUNCTIONS
val indexedTRAINbinaryDF = indexedTRAINbinary.toDF()
val indexedTESTbinaryDF = indexedTESTbinary.toDF()
val indexedTRAINregDF = indexedTRAINreg.toDF()
val indexedTESTregDF = indexedTESTreg.toDF()

# CREATE ONE-HOT ENCODED (VECTORIZED) DATA FRAMES THAT YOU CAN USE TO TRAIN BY USING SPARK ML FUNCTIONS
val assemblerOneHot = new VectorAssembler().setInputCols(Array("paymentVec", "vendorVec", "rateVec", "TrafficTimeBinsVec", "pickup_hour", "weekday", "passenger_count", "trip_time_in_secs", "trip_distance", "fare_amount")).setOutputCol("features")
val OneHotTRAIN = assemblerOneHot.transform(trainData)
val OneHotTEST = assemblerOneHot.transform(testData)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

Output:

Time to run the cell: 4 seconds.

Automatically categorize and vectorize features and targets to use as inputs for machine learning models

Use Spark ML to categorize the target and features to use in tree-based modeling functions. The code completes two tasks:

  • Creates a binary target for classification by assigning a value of 0 or 1 to each data point by using a threshold value of 0.5.
  • Automatically categorizes features. If the number of distinct numerical values for any feature is less than 32, that feature is categorized.

Here's the code for these two tasks.

# CATEGORIZE FEATURES AND BINARIZE THE TARGET FOR THE BINARY CLASSIFICATION PROBLEM

# TRAIN DATA
val indexer = new VectorIndexer().setInputCol("features").setOutputCol("featuresCat").setMaxCategories(32)
val indexerModel = indexer.fit(indexedTRAINbinaryDF)
val indexedTrainwithCatFeat = indexerModel.transform(indexedTRAINbinaryDF)
val binarizer: Binarizer = new Binarizer().setInputCol("label").setOutputCol("labelBin").setThreshold(0.5)
val indexedTRAINwithCatFeatBinTarget = binarizer.transform(indexedTrainwithCatFeat)

# TEST DATA
val indexerModel = indexer.fit(indexedTESTbinaryDF)
val indexedTrainwithCatFeat = indexerModel.transform(indexedTESTbinaryDF)
val binarizer: Binarizer = new Binarizer().setInputCol("label").setOutputCol("labelBin").setThreshold(0.5)
val indexedTESTwithCatFeatBinTarget = binarizer.transform(indexedTrainwithCatFeat)

# CATEGORIZE FEATURES FOR THE REGRESSION PROBLEM
# CREATE PROPERLY INDEXED AND CATEGORIZED DATA FRAMES FOR TREE-BASED MODELS

# TRAIN DATA
val indexer = new VectorIndexer().setInputCol("features").setOutputCol("featuresCat").setMaxCategories(32)
val indexerModel = indexer.fit(indexedTRAINregDF)
val indexedTRAINwithCatFeat = indexerModel.transform(indexedTRAINregDF)

# TEST DATA
val indexerModel = indexer.fit(indexedTESTbinaryDF)
val indexedTESTwithCatFeat = indexerModel.transform(indexedTESTregDF)

Binary classification model: Predict whether a tip should be paid

In this section, you create three types of binary classification models to predict whether or not a tip should be paid:

  • A logistic regression model by using the Spark ML LogisticRegression() function
  • A random forest classification model by using the Spark ML RandomForestClassifier() function
  • A gradient boosting tree classification model by using the MLlib GradientBoostedTrees() function

Create a logistic regression model

Next, create a logistic regression model by using the Spark ML LogisticRegression() function. You create the model-building code in a series of steps:

  1. Train the model data with one parameter set.
  2. Evaluate the model on a test data set with metrics.
  3. Save the model in Blob storage for future consumption.
  4. Score the model against test data.
  5. Plot the results with receiver operating characteristic (ROC) curves.

Here's the code for these procedures:

# CREATE A LOGISTIC REGRESSION MODEL
val lr = new LogisticRegression().setLabelCol("tipped").setFeaturesCol("features").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)
val lrModel = lr.fit(OneHotTRAIN)

# PREDICT ON THE TEST DATA SET
val predictions = lrModel.transform(OneHotTEST)

# SELECT `BinaryClassificationEvaluator()` TO COMPUTE THE TEST ERROR
val evaluator = new BinaryClassificationEvaluator().setLabelCol("tipped").setRawPredictionCol("probability").setMetricName("areaUnderROC")
val ROC = evaluator.evaluate(predictions)
println("ROC on test data = " + ROC)

# SAVE THE MODEL
val datestamp = Calendar.getInstance().getTime().toString.replaceAll(" ", ".").replaceAll(":", "_");
val modelName = "LogisticRegression__"
val filename = modelDir.concat(modelName).concat(datestamp)
lrModel.save(filename);

Load, score, and save the results.

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# LOAD THE SAVED MODEL AND SCORE THE TEST DATA SET
val savedModel = org.apache.spark.ml.classification.LogisticRegressionModel.load(filename)
println(s"Coefficients: ${savedModel.coefficients} Intercept: ${savedModel.intercept}")

# SCORE THE MODEL ON THE TEST DATA
val predictions = savedModel.transform(OneHotTEST).select("tipped","probability","rawPrediction")
predictions.registerTempTable("testResults")

# SELECT `BinaryClassificationEvaluator()` TO COMPUTE THE TEST ERROR
val evaluator = new BinaryClassificationEvaluator().setLabelCol("tipped").setRawPredictionCol("probability").setMetricName("areaUnderROC")
val ROC = evaluator.evaluate(predictions)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

# PRINT THE ROC RESULTS
println("ROC on test data = " + ROC)

Output:

ROC on test data = 0.9827381497557599

Use Python on local Pandas data frames to plot the ROC curve.

# QUERY THE RESULTS
%%sql -q -o sqlResults
SELECT tipped, probability from testResults


# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline
from sklearn.metrics import roc_curve,auc

sqlResults['probFloat'] = sqlResults.apply(lambda row: row['probability'].values()[0][1], axis=1)
predictions_pddf = sqlResults[["tipped","probFloat"]]

# PREDICT THE ROC CURVE
# predictions_pddf = sqlResults.rename(columns={'_1': 'probability', 'tipped': 'label'})
prob = predictions_pddf["probFloat"]
fpr, tpr, thresholds = roc_curve(predictions_pddf['tipped'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

# PLOT THE ROC CURVE
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

Output:

ROC curve for tip or no tip

Create a random forest classification model

Next, create a random forest classification model by using the Spark ML RandomForestClassifier() function, and then evaluate the model on test data.

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# CREATE THE RANDOM FOREST CLASSIFIER MODEL
val rf = new RandomForestClassifier().setLabelCol("labelBin").setFeaturesCol("featuresCat").setNumTrees(10).setSeed(1234)

# FIT THE MODEL
val rfModel = rf.fit(indexedTRAINwithCatFeatBinTarget)
val predictions = rfModel.transform(indexedTESTwithCatFeatBinTarget)

# EVALUATE THE MODEL
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val Test_f1Score = evaluator.evaluate(predictions)
println("F1 score on test data: " + Test_f1Score);

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

# CALCULATE BINARY CLASSIFICATION EVALUATION METRICS
val evaluator = new BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("probability").setMetricName("areaUnderROC")
val ROC = evaluator.evaluate(predictions)
println("ROC on test data = " + ROC)

Output:

ROC on test data = 0.9847103571552683

Create a GBT classification model

Next, create a GBT classification model by using MLlib's GradientBoostedTrees() function, and then evaluate the model on test data.

# TRAIN A GBT CLASSIFICATION MODEL BY USING MLLIB AND A LABELED POINT

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# DEFINE THE GBT CLASSIFICATION MODEL
val boostingStrategy = BoostingStrategy.defaultParams("Classification")
boostingStrategy.numIterations = 20
boostingStrategy.treeStrategy.numClasses = 2
boostingStrategy.treeStrategy.maxDepth = 5
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map[Int, Int]((0,2),(1,2),(2,6),(3,4))

# TRAIN THE MODEL
val gbtModel = GradientBoostedTrees.train(indexedTRAINbinary, boostingStrategy)

# SAVE THE MODEL IN BLOB STORAGE
val datestamp = Calendar.getInstance().getTime().toString.replaceAll(" ", ".").replaceAll(":", "_");
val modelName = "GBT_Classification__"
val filename = modelDir.concat(modelName).concat(datestamp)
gbtModel.save(sc, filename);

# EVALUATE THE MODEL ON TEST INSTANCES AND COMPUTE THE TEST ERROR
val labelAndPreds = indexedTESTbinary.map { point =>
  val prediction = gbtModel.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / indexedTESTbinary.count()
//println("Learned classification GBT model:\n" + gbtModel.toDebugString)
println("Test Error = " + testErr)

# USE BINARY AND MULTICLASS METRICS TO EVALUATE THE MODEL ON THE TEST DATA
val metrics = new MulticlassMetrics(labelAndPreds)
println(s"Precision: ${metrics.precision}")
println(s"Recall: ${metrics.recall}")
println(s"F1 Score: ${metrics.fMeasure}")

val metrics = new BinaryClassificationMetrics(labelAndPreds)
println(s"Area under PR curve: ${metrics.areaUnderPR}")
println(s"Area under ROC curve: ${metrics.areaUnderROC}")

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

# PRINT THE ROC METRIC
println(s"Area under ROC curve: ${metrics.areaUnderROC}")

Output:

Area under ROC curve: 0.9846895479241554

Regression model: Predict tip amount

In this section, you create two types of regression models to predict the tip amount:

  • A regularized linear regression model by using the Spark ML LinearRegression() function. You'll save the model and evaluate the model on test data.
  • A gradient-boosting tree regression model by using the Spark ML GBTRegressor() function.

Create a regularized linear regression model

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# CREATE A REGULARIZED LINEAR REGRESSION MODEL BY USING THE SPARK ML FUNCTION AND DATA FRAMES
val lr = new LinearRegression().setLabelCol("tip_amount").setFeaturesCol("features").setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8)

# FIT THE MODEL BY USING DATA FRAMES
val lrModel = lr.fit(OneHotTRAIN)
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

# SUMMARIZE THE MODEL OVER THE TRAINING SET AND PRINT METRICS
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: ${trainingSummary.objectiveHistory.toList}")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")

# SAVE THE MODEL IN AZURE BLOB STORAGE
val datestamp = Calendar.getInstance().getTime().toString.replaceAll(" ", ".").replaceAll(":", "_");
val modelName = "LinearRegression__"
val filename = modelDir.concat(modelName).concat(datestamp)
lrModel.save(filename);

# PRINT THE COEFFICIENTS
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

# SCORE THE MODEL ON TEST DATA
val predictions = lrModel.transform(OneHotTEST)

# EVALUATE THE MODEL ON TEST DATA
val evaluator = new RegressionEvaluator().setLabelCol("tip_amount").setPredictionCol("prediction").setMetricName("r2")
val r2 = evaluator.evaluate(predictions)
println("R-sqr on test data = " + r2)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

Output:

Time to run the cell: 13 seconds.

# LOAD A SAVED LINEAR REGRESSION MODEL FROM BLOB STORAGE AND SCORE A TEST DATA SET

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# LOAD A SAVED LINEAR REGRESSION MODEL FROM AZURE BLOB STORAGE
val savedModel = org.apache.spark.ml.regression.LinearRegressionModel.load(filename)
println(s"Coefficients: ${savedModel.coefficients} Intercept: ${savedModel.intercept}")

# SCORE THE MODEL ON TEST DATA
val predictions = savedModel.transform(OneHotTEST).select("tip_amount","prediction")
predictions.registerTempTable("testResults")

# EVALUATE THE MODEL ON TEST DATA
val evaluator = new RegressionEvaluator().setLabelCol("tip_amount").setPredictionCol("prediction").setMetricName("r2")
val r2 = evaluator.evaluate(predictions)
println("R-sqr on test data = " + r2)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

# PRINT THE RESULTS
println("R-sqr on test data = " + r2)

Output:

R-sqr on test data = 0.5960320470835743

Next, query the test results as a data frame and use AutoVizWidget and matplotlib to visualize them.

# RUN A SQL QUERY
%%sql -q -o sqlResults
select * from testResults

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# USE THE JUPYTER AUTO-PLOTTING FEATURE TO CREATE INTERACTIVE FIGURES
# CLICK THE TYPE OF PLOT TO GENERATE (LINE, AREA, BAR, AND SO ON)
sqlResults

The code creates a local data frame from the query output and plots the data. The %%local magic creates a local data frame, sqlResults, which you can use to plot with matplotlib.

Note

This Spark magic is used multiple times in this article. If the amount of data is large, you should sample to create a data frame that can fit in local memory.

Create plots by using Python matplotlib.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
sqlResults
%matplotlib inline
import numpy as np

# PLOT THE RESULTS
ax = sqlResults.plot(kind='scatter', figsize = (6,6), x='tip_amount', y='prediction', color='blue', alpha = 0.25, label='Actual vs. predicted');
fit = np.polyfit(sqlResults['tip_amount'], sqlResults['prediction'], deg=1)
ax.set_title('Actual vs. Predicted Tip Amounts ($)')
ax.set_xlabel("Actual")
ax.set_ylabel("Predicted")
#ax.plot(sqlResults['tip_amount'], fit[0] * sqlResults['prediction'] + fit[1], color='magenta')
plt.axis([-1, 15, -1, 8])
plt.show(ax)

Output:

Actual vs. predicted tip amounts

Create a GBT regression model

Create a GBT regression model by using the Spark ML GBTRegressor() function, and then evaluate the model on test data.

Gradient-boosted trees (GBTs) are ensembles of decision trees. GBTs train decision trees iteratively to minimize a loss function. You can use GBTs for regression and classification. They can handle categorical features, do not require feature scaling, and can capture nonlinearities and feature interactions. You also can use them in a multiclass-classification setting.

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# TRAIN A GBT REGRESSION MODEL
val gbt = new GBTRegressor().setLabelCol("label").setFeaturesCol("featuresCat").setMaxIter(10)
val gbtModel = gbt.fit(indexedTRAINwithCatFeat)

# MAKE PREDICTIONS
val predictions = gbtModel.transform(indexedTESTwithCatFeat)

# COMPUTE TEST SET R2
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("r2")
val Test_R2 = evaluator.evaluate(predictions)


# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

# PRINT THE RESULTS
println("Test R-sqr is: " + Test_R2);

Output:

Test R-sqr is: 0.7655383534596654

Advanced modeling utilities for optimization

In this section, you use machine learning utilities that developers frequently use for model optimization. Specifically, you can optimize machine learning models three different ways by using parameter sweeping and cross-validation:

  • Split the data into train and validation sets, optimize the model by using hyper-parameter sweeping on a training set, and evaluate on a validation set (linear regression)
  • Optimize the model by using cross-validation and hyper-parameter sweeping by using Spark ML's CrossValidator function (binary classification)
  • Optimize the model by using custom cross-validation and parameter-sweeping code to use any machine learning function and parameter set (linear regression)

Cross-validation is a technique that assesses how well a model trained on a known set of data will generalize to predict the features of data sets on which it has not been trained. The general idea behind this technique is that a model is trained on a data set of known data, and then the accuracy of its predictions is tested against an independent data set. A common implementation is to divide a data set into k folds, and then train the model in a round-robin fashion on all but one of the folds.

Hyper-parameter optimization is the problem of choosing a set of hyper-parameters for a learning algorithm, usually with the goal of optimizing a measure of the algorithm's performance on an independent data set. A hyper-parameter is a value that you must specify outside the model training procedure. Assumptions about hyper-parameter values can affect the flexibility and accuracy of the model. Decision trees, for example, have hyper-parameters such as the desired depth and number of leaves in the tree. You must set a misclassification penalty term for a support vector machine (SVM).

A common way to perform hyper-parameter optimization is to use a grid search, also called a parameter sweep. In a grid search, an exhaustive search is performed through the values of a specified subset of the hyper-parameter space for a learning algorithm. Cross-validation can supply a performance metric to sort out the optimal results produced by the grid search algorithm. If you use cross-validation hyper-parameter sweeping, you can help limit problems like overfitting a model to training data. This way, the model retains the capacity to apply to the general set of data from which the training data was extracted.
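
To connect this to the "rand" column added during the sample-and-split step, here is a sketch (an assumption about how such a column could be used, not code from the original notebook) that assigns each row to one of three folds:

// ILLUSTRATIVE ONLY: DERIVE A FOLD ID (0, 1, OR 2) FROM THE "rand" COLUMN
val nFoldsSketch = 3
val withFold = encodedFinalSampled.withColumn("fold", floor(col("rand") * nFoldsSketch))

// HOLD OUT FOLD 0 AND TRAIN ON THE REMAINING FOLDS; REPEAT FOR EACH FOLD IN TURN
val heldOutFold0 = withFold.filter(col("fold") === 0)
val trainFolds0  = withFold.filter(col("fold") !== 0)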

Optimize a linear regression model with hyper-parameter sweeping

Next, split the data into train and validation sets, use hyper-parameter sweeping on a training set to optimize the model, and evaluate on a validation set (linear regression).

# RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

# RENAME `tip_amount` AS A LABEL
val OneHotTRAINLabeled = OneHotTRAIN.select("tip_amount","features").withColumnRenamed(existingName="tip_amount",newName="label")
val OneHotTESTLabeled = OneHotTEST.select("tip_amount","features").withColumnRenamed(existingName="tip_amount",newName="label")
OneHotTRAINLabeled.cache()
OneHotTESTLabeled.cache()

# DEFINE THE ESTIMATOR FUNCTION: THE LinearRegression() FUNCTION
val lr = new LinearRegression().setLabelCol("label").setFeaturesCol("features").setMaxIter(10)

# DEFINE THE PARAMETER GRID
val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Array(0.1, 0.01, 0.001)).addGrid(lr.fitIntercept).addGrid(lr.elasticNetParam, Array(0.1, 0.5, 0.9)).build()

# DEFINE THE PIPELINE WITH A TRAIN/TEST VALIDATION SPLIT (75% IN THE TRAINING SET), AND THEN SPECIFY THE ESTIMATOR, EVALUATOR, AND PARAMETER GRID
val trainPct = 0.75
val trainValidationSplit = new TrainValidationSplit().setEstimator(lr).setEvaluator(new RegressionEvaluator).setEstimatorParamMaps(paramGrid).setTrainRatio(trainPct)

# RUN THE TRAIN VALIDATION SPLIT AND CHOOSE THE BEST SET OF PARAMETERS
val model = trainValidationSplit.fit(OneHotTRAINLabeled)

# MAKE PREDICTIONS ON THE TEST DATA BY USING THE MODEL WITH THE COMBINATION OF PARAMETERS THAT PERFORMS THE BEST
val testResults = model.transform(OneHotTESTLabeled).select("label", "prediction")

# COMPUTE TEST SET R2
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("r2")
val Test_R2 = evaluator.evaluate(testResults)

# GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");

println("Test R-sqr is: " + Test_R2);

Output:

Test R-sqr is: 0.6226484708501209
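
The original output reports only the R-squared value, but you can also inspect which parameter combination the sweep selected. The following lines are an assumed follow-up cell; the cast is valid here because the LinearRegression estimator was passed directly to TrainValidationSplit, so bestModel is a LinearRegressionModel.

import org.apache.spark.ml.regression.LinearRegressionModel

// Inspect the winning parameter combination from the sweep
val bestLR = model.bestModel.asInstanceOf[LinearRegressionModel]
println("Best regParam: " + bestLR.getRegParam)
println("Best elasticNetParam: " + bestLR.getElasticNetParam)
println("Best fitIntercept: " + bestLR.getFitIntercept)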

Optimize the binary classification model by using cross-validation and hyper-parameter sweeping

This section shows you how to optimize a binary classification model by using cross-validation and hyper-parameter sweeping. It uses the Spark ML CrossValidator function.

// RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

// CREATE DATA FRAMES WITH PROPERLY LABELED COLUMNS TO USE FOR TRAINING AND TESTING
val indexedTRAINwithCatFeatBinTargetRF = indexedTRAINwithCatFeatBinTarget.select("labelBin","featuresCat").withColumnRenamed(existingName="labelBin",newName="label").withColumnRenamed(existingName="featuresCat",newName="features")
val indexedTESTwithCatFeatBinTargetRF = indexedTESTwithCatFeatBinTarget.select("labelBin","featuresCat").withColumnRenamed(existingName="labelBin",newName="label").withColumnRenamed(existingName="featuresCat",newName="features")
indexedTRAINwithCatFeatBinTargetRF.cache()
indexedTESTwithCatFeatBinTargetRF.cache()

// DEFINE THE ESTIMATOR: A RANDOM FOREST CLASSIFIER
val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setImpurity("gini").setSeed(1234).setFeatureSubsetStrategy("auto").setMaxBins(32)

// DEFINE THE PARAMETER GRID
val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth, Array(4,8)).addGrid(rf.numTrees, Array(5,10)).addGrid(rf.minInstancesPerNode, Array(100,300)).build()

// SPECIFY THE NUMBER OF FOLDS
val numFolds = 3

// DEFINE THE CROSS-VALIDATOR WITH THE ESTIMATOR, EVALUATOR, PARAMETER GRID, AND NUMBER OF FOLDS
val cv = new CrossValidator().setEstimator(rf).setEvaluator(new BinaryClassificationEvaluator).setEstimatorParamMaps(paramGrid).setNumFolds(numFolds)

// RUN CROSS-VALIDATION AND CHOOSE THE BEST SET OF PARAMETERS
val model = cv.fit(indexedTRAINwithCatFeatBinTargetRF)

// MAKE PREDICTIONS ON THE TEST DATA BY USING THE MODEL WITH THE COMBINATION OF PARAMETERS THAT PERFORMS BEST
val testResults = model.transform(indexedTESTwithCatFeatBinTargetRF).select("label", "prediction")

// COMPUTE THE TEST F1 SCORE
val evaluator = new MulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("f1")
val Test_f1Score = evaluator.evaluate(testResults)

// GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime = ((endtime.getTime() - starttime.getTime())/1000).toString
println("Time taken to run the above cell: " + elapsedtime + " seconds.")

Output:

Time to run the cell: 33 seconds.
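
As with the train/validation split, the fitted CrossValidatorModel keeps the average evaluator metric for every grid combination and the best fitted model. The lines below are an assumed follow-up cell for inspecting those results; they are not part of the original output.

// Average metric (areaUnderROC for the default BinaryClassificationEvaluator) per grid combination
model.avgMetrics.zipWithIndex.foreach { case (metric, i) => println(s"Combination $i: $metric") }

// Parameters of the winning random forest
import org.apache.spark.ml.classification.RandomForestClassificationModel
val bestRF = model.bestModel.asInstanceOf[RandomForestClassificationModel]
println(bestRF.extractParamMap())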

Optimize the random forest regression model by using custom cross-validation and parameter-sweeping code

Next, optimize the model by using custom code, and identify the best model parameters by using the criterion of highest accuracy. Then create the final model, evaluate it on the test data, and save the model in Blob storage. Finally, load the model, score the test data, and evaluate the accuracy.

// RECORD THE START TIME
val starttime = Calendar.getInstance().getTime()

// DEFINE THE PARAMETER GRID AND THE NUMBER OF FOLDS
val paramGrid = new ParamGridBuilder().addGrid(rf.maxDepth, Array(5,10)).addGrid(rf.numTrees, Array(10,25,50)).build()

val nFolds = 3
val numModels = paramGrid.size
val numParamsinGrid = 2

// SPECIFY THE NUMBER OF CATEGORIES FOR THE CATEGORICAL VARIABLES
val categoricalFeaturesInfo = Map[Int, Int]((0,2),(1,2),(2,6),(3,4))

var maxDepth = -1
var numTrees = -1
var param = ""
var paramval = -1
var validateLB = -1.0
var validateUB = -1.0
val h = 1.0 / nFolds;
val RMSE  = Array.fill(numModels)(0.0)

// CREATE K-FOLDS
// (Note: the folds used in the loop below are defined by the precomputed 'rand' column on trainData.)
val splits = MLUtils.kFold(indexedTRAINbinary, numFolds = nFolds, seed=1234)


// LOOP THROUGH THE K-FOLDS AND THE PARAMETER GRID TO IDENTIFY THE BEST PARAMETER SET BY LEVEL OF ACCURACY
for (i <- 0 to (nFolds-1)) {
    validateLB = i * h
    validateUB = (i + 1) * h
    val validationCV = trainData.filter($"rand" >= validateLB  && $"rand" < validateUB)
    val trainCV = trainData.filter($"rand" < validateLB  || $"rand" >= validateUB)
    val validationLabPt = validationCV.rdd.map(r => LabeledPoint(r.getDouble(targetIndRegression(0).toInt), Vectors.dense(featuresIndIndex.map(r.getDouble(_)).toArray)));
    val trainCVLabPt = trainCV.rdd.map(r => LabeledPoint(r.getDouble(targetIndRegression(0).toInt), Vectors.dense(featuresIndIndex.map(r.getDouble(_)).toArray)));
    validationLabPt.cache()
    trainCVLabPt.cache()

    for (nParamSets <- 0 to (numModels-1)) {
        for (nParams <- 0 to (numParamsinGrid-1)) {
            param = paramGrid(nParamSets).toSeq(nParams).param.toString.split("__")(1)
            paramval = paramGrid(nParamSets).toSeq(nParams).value.toString.toInt
            if (param == "maxDepth") {maxDepth = paramval}
            if (param == "numTrees") {numTrees = paramval}
        }
        val rfModel = RandomForest.trainRegressor(trainCVLabPt, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                                  numTrees=numTrees, maxDepth=maxDepth,
                                                  featureSubsetStrategy="auto",impurity="variance", maxBins=32)
        val labelAndPreds = validationLabPt.map { point =>
                                                 val prediction = rfModel.predict(point.features)
                                                  ( prediction, point.label )
                                                }
        val validMetrics = new RegressionMetrics(labelAndPreds)
        val rmse = validMetrics.rootMeanSquaredError
        RMSE(nParamSets) += rmse
    }
    validationLabPt.unpersist();
    trainCVLabPt.unpersist();
}
val minRMSEindex = RMSE.indexOf(RMSE.min)

// GET THE BEST PARAMETERS FROM THE CROSS-VALIDATION AND PARAMETER SWEEP
var best_maxDepth = -1
var best_numTrees = -1
for (nParams <- 0 to (numParamsinGrid-1)) {
    param = paramGrid(minRMSEindex).toSeq(nParams).param.toString.split("__")(1)
    paramval = paramGrid(minRMSEindex).toSeq(nParams).value.toString.toInt
    if (param == "maxDepth") {best_maxDepth = paramval}
    if (param == "numTrees") {best_numTrees = paramval}
}

// CREATE THE BEST MODEL WITH THE BEST PARAMETERS AND THE FULL TRAINING DATA SET
val best_rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                                  numTrees=best_numTrees, maxDepth=best_maxDepth,
                                                  featureSubsetStrategy="auto",impurity="variance", maxBins=32)

// SAVE THE BEST RANDOM FOREST MODEL IN BLOB STORAGE
val datestamp = Calendar.getInstance().getTime().toString.replaceAll(" ", ".").replaceAll(":", "_");
val modelName = "BestCV_RF_Regression__"
val filename = modelDir.concat(modelName).concat(datestamp)
best_rfModel.save(sc, filename);

// PREDICT ON THE TEST SET WITH THE BEST MODEL AND THEN EVALUATE
val labelAndPreds = indexedTESTreg.map { point =>
                                        val prediction = best_rfModel.predict(point.features)
                                        ( prediction, point.label )
                                       }

val test_rmse = new RegressionMetrics(labelAndPreds).rootMeanSquaredError
val test_rsqr = new RegressionMetrics(labelAndPreds).r2

// GET THE TIME TO RUN THE CELL
val endtime = Calendar.getInstance().getTime()
val elapsedtime =  ((endtime.getTime() - starttime.getTime())/1000).toString;
println("Time taken to run the above cell: " + elapsedtime + " seconds.");


// LOAD THE MODEL
val savedRFModel = RandomForestModel.load(sc, filename)

val labelAndPreds = indexedTESTreg.map { point =>
                                        val prediction = savedRFModel.predict(point.features)
                                        ( prediction, point.label )
                                       }
// TEST THE LOADED MODEL
val test_rmse = new RegressionMetrics(labelAndPreds).rootMeanSquaredError
val test_rsqr = new RegressionMetrics(labelAndPreds).r2

Output:

Time to run the cell: 61 seconds.
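
The cell above computes the parameters chosen by the sweep and the test metrics, but prints only the elapsed time. A short follow-up such as the hypothetical one below makes those results visible.

// Print the selected hyper-parameters and the test-set metrics from the preceding cell
println("Best maxDepth: " + best_maxDepth)
println("Best numTrees: " + best_numTrees)
println("Test RMSE: " + test_rmse)
println("Test R-sqr: " + test_rsqr)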

Consume Spark-built machine learning models automatically with Scala

For an overview of topics that walk you through the tasks that comprise the Data Science process in Azure, see Team Data Science Process.

Team Data Science Process walkthroughs describes other end-to-end walkthroughs that demonstrate the steps in the Team Data Science Process for specific scenarios. The walkthroughs also illustrate how to combine cloud and on-premises tools and services into a workflow or pipeline to create an intelligent application.

Score Spark-built machine learning models shows you how to use Scala code to automatically load and score new data sets with machine learning models built in Spark and saved in Azure Blob storage. You can follow the instructions provided there, replacing the Python code with the Scala code in this article, for automated consumption.
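
As a rough sketch of what that automated scoring step looks like in Scala, the lines below load a saved MLlib random forest model and score a new data set. The Blob storage path and the newDataLabPt RDD are hypothetical placeholders, not values defined in this article.

import org.apache.spark.mllib.tree.model.RandomForestModel

// Placeholder path: point this at the model saved earlier in your own Blob storage container
val savedModelPath = "wasb:///models/BestCV_RF_Regression__<datestamp>"
val scoringModel = RandomForestModel.load(sc, savedModelPath)

// newDataLabPt is assumed to be an RDD[LabeledPoint] built from the new data with the same feature engineering
// val scored = newDataLabPt.map(point => (point.label, scoringModel.predict(point.features)))
// scored.coalesce(1).saveAsTextFile("wasb:///scored/taxi-tip-predictions")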