
Building machine learning models in Apache Spark using Scala in 6 steps

Introduction:

When building machine learning models, data scientists spend most of their time on two main tasks:

Pre-processing and Cleaning

The major portion of time goes into collecting, understanding, analysing, and cleaning the data, and then building features. All of these steps are critical to building a successful machine learning model.

Iterations

The optimization algorithms used to train and finalize the model access the data over multiple iterations.

Now, when it comes to big data, we need a tool that can efficiently pre-process and iterate over large data sets, and this is where Spark is useful. Unlike plain MapReduce, which writes intermediate results to disk, Spark can pass intermediate results directly to the next stage in the pipeline. Spark can also cache data for in-memory operations. This makes Spark well suited for machine learning algorithms.

The rest of the article uses a data set from the UCI data repository to build a machine learning model in Scala. I have assumed that you have access to a cluster where Scala and Spark are already installed. Spark can also run on a local system, but I highly recommend trying this code out on a cluster to realize the true power of Spark and Scala. I have used the shell everywhere.

Content

  • Why Scala for Spark?
  • About Data
  • Download and Store
  • Read, understand and preprocess the data
  • First cut machine learning model
  • Better Machine Learning Models

1-    Why Scala for Spark?

Most of us, including me, are comfortable with and prefer using R and Python for solving machine learning problems. Spark has APIs for R and Python, namely SparkR and PySpark. But Spark itself is written in Scala, and it makes sense to build machine learning models in the same language in which Spark is written.

2-    About Data

I am using the HEPMASS data set from the UCI Repository, which can be found here. The data contains 10.5 million rows including train and test data. Though this data is not that "big" compared to real-life big data sets, it will give a fair idea of the computations and coding in Scala. The data set consists of ~7 million points of training data, with the remainder as test data. The label is defined as 1 or 0, so it is a binary classification problem. The data has 28 features, of which 27 are already normalized.

3-    Download and store data in HDFS

The first step is to create a folder to save the data. Once the folder is ready, we download the train and test data in gzip format. The extracted CSV files are then put into HDFS, from where Spark can access them.

# create a directory

mkdir hepmass

cd hepmass

# use curl to download the data into the directory

curl -L -o all_train.csv.gz http://archive.ics.uci.edu/ml/machine-learning-databases/00347/all_…

curl -L -o all_test.csv.gz http://archive.ics.uci.edu/ml/machine-learning-databases/00347/all_…

# use gzip to extract the compressed files

 

gzip -d all_train.csv.gz

gzip -d all_test.csv.gz

# check that the files are downloaded and extracted

ls

# create directories in HDFS for storing the files

hadoop fs -mkdir hepmassTrain

hadoop fs -mkdir hepmassTest

 

# put the extracted CSV files into HDFS

hadoop fs -put all_train.csv hepmassTrain

hadoop fs -put all_test.csv hepmassTest

# in case you need to remove files from HDFS, the following command can be used

# hadoop fs -rm -R hepmassTest   # removes the data from HDFS

4-    Read, Understand and Preprocess the Data

Now comes the stage where we actually start a Spark instance.

# start the Spark shell

spark-shell --master yarn --deploy-mode client

Earlier versions of Spark relied heavily on RDDs for data operations. Recent versions use the DataFrame API, and we will use DataFrames in this code. The following piece of code reads the data as a Spark DataFrame. After reading, we will look into the schema of the DataFrame; with inferSchema enabled, Spark will identify the type of each column.

// Read Train Data

val trainDataRead = spark.read.option("inferSchema", true).option("header", true).csv("hepmassTrain")

val testDataRead = spark.read.option("inferSchema", true).option("header", true).csv("hepmassTest")

// Print the Schema and check the header to ensure data is loaded correctly

trainDataRead.printSchema

trainDataRead.head

testDataRead.printSchema

testDataRead.head

The schema shows us each column name and its type. If you simply want to extract the column names, you can use the following code.

// Check Column Names

trainDataRead.columns

testDataRead.columns

The label column is named "# label". First we will rename it to "label". At the same time we will also count the number of records.

// Rename the columns

val trainData1 = trainDataRead.withColumnRenamed("# label", "label")

val testData1 = testDataRead.withColumnRenamed("# label", "label")

// Counting records

trainData1.count()

testData1.count()

Sometimes the type of a column could be different from what we want. E.g. a column holding a date could be read as a string, or we might have to convert a string to a double. The following function can come in handy in those cases. As far as the HEPMASS data is concerned, we are already dealing with pretty clean data, which will surely not be the case with real-life data sets.

// Function to convert the type of column

def convertColumnType(df: org.apache.spark.sql.DataFrame, name: String, newType: String) = {

  val df1 = df.withColumnRenamed(name, "swap")

  df1.withColumn(name, df1.col("swap").cast(newType)).drop("swap")

}
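
For illustration only (the HEPMASS columns are already read with the correct types), the function above could be used like this to cast the column f0 to double:

// Example usage of convertColumnType (illustration only; f0 is already numeric)

val trainData1Casted = convertColumnType(trainData1, "f0", "double")

trainData1Casted.printSchema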

In the next piece of code, we will check the distribution of the label. The distribution looks well balanced. An interesting point to note here is that if we do not append the 'show' command, the code returns instantly. This is because Spark is lazily evaluated: transformations only describe the computation, and the actual work starts when an action such as 'show' asks for the results.

// Check the number of Ones and zeros.

trainData1.groupBy("label").count().orderBy($"count".desc).show()

testData1.groupBy("label").count().orderBy($"count".desc).show()

A summary of all the variables can be obtained with just one command. The summary gives the count, mean, standard deviation, min and max. This is very useful because we may not be able to plot many graphs to understand big data. The summary comes in handy for checking outliers. In our case the data is normalized, so the means will be close to 0 and the standard deviations close to 1.

// Check Summary of Other columns

val summary = trainData1.describe()

// Show the selected columns

summary.select("summary", "f0", "f1", "f2", "f3", "mass").show()

But wait! The column mass seems to be a categorical variable. We will have to treat mass separately. Let's check the values of mass.

// check the occurrences of various values in mass using groupby

trainData1.groupBy("mass").count().orderBy($"count".desc).show()

We will first round the value 499.999 to 500 and then, since the mass values have a natural ordering, use a StringIndexer rather than a one-hot encoder to convert the categorical values into a single index column. We will apply this operation to train as well as test data, creating a new column named massIndex.

// Round the values of the column mass

import org.apache.spark.sql.functions.round

val trainData2 = trainData1.withColumn("mass", round($"mass"))

val testData2 = testData1.withColumn("mass", round($"mass"))

// since the mass values have an ordering, rather than using one-hot encoding we

// will transform the categorical values into an index

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val indexer = new StringIndexer().setInputCol("mass").setOutputCol("massIndex").fit(trainData2)

val trainData3 = indexer.transform(trainData2).drop("mass")

// apply the indexer fitted on the training data to the test data as well

val testData3 = indexer.transform(testData2).drop("mass")

// Check the transformation again

trainData3.groupBy("massIndex").count().orderBy($"count".desc).show()
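
A side note, since OneHotEncoder is imported above: if the mass values had no meaningful ordering, we could one-hot encode massIndex instead of using the index directly. A minimal sketch, assuming a recent Spark version (3.x) where OneHotEncoder is an estimator; the output column name massVec is my own choice and the encoded column is not used in the rest of the article.

// Alternative: one-hot encode massIndex into a sparse vector column (not used below)

val encoder = new OneHotEncoder().setInputCols(Array("massIndex")).setOutputCols(Array("massVec"))

val encoderModel = encoder.fit(trainData3)

encoderModel.transform(trainData3).select("massIndex", "massVec").show(5)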

5-    First Cut Machine Learning Model

Spark lets us cache the processed data so that it can be used iteratively by models. We will cache our train, test and validation data. When to cache the data is an art: it is a trade-off between memory consumption and faster execution of the algorithms. The rule of thumb is that the cache should be used after data cleaning and pre-processing, when the features and labels are ready. For the first cut model, the training data is divided into train and validation sets. We have the option of k-fold cross-validation, but when dealing with big data it is computationally expensive, so we will not use it in this example.

val testDataFinal = testData3

val trainDataFinal = trainData3

// Cache the data

trainDataFinal.cache()

testDataFinal.cache()

// Simple Method

val Array(trainData, validData) = trainData3.randomSplit(Array(0.7, 0.3))

trainData.cache()

validData.cache()

We will first build a random forest model with default parameters and check its performance, using the Spark ML library. Spark also offers pipeline functionality, which we will use for building better models in the next section. The following code builds the model and evaluates its performance.

// import

import org.apache.spark.ml.feature.VectorAssembler

import org.apache.spark.ml.classification.RandomForestClassifier

import org.apache.spark.ml.Pipeline

import scala.util.Random

import org.apache.spark.ml.tuning.ParamGridBuilder

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Select the feature columns and assemble them into a single feature vector

val featureColsNames = trainData.columns.filter(_ != "label")

val assembler = new VectorAssembler().setInputCols(featureColsNames).setOutputCol("featureVector")

val assembledTrainData = assembler.transform(trainData)

val assembledValidData = assembler.transform(validData)

// random forest classifier

// New Classifier

val classifier = new RandomForestClassifier().setSeed(Random.nextLong()).setLabelCol("label").

setFeaturesCol("featureVector").setPredictionCol("prediction")

// Build model

val model = classifier.fit(assembledTrainData)

// Predict

val predictions = model.transform(assembledValidData)

For evaluation we will use binary as well as multi-class metrics, to cover the implementation of both. We will now convert the predictions and labels into an RDD.

// Create an RDD of (prediction, label) pairs

val predictionAndLabels = predictions.select("prediction", "label").as[(Double,Double)].rdd

val metricsBinary = new BinaryClassificationMetrics(predictionAndLabels)

// AUROC

val auROC = metricsBinary.areaUnderROC

println("Area under ROC = " + auROC)

// Show confusion matrix

val metricsMulti = new MulticlassMetrics(predictionAndLabels)

println("Confusion matrix:")

println(metricsMulti.confusionMatrix)

//  Show accuracy

val accuracy = metricsMulti.accuracy

println("Summary Statistics")

println(s"Accuracy = $accuracy")
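
Besides accuracy, MulticlassMetrics also exposes weighted precision and recall, which can be printed in the same way:

// Additional summary metrics

println(s"Weighted precision = ${metricsMulti.weightedPrecision}")

println(s"Weighted recall = ${metricsMulti.weightedRecall}")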

6-    Building Better Models

In this section we will build better models using a parameter grid and a pipeline. Spark gives us the facility to build a pipeline of operations, which is very useful. Though some of the steps are repeated from earlier sections, I have purposefully kept them to show the entire chain of events. In this section we first create the assembler, evaluator, parameter grid and validator. We will use the built-in 'TrainValidationSplit' to divide the data into train and validation sets. All the computations start when we call fit on the training data. This step will take time to run because of the heavy computations.

// Building Better Models

// Parameter Tuning

val featureColsNames = trainData3.columns.filter(_ != "label")

val assembler = new VectorAssembler().setInputCols(featureColsNames).setOutputCol("featureVector")

val multiclassEval = new MulticlassClassificationEvaluator().

setLabelCol("label").

setPredictionCol("prediction").

setMetricName("accuracy")

 

// Set Pipeline

val pipeline = new Pipeline().setStages(Array(assembler, classifier))

// Set Parameter Grid

val paramGrid = new ParamGridBuilder().

addGrid(classifier.impurity, Seq("gini", "entropy")).

addGrid(classifier.maxDepth, Seq(5,10)).

addGrid(classifier.numTrees, Seq(10,100)).

build()

 

// Validate

import org.apache.spark.ml.tuning.TrainValidationSplit

val validator = new TrainValidationSplit().

setSeed(Random.nextLong()).

setEstimator(pipeline).

setEvaluator(multiclassEval).

setEstimatorParamMaps(paramGrid).

setTrainRatio(0.7)

// This process should take time

val validatorModel = validator.fit(trainDataFinal)
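
As mentioned in the previous section, k-fold cross-validation is also an option if you can afford the extra computation. Below is a minimal sketch of how CrossValidator could be swapped in for TrainValidationSplit, reusing the pipeline, paramGrid and multiclassEval defined above; 3 folds is an arbitrary choice and this model is not used further in the article.

// Alternative: k-fold cross-validation (computationally more expensive)

import org.apache.spark.ml.tuning.CrossValidator

val crossValidator = new CrossValidator().

setEstimator(pipeline).

setEvaluator(multiclassEval).

setEstimatorParamMaps(paramGrid).

setNumFolds(3).

setSeed(Random.nextLong())

// val cvModel = crossValidator.fit(trainDataFinal)   // uncomment to run the expensive fit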

Once training is done, we can check the best model and use it to get the final predictions on the test data.

// Check accuracy of best model

import org.apache.spark.ml.PipelineModel

val bestModel = validatorModel.bestModel

bestModel.asInstanceOf[PipelineModel].stages.last.extractParamMap

// use the best model for test data

validatorModel.validationMetrics.max

multiclassEval.evaluate(bestModel.transform(testDataFinal))
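
Since we cached several DataFrames along the way, it is good practice to release that memory once we are done:

// Release the cached data once finished

trainData.unpersist()

validData.unpersist()

trainDataFinal.unpersist()

testDataFinal.unpersist()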

____________________________________________________________________________

Please feel free to reach out to me.

Rohit Walimbe

My LinkedIn Profile