Use Apache Spark MLlib to build a machine learning application and analyze a dataset

Learn how to use Apache Spark MLlib to create a machine learning application to do simple predictive analysis on an open dataset. From Spark's built-in machine learning libraries, this example uses classification through logistic regression.

MLlib is a core Spark library that provides many utilities useful for machine learning tasks, including utilities that are suitable for:

  • Classification
  • Regression
  • Clustering
  • Topic modeling
  • Singular value decomposition (SVD) and principal component analysis (PCA)
  • Hypothesis testing and calculating sample statistics

Understand classification and logistic regression

Classification, a popular machine learning task, is the process of sorting input data into categories. It is the job of a classification algorithm to figure out how to assign "labels" to input data that you provide. For example, you could think of a machine learning algorithm that accepts stock information as input and divides the stock into two categories: stocks that you should sell and stocks that you should keep.

Logistic regression is the algorithm that you use for classification. Spark's logistic regression API is useful for binary classification, or classifying input data into one of two groups. For more information about logistic regression, see Wikipedia.

In summary, the process of logistic regression produces a logistic function that can be used to predict the probability that an input vector belongs in one group or the other.
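
To make this concrete, here is a minimal, optional sketch of the logistic (sigmoid) function itself. It is not part of the tutorial's pipeline, and the helper name sigmoid is only illustrative; it simply shows how a raw model score is mapped to a probability between 0 and 1:

import math

def sigmoid(score):
    # Map a real-valued score to a probability in the range (0, 1)
    return 1.0 / (1.0 + math.exp(-score))

print(sigmoid(2.5))   # ~0.92, strongly suggests membership in one group
print(sigmoid(-2.5))  # ~0.08, strongly suggests membership in the other group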

Predictive analysis example on food inspection data

In this example, you use Spark to perform some predictive analysis on food inspection data (Food_Inspections1.csv) that was acquired through the City of Chicago data portal. This dataset contains information about food establishment inspections that were conducted in Chicago, including information about each establishment, the violations found (if any), and the results of the inspection. The CSV data file is already available in the storage account associated with the cluster at /HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv.

In the steps below, you develop a model to see what it takes to pass or fail a food inspection.

Create an Apache Spark MLlib machine learning app

  1. Create a Jupyter notebook using the PySpark kernel. For the instructions, see Create a Jupyter notebook.

  2. Import the types required for this application. Copy and paste the following code into an empty cell, and then press SHIFT + ENTER.

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.sql import Row
    from pyspark.sql.functions import UserDefinedFunction
    from pyspark.sql.types import *
    

    Because of the PySpark kernel, you do not need to create any contexts explicitly. The Spark and Hive contexts are automatically created for you when you run the first code cell.
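
    As a quick, optional check, you can confirm in an empty cell that the preconfigured entry points already exist; the exact versions printed will vary by cluster:

    # These objects are created by the PySpark kernel; no configuration code is needed
    print(sc.version)      # SparkContext, used later for sc.textFile(...)
    print(spark.version)   # SparkSession, used later for spark.createDataFrame(...)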

Construct the input dataframe

Because the raw data is in a CSV format, you can use the Spark context to pull the file into memory as unstructured text, and then use Python's CSV library to parse each line of the data.

  1. Run the following lines to create a Resilient Distributed Dataset (RDD) by importing and parsing the input data.

    def csvParse(s):
        # Parse one CSV-formatted line into a list of field values
        import csv
        from StringIO import StringIO
        sio = StringIO(s)
        value = csv.reader(sio).next()
        sio.close()
        return value

    # Read the raw file as lines of text, then parse each line as CSV
    inspections = sc.textFile('/HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv')\
                    .map(csvParse)
    
  2. Run the following code to retrieve one row from the RDD, so you can take a look at the data schema:

    inspections.take(1)
    

    The output is:

    [['413707',
        'LUNA PARK INC',
        'LUNA PARK  DAY CARE',
        '2049789',
        "Children's Services Facility",
        'Risk 1 (High)',
        '3250 W FOSTER AVE ',
        'CHICAGO',
        'IL',
        '60625',
        '09/21/2010',
        'License-Task Force',
        'Fail',
        '24. DISH WASHING FACILITIES: PROPERLY DESIGNED, CONSTRUCTED, MAINTAINED, INSTALLED, LOCATED AND OPERATED - Comments: All dishwashing machines must be of a type that complies with all requirements of the plumbing section of the Municipal Code of Chicago and Rules and Regulation of the Board of Health. OBSEVERD THE 3 COMPARTMENT SINK BACKING UP INTO THE 1ST AND 2ND COMPARTMENT WITH CLEAR WATER AND SLOWLY DRAINING OUT. INST NEED HAVE IT REPAIR. CITATION ISSUED, SERIOUS VIOLATION 7-38-030 H000062369-10 COURT DATE 10-28-10 TIME 1 P.M. ROOM 107 400 W. SURPERIOR. | 36. LIGHTING: REQUIRED MINIMUM FOOT-CANDLES OF LIGHT PROVIDED, FIXTURES SHIELDED - Comments: Shielding to protect against broken glass falling into food shall be provided for all artificial lighting sources in preparation, service, and display facilities. LIGHT SHIELD ARE MISSING UNDER HOOD OF  COOKING EQUIPMENT AND NEED TO REPLACE LIGHT UNDER UNIT. 4 LIGHTS ARE OUT IN THE REAR CHILDREN AREA,IN THE KINDERGARDEN CLASS ROOM. 2 LIGHT ARE OUT EAST REAR, LIGHT FRONT WEST ROOM. NEED TO REPLACE ALL LIGHT THAT ARE NOT WORKING. | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: The walls and ceilings shall be in good repair and easily cleaned. MISSING CEILING TILES WITH STAINS IN WEST,EAST, IN FRONT AREA WEST, AND BY THE 15MOS AREA. NEED TO BE REPLACED. | 32. FOOD AND NON-FOOD CONTACT SURFACES PROPERLY DESIGNED, CONSTRUCTED AND MAINTAINED - Comments: All food and non-food contact equipment and utensils shall be smooth, easily cleanable, and durable, and shall be in good repair. SPLASH GUARDED ARE NEEDED BY THE EXPOSED HAND SINK IN THE KITCHEN AREA | 34. FLOORS: CONSTRUCTED PER CODE, CLEANED, GOOD REPAIR, COVING INSTALLED, DUST-LESS CLEANING METHODS USED - Comments: The floors shall be constructed per code, be smooth and easily cleaned, and be kept clean and in good repair. INST NEED TO ELEVATE ALL FOOD ITEMS 6INCH OFF THE FLOOR 6 INCH AWAY FORM WALL.  ',
        '41.97583445690982',
        '-87.7107455232781',
        '(41.97583445690982, -87.7107455232781)']]
    

    The output gives you an idea of the schema of the input file. It includes the name of every establishment, the type of establishment, the address, the date of the inspection, and the location, among other things.

  3. Run the following code to create a dataframe (df) and a temporary table (CountResults) with a few columns that are useful for the predictive analysis. The Spark session (spark) is used to perform transformations on structured data.

    schema = StructType([
        StructField("id", IntegerType(), False),
        StructField("name", StringType(), False),
        StructField("results", StringType(), False),
        StructField("violations", StringType(), True)])

    df = spark.createDataFrame(inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])), schema)
    df.registerTempTable('CountResults')
    

    The four columns of interest in the dataframe are id, name, results, and violations.

  4. Run the following code to get a small sample of the data:

    df.show(5)
    

    The output is:

    +------+--------------------+-------+--------------------+
    |    id|                name|results|          violations|
    +------+--------------------+-------+--------------------+
    |413707|       LUNA PARK INC|   Fail|24. DISH WASHING ...|
    |391234|       CAFE SELMARIE|   Fail|2. FACILITIES TO ...|
    |413751|          MANCHU WOK|   Pass|33. FOOD AND NON-...|
    |413708|BENCHMARK HOSPITA...|   Pass|                    |
    |413722|           JJ BURGER|   Pass|                    |
    +------+--------------------+-------+--------------------+
    

Understand the data

Let's start to get a sense of what the dataset contains.

  1. Run the following code to show the distinct values in the results column:

    df.select('results').distinct().show()
    

    The output is:

    +--------------------+
    |             results|
    +--------------------+
    |                Fail|
    |Business Not Located|
    |                Pass|
    |  Pass w/ Conditions|
    |     Out of Business|
    +--------------------+
    
  2. Run the following code to visualize the distribution of these results:

    %%sql -o countResultsdf
    SELECT COUNT(results) AS cnt, results FROM CountResults GROUP BY results
    

    The %%sql magic followed by -o countResultsdf ensures that the output of the query is persisted locally on the Jupyter server (typically the head node of the cluster). The output is persisted as a Pandas dataframe with the specified name countResultsdf. For more information about the %%sql magic, and other magics available with the PySpark kernel, see Kernels available on Jupyter notebooks with Apache Spark HDInsight clusters.

    The output is:

    SQL query output

  3. You can also use Matplotlib, a library used to construct visualizations of data, to create a plot. Because the plot must be created from the locally persisted countResultsdf dataframe, the code snippet must begin with the %%local magic. This ensures that the code is run locally on the Jupyter server.

    %%local
    %matplotlib inline
    import matplotlib.pyplot as plt
    
    labels = countResultsdf['results']
    sizes = countResultsdf['cnt']
    colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
    plt.axis('equal')
    

    The output is:

    Spark machine learning application output - pie chart with five distinct inspection results

    To predict a food inspection outcome, you need to develop a model based on the violations. Because logistic regression is a binary classification method, it makes sense to group the result data into two categories: Fail and Pass:

    • Pass

      • Pass
      • Pass w/ conditions
    • Fail

      • Fail
    • Discard

      • Business not located
      • Out of Business

      Data with the other results ("Business Not Located" or "Out of Business") are not useful, and they make up a very small percentage of the results anyway.

  4. Run the following code to convert the existing dataframe (df) into a new dataframe where each inspection is represented as a label-violations pair. In this case, a label of 0.0 represents a failure, a label of 1.0 represents a success, and a label of -1.0 represents some results besides those two.

    def labelForResults(s):
        if s == 'Fail':
            return 0.0
        elif s == 'Pass w/ Conditions' or s == 'Pass':
            return 1.0
        else:
            return -1.0
    label = UserDefinedFunction(labelForResults, DoubleType())
    labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
    
  5. Run the following code to show one row of the labeled data:

    labeledData.take(1)
    

    The output is:

    [Row(label=0.0, violations=u"41. PREMISES MAINTAINED FREE OF LITTER, UNNECESSARY ARTICLES, CLEANING  EQUIPMENT PROPERLY STORED - Comments: All parts of the food establishment and all parts of the property used in connection with the operation of the establishment shall be kept neat and clean and should not produce any offensive odors.  REMOVE MATTRESS FROM SMALL DUMPSTER. | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: The walls and ceilings shall be in good repair and easily cleaned.  REPAIR MISALIGNED DOORS AND DOOR NEAR ELEVATOR.  DETAIL CLEAN BLACK MOLD LIKE SUBSTANCE FROM WALLS BY BOTH DISH MACHINES.  REPAIR OR REMOVE BASEBOARD UNDER DISH MACHINE (LEFT REAR KITCHEN). SEAL ALL GAPS.  REPLACE MILK CRATES USED IN WALK IN COOLERS AND STORAGE AREAS WITH PROPER SHELVING AT LEAST 6' OFF THE FLOOR.  | 38. VENTILATION: ROOMS AND EQUIPMENT VENTED AS REQUIRED: PLUMBING: INSTALLED AND MAINTAINED - Comments: The flow of air discharged from kitchen fans shall always be through a duct to a point above the roofline.  REPAIR BROKEN VENTILATION IN MEN'S AND WOMEN'S WASHROOMS NEXT TO DINING AREA. | 32. FOOD AND NON-FOOD CONTACT SURFACES PROPERLY DESIGNED, CONSTRUCTED AND MAINTAINED - Comments: All food and non-food contact equipment and utensils shall be smooth, easily cleanable, and durable, and shall be in good repair.  REPAIR DAMAGED PLUG ON LEFT SIDE OF 2 COMPARTMENT SINK.  REPAIR SELF CLOSER ON BOTTOM LEFT DOOR OF 4 DOOR PREP UNIT NEXT TO OFFICE.")]
    

Create a logistic regression model from the input dataframe

The final task is to convert the labeled data into a format that can be analyzed by logistic regression. The input to a logistic regression algorithm needs to be a set of label-feature vector pairs, where the "feature vector" is a vector of numbers representing the input point. So, you need to convert the "violations" column, which is semi-structured and contains many comments in free text, into an array of real numbers that a machine could easily understand.

One standard machine learning approach for processing natural language is to assign each distinct word an "index", and then pass a vector to the machine learning algorithm such that each index's value contains the relative frequency of that word in the text string.

MLlib provides an easy way to perform this operation. First, "tokenize" each violations string to get the individual words in each string. Then, use a HashingTF to convert each set of tokens into a feature vector that can then be passed to the logistic regression algorithm to construct a model. You conduct all of these steps in sequence using a "pipeline".

tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)
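
If you want to see what the Tokenizer and HashingTF stages actually produce, the following optional snippet (a sketch, not required for the rest of the walkthrough) applies just those two stages and shows the sparse term-frequency vector for one labeled row:

# Optional: inspect the intermediate output of the first two pipeline stages
tokenized = tokenizer.transform(labeledData)
featurized = hashingTF.transform(tokenized)
featurized.select('label', 'features').show(1, truncate=False)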

Evaluate the model using another dataset

You can use the model you created earlier to predict what the results of new inspections will be, based on the violations that were observed. You trained this model on the dataset Food_Inspections1.csv. You can use a second dataset, Food_Inspections2.csv, to evaluate the strength of this model on the new data. This second dataset (Food_Inspections2.csv) is in the default storage container associated with the cluster.

  1. Run the following code to create a new dataframe, predictionsDf, that contains the predictions generated by the model. The snippet also creates a temporary table called Predictions based on the dataframe.

    testData = sc.textFile('wasbs:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections2.csv')\
                .map(csvParse) \
                .map(lambda l: (int(l[0]), l[1], l[12], l[13]))
    testDf = spark.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
    predictionsDf = model.transform(testDf)
    predictionsDf.registerTempTable('Predictions')
    predictionsDf.columns
    

    You should see an output like the following:

    ['id',
        'name',
        'results',
        'violations',
        'words',
        'features',
        'rawPrediction',
        'probability',
        'prediction']
    
  2. Look at one of the predictions. Run this snippet:

    predictionsDf.take(1)
    

    There is a prediction for the first entry in the test data set.

  3. The model.transform() method applies the same transformation to any new data with the same schema, and arrives at a prediction of how to classify the data. You can do some simple statistics to get a sense of how accurate the predictions were:

    numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR
                                            (prediction = 1 AND (results = 'Pass' OR
                                                                results = 'Pass w/ Conditions'))""").count()
    numInspections = predictionsDf.count()
    
    print "There were", numInspections, "inspections and there were", numSuccesses, "successful predictions"
    print "This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate"
    

    The output looks like the following:

    There were 9315 inspections and there were 8087 successful predictions
    This is a 86.8169618894% success rate
    

    Using logistic regression with Spark gives you an accurate model of the relationship between violations descriptions in English and whether a given business would pass or fail a food inspection.
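
    As an optional cross-check (a sketch, not part of the original walkthrough, assuming a Spark 2.x or later cluster where the accuracy metric is available), you can compute the same success rate with MLlib's MulticlassClassificationEvaluator after adding a numeric label column with the same UDF used earlier:

    from pyspark.ml.evaluation import MulticlassClassificationEvaluator

    # Reuse the labelForResults UDF ("label") to add a numeric label to the test predictions
    labeledPredictions = predictionsDf.withColumn('label', label(predictionsDf.results))
    evaluator = MulticlassClassificationEvaluator(metricName='accuracy')
    # This should agree with the manually computed success rate above
    print(evaluator.evaluate(labeledPredictions))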

Create a visual representation of the prediction

You can now construct a final visualization to help you reason about the results of this test.

  1. You start by extracting the different predictions and results from the Predictions temporary table created earlier. The following queries separate the output as true_positive, false_positive, true_negative, and false_negative. In the queries below, you turn off visualization by using -q and also save the output (by using -o) as dataframes that can then be used with the %%local magic.

    %%sql -q -o true_positive
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND results = 'Fail'

    %%sql -q -o false_positive
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND (results = 'Pass' OR results = 'Pass w/ Conditions')

    %%sql -q -o true_negative
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND (results = 'Pass' OR results = 'Pass w/ Conditions')

    %%sql -q -o false_negative
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND results = 'Fail'
    
  2. Finally, use the following snippet to generate the plot using Matplotlib.

    %%local
    %matplotlib inline
    import matplotlib.pyplot as plt
    
    labels = ['True positive', 'False positive', 'True negative', 'False negative']
    sizes = [true_positive['cnt'], false_positive['cnt'], true_negative['cnt'], false_negative['cnt']]
    colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
    plt.axis('equal')
    

    You should see the following output:

    Spark machine learning application output - pie chart percentages of failed food inspections

    In this chart, a "positive" result refers to a failed food inspection, while a "negative" result refers to a passed inspection.
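
    If you also want numeric precision and recall for the "Fail" (positive) class, a small optional %%local snippet (a sketch that reuses the Pandas dataframes saved by the -o queries above) can compute them from the same counts:

    %%local
    # Precision and recall for the "Fail" class, from the counts persisted above
    tp = float(true_positive['cnt'].iloc[0])
    fp = float(false_positive['cnt'].iloc[0])
    fn = float(false_negative['cnt'].iloc[0])
    print("Precision: " + str(tp / (tp + fp)))
    print("Recall: " + str(tp / (tp + fn)))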

Shut down the notebook

After you have finished running the application, you should shut down the notebook to release the resources. To do so, from the File menu on the notebook, select Close and Halt. This shuts down and closes the notebook.

See also

Scenarios

Create and run applications

Tools and extensions

Manage resources