Install and compile Cython

This document explains how to run Spark code with compiled Cython code. The steps are as follows:

  1. Creates an example Cython module on DBFS.
  2. Adds the file to the SparkSession.
  3. Creates a wrapper method to load the module on the executors.
  4. Runs the mapper on a sample dataset.
  5. Generates a larger dataset and compares the performance with the native Python example.

Note

By default, paths use dbfs:/ if no protocol is specified.
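For example, the two calls below are equivalent (a minimal illustration of the default protocol, not part of the walkthrough):

# dbfs:/ is implied when the protocol is omitted, so both calls list the same directory.
dbutils.fs.ls("/")
dbutils.fs.ls("dbfs:/")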

# Write an example cython module to /example/cython/fib.pyx in DBFS.
dbutils.fs.put("/example/cython/fib.pyx", """
def fib_mapper_cython(n):
    '''
    Return the first Fibonacci number > n.
    '''
    cdef int a = 0
    cdef int b = 1
    cdef int j = int(n)
    while b < j:
        a, b = b, a + b
    return b, 1
""", True)

# Write an example input file to /example/cython/input.txt in DBFS.
# Every line of this file is an integer.
dbutils.fs.put("/example/cython_input/input.txt", """
1
10
100
""", True)

# Take a look at the example input.
dbutils.fs.head("/example/cython_input/input.txt")

Add Cython Source Files to Spark

To make the Cython source files available across the cluster, we use sc.addPyFile to add these files to Spark. For example:

sc.addPyFile("dbfs:/example/cython/fib.pyx")
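Files added this way are shipped to every node in the cluster. As an optional check (a small sketch; SparkFiles.get only resolves the local path of a file distributed with addPyFile), you can see where the file lands:

from pyspark import SparkFiles

# Print the local path where Spark placed the distributed fib.pyx.
print(SparkFiles.get("fib.pyx"))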

Test Cython compilation on the driver node

This code first tests compilation on the driver node.

import pyximport
import os

pyximport.install()
import fib
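As a quick sanity check on the driver (a minimal sketch, assuming the module compiled without errors), call the compiled function directly:

# The first Fibonacci number greater than 100 is 144; the function pairs it with a count of 1.
print(fib.fib_mapper_cython(100))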

Define the wrapper function to compile and import the module

The print statements are executed on the executor nodes. You can view the stdout log messages to track the progress of your module.

import sys, os, shutil, cython

def spark_cython(module, method):
  def wrapped(*args, **kwargs):
    print('Entered function with: %s' % args)
    global cython_function_
    try:
      return cython_function_(*args, **kwargs)
    except:
      # First call in this Python process: compile the module with pyximport
      # and cache the compiled function in a global for later calls.
      import pyximport
      pyximport.install()
      cython_function_ = getattr(__import__(module), method)
      print('Cython compilation complete')
    print('Defined function: %s' % cython_function_)
    return cython_function_(*args, **kwargs)
  return wrapped
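Note that compilation happens lazily: nothing is compiled until the first record is processed on an executor, so the 'Cython compilation complete' message appears in the executor stdout logs rather than in the notebook output.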

Run the Cython example

The snippet below runs the Fibonacci example on a few data points.

# Use the CSV reader to generate a Spark DataFrame, convert it back to an RDD, and grab the single element from each Row object.
lines = spark.read.csv("/example/cython_input/").rdd.map(lambda y: y.__getitem__(0))

mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print(fib_frequency)

Performance comparison

Below we test the speed difference between the two implementations. We use the spark.range() API to generate data points from 10,000 to 100,000,000 across 50 Spark partitions, and write this output to DBFS as a CSV.

For this test, disable autoscaling to make sure the cluster has a fixed number of Spark executors.

dbutils.fs.rm("/tmp/cython_input/", True)
spark.range(10000, 100000000, 1, 50).write.csv("/tmp/cython_input/")

Normal PySpark code

def fib_mapper_python(n):
  a = 0
  b = 1
  print "Trying: %s" % n
  while b < int(n):
    a, b = b, a+b
  return (b, 1)

print(fib_mapper_python(2000))

lines = spark.read.csv("/tmp/cython_input/").rdd.map(lambda y: y.__getitem__(0))
fib_frequency = lines.map(lambda x: fib_mapper_python(x)).reduceByKey(lambda a, b: a+b).collect()
print(fib_frequency)

Test Cython code

Now test the compiled Cython code.

lines = spark.read.csv("/tmp/cython_input/").rdd.map(lambda y: y.__getitem__(0))
mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print(fib_frequency)
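To quantify the difference between the two runs, you can wrap each pass in a simple wall-clock timer (a minimal sketch with a hypothetical time_fib_job helper; absolute timings depend on your cluster size and configuration):

import time

def time_fib_job(mapper_fn, label):
  # Time a full map + reduceByKey + collect pass over the generated dataset.
  lines = spark.read.csv("/tmp/cython_input/").rdd.map(lambda y: y.__getitem__(0))
  start = time.time()
  lines.map(mapper_fn).reduceByKey(lambda a, b: a + b).collect()
  print('%s took %.1f seconds' % (label, time.time() - start))

time_fib_job(fib_mapper_python, 'Native Python')
time_fib_job(spark_cython('fib', 'fib_mapper_cython'), 'Cython')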

The test dataset we generated has 50 Spark partitions, which creates 50 CSV files in the output directory. You can view the dataset with dbutils.fs.ls, as shown below.
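# List the output directory; expect roughly one CSV part file per Spark partition.
dbutils.fs.ls("/tmp/cython_input/")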