Monday, December 26, 2016

Spark and R K-Means Clustering

***
Spark samples are usually written for big files that contain thousands of lines.
You also do not know the data, so you cannot play with it.
Here I use the simplest possible data set for Spark MLlib, so that you can experiment and see which
parameters affect which metrics.
It is not for experts, but it is perfect for beginners who need to calibrate parameters with simple sets.
***
The code below is adapted from the Spark documentation. I changed the RDD so that you can play with it
and see how the data is distributed across clusters.




Here you can play with the values and observe how the points are distributed into clusters.
Always print the cluster centers; they give you a good clue even for large data sets.

You can easily change the data set and the number of requested clusters to get an idea of how
K-means works.


import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

val parsedData = sc.parallelize(Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(40.0, 40.0),
  Vectors.dense(60.0, 60.0),
  Vectors.dense(101.0, 101.1)
))

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE )
val clusterCenters = clusters.clusterCenters.map(_.toArray)
clusterCenters.foreach(c => println("Cluster center: " + c.mkString("[", ",", "]")))
parsedData.collect().foreach(v => println("cluster " + clusters.predict(v) + " " + v.toString()))

Result
Within Set Sum of Squared Errors = 3874.8066666666673
clusterCenters: Array[Array[Double]] = Array(Array(67.0, 67.03333333333333), Array(1.0, 1.0))
cluster 1 [1.0,1.0]
cluster 0 [40.0,40.0]
cluster 0 [60.0,60.0]
cluster 0 [101.0,101.1]
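
To see how the number of clusters changes the cost, here is a minimal sketch (reusing parsedData and numIterations from above) that loops over a few candidate k values and prints the WSSSE for each. The WSSSE always shrinks as k grows, so look for the "elbow" where the improvement flattens rather than the absolute minimum.

// Try several cluster counts and compare the cost; reuses parsedData and numIterations.
(1 to 4).foreach { k =>
  val model = KMeans.train(parsedData, k, numIterations)
  println(s"k = $k  WSSSE = ${model.computeCost(parsedData)}")
}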

A Similar Example in R

pointx = c(1,2, 50, 51) 
pointy = c(1,2,50,51) 
df = data.frame(pointx, pointy)
library(ggplot2)
ggplot(df, aes(pointx, pointy)) + geom_point()
myCluster <- kmeans(df, 3, nstart = 20)
myCluster$centers
myCluster$clus <- as.factor(myCluster$cluster)
ggplot(df, aes(pointx, pointy, color = myCluster$clus)) + geom_point()

Friday, December 23, 2016

Spark NaiveBayes and Result Interpretation

***
Spark samples are usually written for big files that contain thousands of lines.
You also do not know the data, so you cannot play with it.
Here I use the simplest possible data set for Spark MLlib, so that you can experiment and see which
parameters affect which metrics.
It is not for experts, but it is perfect for beginners who need to calibrate parameters with simple sets.
***
In the samples on the internet, people usually try to guess whether a mail is spam or not.
The code below combines pieces from the Spark samples and a few other examples.
I tried to make it work with Spark 2 but had no success; it works with 1.6.
Because it did not work at first, I experimented a lot and took fixes from the net, so the code is not the neatest.

Let's make it much simpler. I will list some properties and try to guess whether the object is
a plane or not.
My training data is:
"wing wheel engine" : 1, it is a plane
"wheel airbag engine" : 0, it is not a plane

Steps
1) Get the training set
2) Tokenize it
3) Apply HashingTF

The result of HashingTF: it generates a 20-dimensional term-frequency vector for each of the two sentences.

0 wheel airbag engine ["wheel","airbag","engine"] {"type":0,"size":20,"indices":[3,14,18],"values":[1,1,1]}
1 wing wheel engine ["wing","wheel","engine"] {"type":0,"size":20,"indices":[3,7,14],"values":[1,1,1]}


4) Train the model
The LabeledPoint array printed at this step in my session looked like this:
Array[org.apache.spark.mllib.regression.LabeledPoint] = Array(
(8.0,[0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0]), 
(9.0,[0.0,0.0,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])
)

5) Prepare the test data
(0,"wing airbag")
(1,"wing airport")
(0,"wing airport")  this one will be guessed as a plane, but it is not one (it is a zeppelin!), so it ends up as a false positive for the plane class
HashingTF generates the vectors below for the test data:

[8,wing airbag,WrappedArray(wing, airbag),(20,[7,18],[1.0,1.0])], 
[9,wing airport,WrappedArray(wing, airport),(20,[3,7],[1.0,1.0])])

In the model vectors, index 7 was "wing" and index 3 was "wheel".
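
If you want to check for yourself which bucket a term hashes to (this is how you can read off that 7 is "wing" and 3 is "wheel"), the RDD-based HashingTF exposes indexOf. A small sketch, assuming the same numFeatures of 20; the exact indices depend on the hash function, so they can differ between Spark versions.

import org.apache.spark.mllib.feature.{HashingTF => MllibHashingTF}

// Which bucket of the 20-dimensional vector does each term land in?
val tf = new MllibHashingTF(20)
Seq("wing", "wheel", "engine", "airbag", "airport").foreach { term =>
  println(s"$term -> ${tf.indexOf(term)}")
}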

6) Apply the prediction; testpredictionAndLabel holds (prediction, label) pairs
(0.0,0.0) I guessed not plane; it is not a plane
(1.0,1.0) I guessed plane; it is a plane
(1.0,0.0) I guessed plane; it is actually not a plane (the false positive above)

7) Dump the metrics; the output is below

Confusion matrix: 
1.0 1.0 
0.0 1.0 
Precision(0.0) = 1.0 
Precision(1.0) = 0.5 
Recall(0.0) = 0.5 
Recall(1.0) = 1.0 
FPR(0.0) = 0.0 
FPR(1.0) = 0.5 
F1-Score(0.0) = 0.6666666666666666 
F1-Score(1.0) = 0.6666666666666666 
Weighted precision: 0.8333333333333333 
Weighted recall: 0.6666666666666666 
Weighted F1 score: 0.6666666666666666 
Weighted false positive rate: 0.16666666666666666 
labels: Array[Double] = Array(0.0, 1.0)

Precision(0.0) = 1.0: we predicted 1 item as class 0 (not plane) and it was correct, so the ratio is 1 / 1 = 1.
Precision(1.0) = 0.5: we predicted 2 items as class 1 (plane) and only 1 was correct, so the ratio is 1 / 2 = 0.5.

Recall(0.0) = 0.5: we found 1 of the class-0 items, but there were in fact 2, so 1 / 2 = 0.5.
Recall(1.0) = 1.0: we found 1 of the class-1 items, and there was exactly 1, so 1 / 1 = 1.

F1-Score(0.0) = 0.6666666666666666

F1-Score = 2 x (precision x recall) / (precision + recall)
         = 2 x (0.5 x 1) / (0.5 + 1) = 1 / 1.5 ≈ 0.67

From definitions :
Precision can be seen as a measure of exactness or quality, whereas recall is a measure of completeness or quantity.
High precision means that an algorithm returned substantially more relevant results than irrelevant ones, while high recall means that an algorithm returned most of the relevant results.

What do these mean?
Think of a bigger set in our example, and suppose we predict
that there are 30 planes, but only 20 of those 30 really are planes.
Then precision is 20 / 30 ≈ 0.67 — this is how well we performed on what we returned.
But there are items we missed.
Suppose there were in fact 50 planes in total.
Then recall = 20 / 50 = 0.4.
It is the percentage of the real positives that we returned.
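
The same arithmetic in a few lines of Scala, using the made-up counts from this example (20 correct out of 30 predicted planes, 50 real planes in total):

// Hypothetical counts from the example above.
val truePositives   = 20.0 // predicted plane and really a plane
val predictedPlanes = 30.0 // everything we flagged as a plane
val actualPlanes    = 50.0 // all real planes in the data

val precision = truePositives / predictedPlanes               // ~0.67
val recall    = truePositives / actualPlanes                  // 0.4
val f1        = 2 * precision * recall / (precision + recall)
println(f"precision = $precision%.2f  recall = $recall%.2f  F1 = $f1%.2f")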


High precision, low recall: we are very good at what we do return, but we do not cover the whole space.
It usually means we chose the cut-off value too high.





import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.mllib.classification.{NaiveBayes, NaiveBayesModel}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.sql.Row



// Build the training DataFrame and turn the text into hashed term-frequency features
val trainData = sqlContext.createDataFrame(Seq((0, "wheel airbag engine"), (1, "wing wheel engine"))).toDF("category", "text")
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val wordsData = tokenizer.transform(trainData)
val hashTF = new HashingTF().setInputCol("words").setOutputCol("features").setNumFeatures(20)
val featureData = hashTF.transform(wordsData)

// Cast the integer category to double so it can be used as a label
val subFeature = featureData.select("category", "features")
val df_1 = subFeature.withColumnRenamed("category", "category2")
val trainDataRdd2 = df_1.withColumn("category", df_1.col("category2").cast("double")).drop("category2")

trainDataRdd2.printSchema()

// Convert the training rows to LabeledPoints
val trainLabeledPoints = trainDataRdd2.select("category", "features").map { case Row(l: Double, p: Vector) => LabeledPoint(l, p) }

val model = NaiveBayes.train(trainLabeledPoints, lambda = 1.0, modelType = "multinomial")

// Same pipeline for the test data
val testData = sqlContext.createDataFrame(Seq((0, "wing airbag"), (1, "wing airport"), (0, "wing airport"))).toDF("category", "text")
val testWordData = tokenizer.transform(testData)
val testFeatureData = hashTF.transform(testWordData)
val testDataRdd = testFeatureData.select("category", "features").map {
  case Row(label: Int, features: Vector) =>
    LabeledPoint(label.toDouble, Vectors.dense(features.toArray))
}

// (prediction, label) pairs for the test set
val testpredictionAndLabel = testDataRdd.map(p => (model.predict(p.features), p.label))


val metrics = new MulticlassMetrics(testpredictionAndLabel)
/* output F1-measure for all labels (0 and 1, negative and positive) */
metrics.labels.foreach( l => println(metrics.fMeasure(l)))
testpredictionAndLabel.take(5)
// Confusion matrix
println("Confusion matrix:")
println(metrics.confusionMatrix)


// Precision by label
val labels = metrics.labels
labels.foreach { l =>
  println(s"Precision($l) = " + metrics.precision(l))
}

// Recall by label
labels.foreach { l =>
  println(s"Recall($l) = " + metrics.recall(l))
}

// False positive rate by label
labels.foreach { l =>
  println(s"FPR($l) = " + metrics.falsePositiveRate(l))
}

// F-measure by label
labels.foreach { l =>
  println(s"F1-Score($l) = " + metrics.fMeasure(l))
}

// Weighted stats
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")
  

Thursday, December 22, 2016

Spark BinaryClassificationMetrics

After fitting a LogisticRegression we can check whether the result is good with BinaryClassificationMetrics.
It simply takes an RDD of pairs.
One element is the score associated with your prediction (for example the rawPrediction/probability column after a logistic regression).
The other is the true label.
For a good classifier the area under the ROC curve must be close to 1.
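
For reference, this is roughly how the (score, label) pairs come out of a real logistic regression in MLlib. A sketch only, assuming you already have trainSet and testSet as RDD[LabeledPoint] (they are not defined in this post); clearThreshold() makes predict() return the raw probability instead of the 0/1 class.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

def evaluate(trainSet: RDD[LabeledPoint], testSet: RDD[LabeledPoint]): Unit = {
  val model = new LogisticRegressionWithLBFGS().run(trainSet)
  model.clearThreshold() // predict() now returns a probability, not a 0/1 class

  val scoreAndLabels = testSet.map(p => (model.predict(p.features), p.label))
  val metrics = new BinaryClassificationMetrics(scoreAndLabels)
  println("area under ROC = " + metrics.areaUnderROC)
}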

What does this mean?
Suppose you are measuring whether you use the heater depending on the weather
(of course this is obvious; we are deliberately using an obvious case).

Say at 10 F : do not use
20 F : do not use
...
50 F : use
..
100 F : use

You see that for low scores (temperatures) you do not use it, but for high ones you do.
True and false are perfectly separated, so I expect a perfect ROC.

The ROC curve is a graph showing how much information we gain from the scores we computed with logistic regression.
For example, we can have 4 observations for a single score value. If you check below, the score 10 gives three 0s and one 1 (we used the heater only once in that period).
This makes the score 10 much less informative for learning.
Each score interval should give as much information as possible:
a "purified" interval outputs only one class, so its information gain is high.

( 10.0, 0.0),
( 10.0, 0.0),
( 10.0, 0.0),
( 10.0, 1.0),


import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// (score, label) pairs: low scores are all 0, high scores are all 1
val metricData = sc.parallelize(Seq(
  ( 10.0, 0.0),
  ( 20.0, 0.0),
  ( 30.0, 0.0),
  ( 40.0, 0.0),
  ( 50.0, 0.0),
  ( 60.0, 1.0),
  ( 70.0, 1.0),
  ( 80.0, 1.0),
  ( 90.0, 1.0),
  (100.0, 1.0)
))

val metrics = new BinaryClassificationMetrics(metricData) 
println("area under the precision-recall curve: " + metrics.areaUnderPR)
println("area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)
metrics.roc().collect()



The case above is very good, so the metrics come out as below.

area under the precision-recall curve: 1.0 
area under the receiver operating characteristic (ROC) curve : 0.9999999999999999 
Array[(Double, Double)] = Array((0.0,0.0), (0.0,0.2), (0.0,0.4), (0.0,0.6), (0.0,0.8), (0.0,1.0), (0.2,1.0), (0.4,1.0), (0.6,1.0), (0.8,1.0), (1.0,1.0), (1.0,1.0))







Let's prepare bad data where the distribution is useless.
Suppose you are measuring your ice-tea consumption depending on the weather.
Unlike the case above, there is no pattern: you do not drink at 10 F, but you do drink at 20 F, and so on.
So this is close to a random distribution, and a random distribution gives an area of 0.5 under the curve.
That is the 45-degree line. A line like that means that at every score I get an equal amount of
information from the true and the false cases.

// (score, label) pairs: the labels alternate, so the score carries almost no information
val metricData = sc.parallelize(Seq(
  ( 10.0, 0.0),
  ( 20.0, 1.0),
  ( 30.0, 0.0),
  ( 40.0, 1.0),
  ( 50.0, 0.0),
  ( 60.0, 1.0),
  ( 70.0, 0.0),
  ( 80.0, 1.0),
  ( 90.0, 0.0),
  (100.0, 1.0)
))

val metrics = new BinaryClassificationMetrics(metricData) 
println("area under the precision-recall curve: " + metrics.areaUnderPR)
println("area under the receiver operating characteristic (ROC) curve : " + metrics.areaUnderROC)
metrics.roc().collect()







The case above is bad, so the metrics come out as below.

area under the precision-recall curve: 0.6393650793650794 
area under the receiver operating characteristic (ROC) curve : 0.6000000000000001 metrics: 
Array[(Double, Double)] = Array((0.0,0.0), (0.0,0.2), (0.2,0.2), (0.2,0.4), (0.4,0.4), (0.4,0.6), (0.6,0.6), (0.6,0.8), (0.8,0.8), (0.8,1.0), (1.0,1.0), (1.0,1.0))












ChiSqSelector for Top Feature Selection

When I first get data, I try to understand the most important columns and the relations between columns.
The relation between columns is the correlation (coefficient) matrix; I will give a sample of that in another post.

Suppose you are guessing whether someone is a woman or not based on
foot, chest, height and hair measurements.
You do not know which parameter has the biggest effect.
You run a ChiSqSelector test to find the n most important parameters;
setNumTopFeatures controls that n.

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  //            foot,chest,height,hair  women
  (7, Vectors.dense(36, 90.0, 165.0, 20.0), 1.0),
  (8, Vectors.dense(38, 95.0, 170.0, 25.0), 1.0),
  (8, Vectors.dense(41, 60.0, 178.0, 10.0), 0.0),
  (9, Vectors.dense(42.0, 60.0, 165.0, 5.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(2)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)
result.show()

The result is below.
You can see the selectedFeatures column.
Since setNumTopFeatures is 2, two features are kept in the result.
According to ChiSqSelector, the most important column is foot, followed by chest.

| id| features|clicked|selectedFeatures|
| 7|[36.0,90.0,165.0,...| 1.0| [36.0,90.0]|
| 8|[38.0,95.0,170.0,...| 1.0| [38.0,95.0]|
| 8|[41.0,60.0,178.0,...| 0.0| [41.0,60.0]|
| 9|[42.0,60.0,165.0,...| 0.0| [42.0,60.0]|
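
If you want to see why these two columns win, you can also run the underlying chi-square test per feature with the RDD-based API. A sketch with the same four rows converted to LabeledPoints (label first, then the measurements); like ChiSqSelector, the test treats every distinct value as a category, so with this few rows take the numbers with a grain of salt.

import org.apache.spark.mllib.linalg.{Vectors => MllibVectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics

// Same rows as above, as (label, features) pairs for the RDD API.
val labeled = sc.parallelize(Seq(
  LabeledPoint(1.0, MllibVectors.dense(36.0, 90.0, 165.0, 20.0)),
  LabeledPoint(1.0, MllibVectors.dense(38.0, 95.0, 170.0, 25.0)),
  LabeledPoint(0.0, MllibVectors.dense(41.0, 60.0, 178.0, 10.0)),
  LabeledPoint(0.0, MllibVectors.dense(42.0, 60.0, 165.0, 5.1))
))

// One ChiSqTestResult per feature column: foot, chest, height, hair.
val featureNames = Seq("foot", "chest", "height", "hair")
Statistics.chiSqTest(labeled).zip(featureNames).foreach { case (result, name) =>
  println(s"$name: statistic = ${result.statistic}, pValue = ${result.pValue}")
}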

Sunday, December 18, 2016

Sample Size and Standard Error




Say you record your sleep every day for a year,
and you find the average to be 10 hours (this is the average over all individual days).

You also find the population standard deviation to be 2.

This is the leftmost part of the picture.

Now randomly take 10 days from the year and compute their average.
Repeat this many times, so that you get the averages of many different 10-day samples.
*** Be careful: you have now averaged only a sample subset.

This is the middle part of the picture.

Standard error = 2 / √10
Standard error ≈ 0.63
This means that with samples of 10 days, the sample means are typically about 0.63 away from the real mean of 10.

You can see this in the sample graph.


If we take samples of size 100 instead, the sample means are much more tightly centered around the real mean.
Standard error = 2 / √100
Standard error = 0.2


The more samples you take, the closer you are to the population mean.

Standard error = Standard Deviation / SQRT( Sample Size)

So if the standard deviation is high (your data varies a lot), your sample estimates will have a large error.
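
A quick simulation of this idea in plain Scala: draw many samples of size 10 and of size 100 from a population with mean 10 and standard deviation 2, and compare the spread of the sample means with sd / sqrt(n). The simulated values will wobble around what the formula predicts.

import scala.util.Random

val rng = new Random(42)

// Mean of one random sample of size n from a population with mean 10 and sd 2.
def sampleMean(n: Int): Double = {
  val xs = Seq.fill(n)(10.0 + 2.0 * rng.nextGaussian())
  xs.sum / n
}

// Plain standard deviation of a sequence.
def stdDev(xs: Seq[Double]): Double = {
  val m = xs.sum / xs.size
  math.sqrt(xs.map(x => (x - m) * (x - m)).sum / xs.size)
}

for (n <- Seq(10, 100)) {
  val means = Seq.fill(2000)(sampleMean(n))
  println(f"n = $n%3d  observed SE = ${stdDev(means)}%.3f  formula 2/sqrt(n) = ${2.0 / math.sqrt(n)}%.3f")
}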

Saturday, December 17, 2016

Spark Apply Descriptive Statistics on DataFrame

When you first get your data, you have to play with it.
You want to learn what kind of data you have.
Below is a simple piece of code to start investigating the general properties of your data.

Suppose you have data like 48, 49, 50, 51, 52. This is an evenly distributed, homogeneous series.

import org.apache.commons.math3.stat.descriptive._

val df = Seq(48.0, 49.0, 50.0, 51.0, 52.0).toDF("nums")

// Collect the column to the driver and feed it into commons-math DescriptiveStatistics
val values = df.select("nums").rdd.map(row => row(0).asInstanceOf[Double]).collect()

val arrMean = new DescriptiveStatistics()
values.foreach(v => arrMean.addValue(v))

// These are the figures shown in the results below
println(s"n: ${arrMean.getN}")
println(s"min: ${arrMean.getMin} max: ${arrMean.getMax} mean: ${arrMean.getMean}")
println(s"std dev: ${arrMean.getStandardDeviation} median: ${arrMean.getPercentile(50)}")
println(s"skewness: ${arrMean.getSkewness} kurtosis: ${arrMean.getKurtosis}")

val meanQ1 = arrMean.getPercentile(25)
val meanQ3 = arrMean.getPercentile(75)
val meanIQR = meanQ3 - meanQ1
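
As a shortcut, the DataFrame API can also print the basic summary (count, mean, stddev, min, max) directly, without collecting to the driver; the quartiles and the IQR still need something like the commons-math approach above.

// Built-in summary statistics on the DataFrame itself.
df.describe("nums").show()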





Perfect distribution
48, 49, 50, 51, 52
n: 5 
min: 48.0 max: 52.0 mean: 50.0 
std dev: 1.5811388300841898 
median: 50.0 
skewness: 0.0 
kurtosis: -1.200000000000002 
meanQ1: Double = 48.5 
meanQ3: Double = 51.5 
meanIQR: Double = 3.0



Let's form a flat, line-shaped distribution where every value is the same.
val df = Seq(50,50, 50, 50,50.0).toDF("nums")

n: 5 
min: 50.0 
max: 50.0 
mean: 50.0 
std dev: 0.0 
median: 50.0 
skewness: NaN 
kurtosis: NaN 
meanQ1: Double = 50.0 
meanQ3: Double = 50.0 
meanIQR: Double = 0.0

Let's add a 40 to create a left skew (negative skew).
** Skewness is the asymmetry of the distribution about its mean.



Left-tail (negative skew) distribution

val df = Seq(40,48,49, 50, 51,52.0).toDF("nums")
n: 6 
min: 40.0 
max: 52.0 
mean: 48.333333333333336 
std dev: 4.320493798938574 
median: 49.5 
skewness: -1.8805720776629977 
kurtosis: 3.9187500000000064 
meanQ1: Double = 46.0 
meanQ3: Double = 51.25 
meanIQR: Double = 5.25


Right-tail (positive skew) distribution
If we instead add a 60 to the original series, we get a right-tail distribution.
The skewness is the same but with the opposite sign.
val df = Seq(48,49, 50, 51,52.0,60).toDF("nums")

n: 6 
min: 48.0 
max: 60.0 
mean: 51.666666666666664 
std dev: 4.320493798938574 
median: 50.5 
skewness: 1.8805720776629975 
kurtosis: 3.9187500000000064

meanQ1: Double = 48.75 
meanQ3: Double = 54.0 
meanIQR: Double = 5.25

The IQR describes the middle 50% of the data, without the extreme values, so it tells you a lot if you know your domain.
For example, say you have car price data and you know the car should cost around $50,000.
When you check the IQR you will see the prices near your expectation; the rest will be either
meaninglessly high (an unrealistic seller's expectation) or low (this time meaningful, because
the car could be damaged). So the IQR is a nice measure.

Skewness gives a rough idea about the tendency of the data (a negative value means the data has a tail to the left).

Kurtosis is a measure of shape: the sharper the peak, the higher the kurtosis. Have a look at a picture on the internet.