Sunday, March 5, 2017

Bloom Filter, Probabilistic Cache and much more

Probabilistic algorithms look wrong at first glance because they allow errors, but they are unbelievably efficient.
And of course, if the probabilistic error is not on a critical path, you do not have to worry about it.
Once we had a web service where a customer asked us whether some users exist on our side.
Think of them sending 10,000-20,000 users in every batch, and doing so frequently.

If I hit the database and check whether each user exists, I will run thousands of queries.
If I build a cache in memory, I need a hash map as large as the number of items in the database.
Say I have 50K users; the cache will be roughly 50K times the size of a user name.

With a Bloom filter, instead of caching every value, we compute a hash projection of each value
and set a bit in a bit array; this occupies much, much less space and gives incredible performance.

And if we use a few hash functions at the same time, we get more accurate results.

The filter never produces a false negative: if it says an item is not in the cache, that is definitely correct.
If it says an item is in the cache, there is a small probability it is wrong (a false positive).

For my scenario that causes no problem, only the occasional unnecessary database hit.

As an example I show my cache for country names.
It has 227 items.

Check the address below for the best parameters.
http://hur.st/bloomfilter?n=227&p=0.01

For our sample the parameters are:

n = 227, p = 0.01 (1 in 100) → m = 2,176 bits (272 B), k = 7

With only 272 bytes and 7 hash functions we get a 1% error rate.
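For reference, this is how the calculator arrives at those numbers; a quick Scala sketch using the standard textbook Bloom filter sizing formulas (not something specific to this post):

// Standard Bloom filter sizing: m bits and k hash functions for n items
// and a target false-positive rate p.
val n = 227        // number of items
val p = 0.01       // target false-positive probability

// m = -n * ln(p) / (ln 2)^2   and   k = (m / n) * ln 2
val m = math.ceil(-n * math.log(p) / math.pow(math.log(2), 2)).toInt
val k = math.round(m.toDouble / n * math.log(2)).toInt

println(s"m = $m bits (${m / 8} bytes), k = $k hash functions")
// prints: m = 2176 bits (272 bytes), k = 7 hash functions

The Java implementation I used for the tests is below.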

import java.util.BitSet;
import java.util.Random;

import com.sangupta.murmur.Murmur1;
import com.sangupta.murmur.Murmur2;
import com.sangupta.murmur.Murmur3;

public class BloomFilterCache {

 public static String[] COUNTRIES = new String[] { "Afghanistan ", "Albania ", "Algeria ", "American Samoa ",
   "Andorra ", "Angola ", "Anguilla ", "Antigua & Barbuda ", "Argentina ", "Armenia ", "Aruba ", "Australia ",
   "Austria ", "Azerbaijan ", "Bahamas, The ", "Bahrain ", "Bangladesh ", "Barbados ", "Belarus ", "Belgium ",
   "Belize ", "Benin ", "Bermuda ", "Bhutan ", "Bolivia ", "Bosnia & Herzegovina ", "Botswana ", "Brazil ",
   "British Virgin Is. ", "Brunei ", "Bulgaria ", "Burkina Faso ", "Burma ", "Burundi ", "Cambodia ",
   "Cameroon ", "Canada ", "Cape Verde ", "Cayman Islands ", "Central African Rep. ", "Chad ", "Chile ",
   "China ", "Colombia ", "Comoros ", "Congo, Dem. Rep. ", "Congo, Repub. of the ", "Cook Islands ",
   "Costa Rica ", "Cote d'Ivoire ", "Croatia ", "Cuba ", "Cyprus ", "Czech Republic ", "Denmark ", "Djibouti ",
   "Dominica ", "Dominican Republic ", "East Timor ", "Ecuador ", "Egypt ", "El Salvador ",
   "Equatorial Guinea ", "Eritrea ", "Estonia ", "Ethiopia ", "Faroe Islands ", "Fiji ", "Finland ", "France ",
   "French Guiana ", "French Polynesia ", "Gabon ", "Gambia, The ", "Gaza Strip ", "Georgia ", "Germany ",
   "Ghana ", "Gibraltar ", "Greece ", "Greenland ", "Grenada ", "Guadeloupe ", "Guam ", "Guatemala ",
   "Guernsey ", "Guinea ", "Guinea-Bissau ", "Guyana ", "Haiti ", "Honduras ", "Hong Kong ", "Hungary ",
   "Iceland ", "India ", "Indonesia ", "Iran ", "Iraq ", "Ireland ", "Isle of Man ", "Israel ", "Italy ",
   "Jamaica ", "Japan ", "Jersey ", "Jordan ", "Kazakhstan ", "Kenya ", "Kiribati ", "Korea, North ",
   "Korea, South ", "Kuwait ", "Kyrgyzstan ", "Laos ", "Latvia ", "Lebanon ", "Lesotho ", "Liberia ", "Libya ",
   "Liechtenstein ", "Lithuania ", "Luxembourg ", "Macau ", "Macedonia ", "Madagascar ", "Malawi ",
   "Malaysia ", "Maldives ", "Mali ", "Malta ", "Marshall Islands ", "Martinique ", "Mauritania ",
   "Mauritius ", "Mayotte ", "Mexico ", "Micronesia, Fed. St. ", "Moldova ", "Monaco ", "Mongolia ",
   "Montserrat ", "Morocco ", "Mozambique ", "Namibia ", "Nauru ", "Nepal ", "Netherlands ",
   "Netherlands Antilles ", "New Caledonia ", "New Zealand ", "Nicaragua ", "Niger ", "Nigeria ",
   "N. Mariana Islands ", "Norway ", "Oman ", "Pakistan ", "Palau ", "Panama ", "Papua New Guinea ",
   "Paraguay ", "Peru ", "Philippines ", "Poland ", "Portugal ", "Puerto Rico ", "Qatar ", "Reunion ",
   "Romania ", "Russia ", "Rwanda ", "Saint Helena ", "Saint Kitts & Nevis ", "Saint Lucia ",
   "St Pierre & Miquelon ", "Saint Vincent and the Grenadines ", "Samoa ", "San Marino ",
   "Sao Tome & Principe ", "Saudi Arabia ", "Senegal ", "Serbia ", "Seychelles ", "Sierra Leone ",
   "Singapore ", "Slovakia ", "Slovenia ", "Solomon Islands ", "Somalia ", "South Africa ", "Spain ",
   "Sri Lanka ", "Sudan ", "Suriname ", "Swaziland ", "Sweden ", "Switzerland ", "Syria ", "Taiwan ",
   "Tajikistan ", "Tanzania ", "Thailand ", "Togo ", "Tonga ", "Trinidad & Tobago ", "Tunisia ", "Turkey ",
   "Turkmenistan ", "Turks & Caicos Is ", "Tuvalu ", "Uganda ", "Ukraine ", "United Arab Emirates ",
   "United Kingdom ", "United States ", "Uruguay ", "Uzbekistan ", "Vanuatu ", "Venezuela ", "Vietnam ",
   "Virgin Islands ", "Wallis and Futuna ", "West Bank ", "Western Sahara ", "Yemen ", "Zambia ",
   "Zimbabwe " };

 private BitSet bitset;

 int[] hashSeeds;

 int noHashFunctions;

 public boolean useSeed = true;

 public int murmurVersion = 1;

 public int getSeed(int i) {
  if (useSeed)
   return hashSeeds[i];
  else
   return i;

 }

 public BloomFilterCache(int slots, int hashFunctions) {
  bitset = new BitSet(slots);
  noHashFunctions = hashFunctions;
  Random r = new Random(System.currentTimeMillis());
  hashSeeds = new int[hashFunctions];
  for (int i = 0; i < hashFunctions; ++i) {
   hashSeeds[i] = r.nextInt();
  }

 }

 public int getHash(String value, int i) {

  // Note: the second argument (4) is the length passed to the Murmur hash,
  // so only the first 4 bytes of the value are hashed in this version.
  if (murmurVersion == 1)
   return (int) Murmur1.hash(value.getBytes(), 4, getSeed(i));
  else if (murmurVersion == 2)
   return (int) Murmur2.hash(value.getBytes(), 4, getSeed(i));
  else
   return (int) Murmur3.hash_x86_32(value.getBytes(), 4, getSeed(i));
 }

 public void add(String value) {

  for (int i = 0; i < noHashFunctions; ++i) {
   int h = getHash(value, i);

   bitset.set(Math.abs(h) % bitset.size(), true);
  }
 }

 public boolean contains(String value) {

  for (int i = 0; i < noHashFunctions; ++i) {
   int h = getHash(value, i);

   if (!bitset.get(Math.abs(h) % bitset.size())) {
    return false;

   }
  }

  return true;
 }

 public static String generateRandomChars(String candidateChars, int length) {
  StringBuilder sb = new StringBuilder();
  Random random = new Random();
  for (int i = 0; i < length; i++) {
   sb.append(candidateChars.charAt(random.nextInt(candidateChars.length())));
  }

  return sb.toString();
 }

 public int getErrorCount(String[] testSet, String postFix, String prefix) {
  int error = 0;

  // Every test string gets a random 3-letter lowercase prefix, so it is guaranteed
  // not to be in the filter; any "contains" hit is therefore a false positive.
  // (The postFix/prefix arguments are not used in this version.)
  for (String s : testSet) {

   if (contains(generateRandomChars("ABCDEFGHIJKLMNOPQRSTUVWXYZ".toLowerCase(), 3) + s)) {
    error++;
   }
  }
  return error;
 }

 public int getErrorCountRandom(int testSize) {
  int error = 0;

  for (int i = 0; i < testSize; i++) {

   String test = generateRandomChars("ABCDEFGHIJKLMNOPQRSTUVWXYZ".toLowerCase(), 10);
   if (contains(test) ) {
    error++;
    //System.err.println(test);
   }

  }
  return error;
 }

 public static void testFor(int slots, int hashFunctions, boolean useSeed, int murmurVersion) {
  BloomFilterCache bf = new BloomFilterCache(slots, hashFunctions);
  bf.useSeed = useSeed;
  bf.murmurVersion = murmurVersion;

  for (String s : COUNTRIES) {
   bf.add(s);
  }

  System.err.println("*****Params : slots = " + slots + " no#hash = " + hashFunctions + " cardinality = "
    + bf.bitset.cardinality() + " useSeed = " + useSeed + "  murmurVersion = " + murmurVersion);
  System.err.println("Query for Japan: " + bf.contains("Japan"));
  System.err.println("Query for Dummy: " + bf.contains("Dummy"));
  System.err.println("Error Count: " + bf.getErrorCount(COUNTRIES, "", ""));
  System.err.println("Error Count Prefix: " + bf.getErrorCount(COUNTRIES, "abc", ""));
  System.err.println("Error Count Postfix:  " + bf.getErrorCount(COUNTRIES, "", "abc"));
  System.err.println("Error Count Post-Pre: " + bf.getErrorCount(COUNTRIES, "abc", "def"));
  System.err.println("Error Count Random: " + bf.getErrorCountRandom(COUNTRIES.length));

 }

 public static void main(String[] args) {


  int[] versions = new int[] { 1, 2, 3 };
  boolean[] useSeeds = new boolean[] { true, false };
  for (int v : versions) {
   System.err.println(" ---------------------- ");
   for (boolean useSeed : useSeeds) {
    testFor(10000, 7, useSeed, v);
    testFor(2224, 2, useSeed, v);
    testFor(2224, 7, useSeed, v);
    testFor(2224, 17, useSeed, v);
    testFor(582, 3, useSeed, v);
   }
  }

 }
}

Below are multiple tests with multiple parameters. You can check the effects by changing the parameters.
 ---------------------- 
*****Params : slots = 10000 no#hash = 7 cardinality = 1336 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 369 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 10
Error Count Prefix: 6
Error Count Postfix:  4
Error Count Post-Pre: 4
Error Count Random: 2
*****Params : slots = 2224 no#hash = 7 cardinality = 1067 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 3
Error Count Prefix: 3
Error Count Postfix:  1
Error Count Post-Pre: 2
Error Count Random: 1
*****Params : slots = 2224 no#hash = 17 cardinality = 1755 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 3
Error Count Prefix: 4
Error Count Postfix:  5
Error Count Post-Pre: 3
Error Count Random: 2
*****Params : slots = 582 no#hash = 3 cardinality = 402 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 61
Error Count Prefix: 57
Error Count Postfix:  53
Error Count Post-Pre: 58
Error Count Random: 54
*****Params : slots = 10000 no#hash = 7 cardinality = 1308 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 371 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 10
Error Count Prefix: 9
Error Count Postfix:  7
Error Count Post-Pre: 7
Error Count Random: 5
*****Params : slots = 2224 no#hash = 7 cardinality = 1034 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 2
Error Count Postfix:  1
Error Count Post-Pre: 2
Error Count Random: 0
*****Params : slots = 2224 no#hash = 17 cardinality = 1721 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 2
Error Count Prefix: 3
Error Count Postfix:  4
Error Count Post-Pre: 3
Error Count Random: 1
*****Params : slots = 582 no#hash = 3 cardinality = 378 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 49
Error Count Prefix: 42
Error Count Postfix:  47
Error Count Post-Pre: 33
Error Count Random: 45
 ---------------------- 
*****Params : slots = 10000 no#hash = 7 cardinality = 1339 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 382 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 9
Error Count Prefix: 6
Error Count Postfix:  6
Error Count Post-Pre: 3
Error Count Random: 4
*****Params : slots = 2224 no#hash = 7 cardinality = 1069 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 1
Error Count Prefix: 3
Error Count Postfix:  1
Error Count Post-Pre: 1
Error Count Random: 1
*****Params : slots = 2224 no#hash = 17 cardinality = 1780 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 3
Error Count Prefix: 3
Error Count Postfix:  4
Error Count Post-Pre: 3
Error Count Random: 8
*****Params : slots = 582 no#hash = 3 cardinality = 386 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 50
Error Count Prefix: 43
Error Count Postfix:  48
Error Count Post-Pre: 40
Error Count Random: 53
*****Params : slots = 10000 no#hash = 7 cardinality = 1348 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 383 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 11
Error Count Prefix: 5
Error Count Postfix:  6
Error Count Post-Pre: 8
Error Count Random: 5
*****Params : slots = 2224 no#hash = 7 cardinality = 1063 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 2
Error Count Postfix:  1
Error Count Post-Pre: 1
Error Count Random: 2
*****Params : slots = 2224 no#hash = 17 cardinality = 1749 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 1
Error Count Prefix: 3
Error Count Postfix:  4
Error Count Post-Pre: 1
Error Count Random: 3
*****Params : slots = 582 no#hash = 3 cardinality = 408 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 60
Error Count Prefix: 61
Error Count Postfix:  49
Error Count Post-Pre: 47
Error Count Random: 60
 ---------------------- 
*****Params : slots = 10000 no#hash = 7 cardinality = 1322 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 359 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 18
Error Count Prefix: 22
Error Count Postfix:  12
Error Count Post-Pre: 13
Error Count Random: 14
*****Params : slots = 2224 no#hash = 7 cardinality = 1027 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  3
Error Count Post-Pre: 8
Error Count Random: 6
*****Params : slots = 2224 no#hash = 17 cardinality = 1727 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 8
Error Count Prefix: 6
Error Count Postfix:  8
Error Count Post-Pre: 3
Error Count Random: 6
*****Params : slots = 582 no#hash = 3 cardinality = 389 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 52
Error Count Prefix: 63
Error Count Postfix:  49
Error Count Post-Pre: 49
Error Count Random: 56
*****Params : slots = 10000 no#hash = 7 cardinality = 1332 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 383 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 12
Error Count Prefix: 4
Error Count Postfix:  10
Error Count Post-Pre: 5
Error Count Random: 5
*****Params : slots = 2224 no#hash = 7 cardinality = 1060 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 2
Error Count Prefix: 0
Error Count Postfix:  3
Error Count Post-Pre: 0
Error Count Random: 1
*****Params : slots = 2224 no#hash = 17 cardinality = 1782 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 4
Error Count Prefix: 6
Error Count Postfix:  7
Error Count Post-Pre: 6
Error Count Random: 5
*****Params : slots = 582 no#hash = 3 cardinality = 392 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 59
Error Count Prefix: 59
Error Count Postfix:  57
Error Count Post-Pre: 60
Error Count Random: 57

Sunday, February 19, 2017

Support Vector Machine Estimation for Japanese Flag (A circle inside a rectangle)

Support Vector Machine (SVM) can sometimes surprise you with its capability. I wanted to test it on a nonlinear example and suddenly thought: why not model the Japanese flag? I could calculate the points of the rectangle and then the points of the circle inside. Then I thought, why calculate at all; it is better to draw a picture and extract the RGB values from it. That way I can even extract my data directly from the picture. For the Japanese flag I used 3 colors. White and red: as in the Japanese flag. Green: the part I cut from the flag, which I want SVM to predict.



I extract the values and dump them into 2 separate CSV files.

 
public static ArrayList<ArrayList<Object>> getImageData() {

		BufferedImage img;
		ArrayList<ArrayList<Object>> result = new ArrayList<>();
		try {
			img = ImageIO.read(new File(IMG));

			int widthSample = 1;
			int heightSample = 2;

			for (int j = 0; j < img.getHeight(); j++) {				
				if (j % heightSample != 0) {
					continue;
				}
				for (int i = 0; i < img.getWidth(); i++) {

					if (i % widthSample != 0) {
						continue;
					}

					String color = getPixelData(img, i, j);

					ArrayList<Object> res = new ArrayList<>();
					res.add(i);
					res.add(j);
					res.add(color);

					result.add(res);

				}
			}

		} catch (IOException e) {
			e.printStackTrace();
		}

		return result;
	}

	private static String getPixelData(BufferedImage img, int x, int y) {
		int argb = img.getRGB(x, y);

		int rgb[] = new int[] { (argb >> 16) & 0xff, // red
				(argb >> 8) & 0xff, // green
				(argb) & 0xff // blue
		};

		String color = colorUtils.getColorNameFromRgb(rgb[0], rgb[1], rgb[2]);

		return color;
	}



With this logic you can generate shapes in a plane and turn some part of the image green. The green part will be your test data (the part you want SVM to predict). For example, you can paint 1/4 of a circle or rectangle green and see the result. In the next post I will try stars and the top of a bottle.

maindata <- read.csv(file="E:/REDA/japan_flag_data1.csv", header=FALSE, sep=",")


test  <- read.csv(file="E:/REDA/japan_flag_data1_test.csv", header=FALSE, sep=",")

traindata = data.frame(
  x=maindata$V1, y=maindata$V2 ,colpart = as.factor( maindata$V3 )
)

testdf = data.frame(
  x=test$V1, y=test$V2 ,colpart = as.factor( test$V3 )
)


plot(traindata$x, traindata$y , col =  traindata$colpart   )

fitsvm =  svm(colpart ~ ., data=traindata)

plot(fitsvm, traindata, y~x , col=c("red","white")) 




predictdata = predict(fitsvm, newdata = testdf)



points(x=testdf$x, y = testdf$y , pch = 19,col = c("blue", "green")[as.numeric(predictdata)] ,cex=2.6)


The blue and green points are predictions for the missing part. The blue ones, which are nearer to the center of the circle, are predicted correctly. The green ones are not. We can tune the parameters to estimate the missing values better.

Saturday, January 28, 2017

SAP HANA, Scenario error Multiple definitions of node

We were trying to enrich our scenario by adding data from the SPVR_SCENARIO_EVT table.
This table contains tasks and the people who executed each task.



where "SCOPE_OBJECT_DEF_NAME" = 'Assign Related Lawyer' and "EVENT_NAME" = 'Complete'
For example, the query below finds who completed ("Complete") the task "Assign Related Lawyer".

      select SCOPE_OBJECT_USER_DISPLAY_NAME from  "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    
    where "SCOPE_OBJECT_DEF_NAME" = 'Assign Related Lawyer' and "EVENT_NAME" = 'Complete' 

The problem was that we were adding this query 6 times for different roles. OPInt was giving the error below
after the second join:

The following errors occurred: Inconsistent calculation model (34011)
 Details (Errors):
 - calculationNode (MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT) -> operation (TableDataSource): Multiple definitions of node MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT found.


We generated a new script view and joined the same table several times.
The dummy join is there to bring a row for every scenario even if the related data is null or sparse.

BEGIN 
   var_out = select distinct  
    CAST ( dummyjoin.SCENARIO_INSTANCE_ID AS NVARCHAR) CC_SCENARIO_ID ,
  CAST ( catspec.SCOPE_OBJECT_USER_ID AS NVARCHAR) CC_CATSPEC_USERID ,catspec.SCOPE_OBJECT_USER_DISPLAY_NAME  CC_CATSPEC_DISPLAY,
      CAST (    lawmanager.SCOPE_OBJECT_USER_ID AS NVARCHAR)  CC_LAWYERMGR_USERID,lawmanager.SCOPE_OBJECT_USER_DISPLAY_NAME CC_LAWYERMGR_DISPLAY ,
      CAST (    lawyerjoin.SCOPE_OBJECT_USER_ID AS NVARCHAR) CC_LAWYER_USERID ,lawyerjoin.SCOPE_OBJECT_USER_DISPLAY_NAME      CC_LAWYER_DISPLAY     ,
          intclient.useridagg USERIDAGG ,    intclient.displayagg DISPLAYAGG, 1  MYSUM
 from  
 "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    dummyjoin
 left join  "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    catspec on dummyjoin.SCENARIO_INSTANCE_ID = catspec.SCENARIO_INSTANCE_ID
 and  catspec."SCOPE_OBJECT_DEF_ID" = 'RFX_POC.001.ACT_PUBL.255.ZPUB' and catspec."EVENT_NAME" = 'Complete'
    left join    "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    lawyerjoin on lawyerjoin.SCENARIO_INSTANCE_ID = dummyjoin.SCENARIO_INSTANCE_ID
    and lawyerjoin."SCOPE_OBJECT_DEF_NAME" = 'Review/Revise the Contract for Procurement' and lawyerjoin."EVENT_NAME" = 'Complete' 
 left join "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    lawmanager on lawmanager.SCENARIO_INSTANCE_ID = dummyjoin.SCENARIO_INSTANCE_ID
    and lawmanager."SCOPE_OBJECT_DEF_NAME" = 'Assign Related Lawyer' and lawmanager."EVENT_NAME" = 'Complete' 
    
    left join (
    
    select SCENARIO_INSTANCE_ID, STRING_AGG(SCOPE_OBJECT_USER_ID, ';') as useridagg , STRING_AGG(SCOPE_OBJECT_USER_DISPLAY_NAME, ';') as displayagg from
(select distinct SCENARIO_INSTANCE_ID, SCOPE_OBJECT_USER_ID, SCOPE_OBJECT_USER_DISPLAY_NAME from "SYS_PROCESS_VISIBILITY"."MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"
where ( SCOPE_OBJECT_DEF_NAME = 'Analyzing the Questions' or SCOPE_OBJECT_DEF_NAME = 'Create/Revise Technical Evaluation Report' )
 and EVENT_NAME = 'Completed')
 group BY SCENARIO_INSTANCE_ID 
 
     )  intclient on  intclient.SCENARIO_INSTANCE_ID = dummyjoin.SCENARIO_INSTANCE_ID ; 

END 

Residual vs Fitted for investment

While I was checking the graphs I can draw from a regression model, I realized
one graph is extremely useful for investment.
Think you have (actually, we have) thousands of used car prices, and you want to buy a car
at the optimum price as an investment (in the hope of selling it later).

Best Car to buy Price = Max(Expected Price by regression - Real price )

Think our regression line expects a car's price to be 30K but the actual advertisement price is 20K.
There are 2 possibilities:
1) The car is damaged.
2) The car owner needs money urgently and is selling the car at a very low price.

If the car's price is 40K, I cannot find a logical explanation for it. Some people
try to sell their used car for more than an unused one, probably because
they spent on some amenities they consider very valuable.


import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
  (20000,2011,30000.0),
  (120000,2014,20000.0),
  (60000,2015,25000.0) ,
  (20000,2011,32000.0),
  (120000,2014,21000.0),
  (60000,2015,45000.0)   
  
  )
).toDF("km", "year", "label")

val assembler = new VectorAssembler()
  .setInputCols(Array("km", "year"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
output.select("features", "label").show(false)

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)


val lrModel = lr.fit(output)

display(lrModel, output, "fittedVsResiduals")


println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")




val trainingSummary = lrModel.summary

println(s"numIterations: ${trainingSummary.totalIterations}")

println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")

trainingSummary.residuals.show()

println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

println(s"r2: ${trainingSummary.r2}")

Coefficients: [-0.19283786035452538,2928.112739446878] Intercept: -5853577.79139608
numIterations: 8
objectiveHistory: [0.5,0.446369757257132,0.352850077757605,0.272318835721877,0.26365142412164966,0.23726105027025182,0.23725993458647637,0.23725993457463043]
+-------------------+
|          residuals|
+-------------------+
|-1000.1704245014116|
|-500.72260739002377|
| -9999.106968107633|
|  999.8295754985884|
| 499.27739260997623|
| 10000.893031892367|
+-------------------+

Residual = Observed value - Predicted value

We must find the ones with the most negative residual (much cheaper than expected).
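As a sketch (reusing the lrModel and output DataFrame from the Spark example above; "label" and "prediction" are the default Spark ML column names), the ranking itself could be done like this:

import org.apache.spark.sql.functions.col

// Add a residual column and sort ascending: the most negative residuals,
// i.e. cars priced far below what the model expects, come first.
val withResiduals = lrModel.transform(output)
  .withColumn("residual", col("label") - col("prediction"))

withResiduals.orderBy(col("residual").asc).show(5)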
The Databricks graph has poor resolution with only a few points, so I also wrote an R version.

library(lattice) 
mydata2 = data.frame(
  year = c(2011.0,2012.0,2014.0,2015.0),
  km10000 = c(6.0,7.0,10.0,3.0),
  price1000 = c(200.0,250.0,300.0,400.0)
)



res2.mod1 = lm(price1000 ~  km10000 + year , data = mydata2)
summary(res2.mod1)
fitted(res2.mod1)
xyplot(resid(res2.mod1) ~ fitted(res2.mod1),
       xlab = "Fitted Values",
       ylab = "Residuals",
       main = "Car price based on year and km ",
       par.settings = simpleTheme(col=c("blue","red"),
                                  pch=c(10,3,11), cex=3, lwd=2),
       
       panel = function(x, y, ...)
       {
         panel.grid(h = -1, v = -1)
         panel.abline(h = 0)
         panel.xyplot(x, y, ...)
       }
)       


> fitted(res2.mod1)
       1        2        3        4 
206.1722 240.9232 302.5415 400.3631 
> resid(res2.mod1)
         1          2          3          4 
-6.1721992  9.0767635 -2.5414938 -0.3630705 
> 

Sunday, January 22, 2017

Spark Dataframe Broadcast Join

Think you have a person table for your company with 10,000 records, and you have 100 departments.

Problem) Find the average age of people in the Finance department.

If we make a plain join, data needs to be shuffled for the join. But if we know one table is very small (like department), we can mark it as
broadcast so that our big table does not get shuffled.

Also, when finding the average age per department, all related data can be filtered on the local nodes.



import org.apache.spark.sql.functions.broadcast

case class Person(id:Long,name: String, depid: Long)
val personDF = Seq(Person(1,"Andy", 1) ,Person(2,"John", 1),Person(3,"Jack", 2),Person(4,"Max", 2)).toDF()

case class Department(depid:Long,name: String)
val departmentDF = Seq(Department(1,"Finance") ,Department(2,"It")).toDF()

val partitionedPerson = personDF.repartition($"name").explain(true)
val combinedDF = personDF.join(departmentDF, Seq("depid"))
val combinedDF2 = personDF.join(broadcast(departmentDF), Seq("depid"))
combinedDF2.take(10)

We can also define our join with SQL syntax.
personDF.createOrReplaceTempView("person")
departmentDF.createOrReplaceTempView("department")

sql("SELECT * FROM person r JOIN department s ON r.depid = s.depid").show()

id|name|depid|depid| name
1|Andy| 1| 1|Finance
2|John| 1| 1|Finance
3|Jack| 2| 2| It
4| Max| 2| 2| It
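As a side note (not part of the original example): when Spark SQL can estimate a table's size, it also broadcasts it automatically if the estimate is below spark.sql.autoBroadcastJoinThreshold. A small sketch, assuming a Spark 2.x SparkSession named spark:

// Tables below this size estimate (in bytes) are broadcast automatically;
// setting the threshold to -1 disables automatic broadcasting.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)

// explain() shows whether the plan uses a broadcast join or a shuffle-based join.
personDF.join(departmentDF, Seq("depid")).explain()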

Spark HashPartitioner for RDD

An RDD is a parallel distribution of data. For many target calculations
we do not have to care how the data is distributed. But if our calculation will benefit from data locality,
we must ensure that locality ourselves.
Think we have some car data (Honda, Toyota, Ford) and we will count/average/sum the cars by model.
If we have 3 partitions over which these models are randomly distributed, the data must be shuffled before reducing.

If we partition by car model and we are sure that all Honda data is in the same partition, we can
calculate everything on one node with no need to shuffle.

When I was testing partitioning I thought it would be based on data cardinality:
since I have 3 distinct keys, their hashes would be distributed over 3 slots.
That was wrong. In fact the hash code of the key object is taken modulo the number of partitions.
So you can get very unpredictable results if you hash over Strings. I advise converting the Strings
to their hash codes and checking the int values to know the final partition distribution exactly.

val cars = Array("Honda", "Toyota", "Ford")

val carnamePrice = sc.parallelize(for {
    x <- cars
    y <- Array(100,200,300)
} yield (x, y), 8)
val rddEachCar = carnamePrice.partitionBy(new HashPartitioner(3))
val mapped =   rddEachCar.mapPartitionsWithIndex{
                    (index, iterator) => {
                       println("Called in Partition -> " + index)
                       val myList = iterator.toList
                       
                       myList.map(x => x + " -> " + index).iterator
                    }
                 }
mapped.take(10)



Array[String] = Array((Toyota,100) -> 0, (Toyota,200) -> 0, (Toyota,300) -> 0, (Honda,100) -> 1, (Honda,200) -> 1, (Honda,300) -> 1, (Ford,100) -> 2, (Ford,200) -> 2, (Ford,300) -> 2)

println ( "Honda".hashCode() % 3)
println ( "Ford".hashCode() % 3 )
println ( "Toyota".hashCode() % 3 )
1
2
0
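For reference, this is essentially what HashPartitioner does when assigning a key to a partition; a small sketch (the only extra detail is the adjustment for negative hash codes):

// Predict the partition of a key under HashPartitioner(numPartitions):
// the key's hashCode modulo numPartitions, shifted into the non-negative range.
def partitionOf(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

Seq("Honda", "Toyota", "Ford").foreach(k => println(s"$k -> partition ${partitionOf(k, 3)}"))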

Wednesday, January 18, 2017

Impute Outliers In Spark

Imputing data in R is so easy.

fun <- function(x){
    quantiles <- quantile( x, c(.05, .95 ) )
    x[ x < quantiles[1] ] <- quantiles[1]
    x[ x > quantiles[2] ] <- quantiles[2]
    x
}
fun( yourdata )
I tried to write similar functions for Spark. Below you will find methods for replacing values, where
Q1 is the 1st quartile and
Q3 is the 3rd quartile:
left outliers < Q1 - 1.5×IQR
right outliers > Q3 + 1.5×IQR
For a sample dataset I obtain the values below.
org.apache.spark.util.StatCounter = (count: 13, mean: 131.538462, stdev: 65.441242, max: 320.000000, min: 1.000000) 
rddMin: Double = 1.0 
rddMax: Double = 320.0 
rddMean: Double = 131.53846153846155 
quantiles: Array[Double] = Array(117.0, 150.0) 
Q1: Double = 117.0 
Q3: Double = 150.0 
IQR: Double = 33.0 
lowerRange: Double = 67.5 
upperRange: Double = 199.5
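A hedged sketch of how such values can be computed, assuming a DataFrame df with a Double column named "original" as in the code further below (approxQuantile needs Spark 2.0+):

// Summary statistics over the numeric column (StatCounter: count, mean, min, max, stdev).
val stats = df.select("original").rdd.map(_.getDouble(0)).stats()
val rddMean = stats.mean

// Quantiles; relativeError = 0.0 asks for exact values.
val quantiles = df.stat.approxQuantile("original", Array(0.25, 0.75), 0.0)
val Q1 = quantiles(0)
val Q3 = quantiles(1)
val IQR = Q3 - Q1
val lowerRange = Q1 - 1.5 * IQR
val upperRange = Q3 + 1.5 * IQR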
So according to our data set and our logic we can make the changes below (of course there are more):
left outliers -> Q1
left outliers -> mean
right outliers -> Q3
right outliers -> mean
You may also want to impute values less than Q1 up to Q1.
import org.apache.spark.sql.functions._

val imputeLessToTarget: (Double,Double,Double ) => Double = (arg: Double,treshold: Double,target: Double) => { if (arg < treshold) target else arg}
val imputeLessToTargetUDF = udf(imputeLessToTarget)

val imputeMoreToTarget: (Double,Double,Double ) => Double = (arg: Double,treshold: Double,target: Double) => { if (arg > treshold) target else arg}
val imputeMoreToTargetUDF = udf(imputeMoreToTarget)


//Change values less than lowerRange of outliers to Q1 **** Capping
val df5 = df.withColumn("replaced", imputeLessToTargetUDF(col("original") ,lit(lowerRange),lit(Q1) ))
println( "df5 Change values less than lowerRange of outliers to Q1 **** Capping")
df5.show()
//Change values less than lowerRange of outliers to mean
val df6 = df.withColumn("replaced", imputeLessToTargetUDF(col("original") ,lit(lowerRange),lit(rddMean) ))
println( "df6 Change values less than lowerRange of outliers to mean")
df6.show()
//Change values less than Q1 to mean
val df7 = df.withColumn("replaced", imputeLessToTargetUDF(col("original") ,lit(Q1),lit(rddMean) ))
println( "df7 Change values less than Q1 to mean")
df7.show()
//Change values more than upperRange of outliers to Q3 **** Capping
val df8 = df.withColumn("replaced", imputeMoreToTargetUDF(col("original") ,lit(upperRange),lit(Q3) ))
println( "df8 Change values more than upperRange of outliers to Q3 **** Capping")
df8.show()
//Change values more than upperRange of outliers to mean
val df9 = df.withColumn("replaced", imputeMoreToTargetUDF(col("original") ,lit(upperRange),lit(rddMean) ))
println( "df9 Change values more than upperRange of outliers to mean")
df9.show()
Below you can see the output after generating the new columns. These types of functions are my toolset for dealing with new data.
df5 Change values less than lowerRange of outliers to Q1 **** Capping 
1.0| 117.0
110.0| 110.0
111.0| 111.0
112.0| 112.0
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 320.0
df6 Change values less than lowerRange of outliers to mean 
1.0|131.53846153846155
110.0| 110.0
111.0| 111.0
112.0| 112.0
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 320.0
df7 Change values less than Q1 to mean 
1.0|131.53846153846155
110.0|131.53846153846155
111.0|131.53846153846155
112.0|131.53846153846155
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 320.0
df8 Change values more than upperRange of outliers to Q3 **** Capping
1.0| 150.0
110.0| 150.0
111.0| 150.0
112.0| 150.0
117.0| 150.0
118.0| 150.0
120.0| 150.0
122.0| 150.0
129.0| 150.0
140.0| 150.0
150.0| 150.0
160.0| 150.0
320.0| 320.0
df9 Change values more than upperRange of outliers to mean 
1.0|131.53846153846155
110.0|131.53846153846155
111.0|131.53846153846155
112.0|131.53846153846155
117.0|131.53846153846155
118.0|131.53846153846155
120.0|131.53846153846155
122.0|131.53846153846155
129.0|131.53846153846155
140.0|131.53846153846155
150.0|131.53846153846155
160.0|131.53846153846155
320.0| 320.0

Tuesday, January 17, 2017

SparkException, Task not serializable

I usually run my code on Databricks for a first test, and I was facing lots of "Task not serializable" exceptions.
I was solving them one way or another. Then I googled a bit and tried to list every reason that can cause this exception.
Here I will write the simplest one: on Databricks (in a single class, let's say), calling a method of the enclosing class inside an RDD operation triggers it.

Let's define 3 classes. Exception1 is the class throwing the serialization exception.
The reason is that when a method of the class is called inside the RDD operation, Spark tries to serialize the enclosing object.
There are 2 solutions.

1) NoException2 extends java.io.Serializable
just make the class Serializable

2) Turn add into a function value:
val add = (a: Int) => a + 1 // function literal
instead of
def add(a:Int) = a+1 // method of the class

Function literals in Scala are serializable on their own, so the enclosing class does not need to be serialized.


class Exception1 {

    val rdd = sc.parallelize(List(1,2,3))

    def addFunc =  {
      val result = rdd.map(add)
      result.take(3)
    }

    def add(a:Int) = a+1

  }
class NoException1 {
  val rdd = sc.parallelize(List(1,2,3))

  def addFunc() =  {
    val result = rdd.map(add)
    result.take(3)
  }

  val add = (a: Int) => a + 1
}
class NoException2 extends java.io.Serializable {
  val rdd = sc.parallelize(List(1,2,3))

  def doIT() =  {
    val result = rdd.map(add)
    result.take(3)
  }

  def add(a: Int) = a + 1
}

Calling functions.

(new Exception1()).addFunc
(new NoException1()).addFunc
(new NoException2()).doIT
Results

org.apache.spark.SparkException: Task not serializable
res9: Array[Int] = Array(2, 3, 4)
res9: Array[Int] = Array(2, 3, 4)

Sunday, January 15, 2017

Write Elastic from Kafka

Reading from and writing to Kafka is time consuming because you write your code,
wait for data to come, and if you see an exception you stop the program and write again.

Our task was:
1) Read JSON-formatted data from Kafka.
2) Filter messages if they include some keywords.
3) Write to Elasticsearch.

**Add the related Kafka streaming jars to your Zeppelin or
application.



import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.rdd.RDD
import scalaj.http._
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.util.Config._
import org.apache.spark.sql.functions._
import org.elasticsearch.spark.sql._
import scalaj.http._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

val ssc = new StreamingContext(sc, Seconds(30))
val topicMap:Map[String,Int] =Map("YourTopic"->1)
val lines = KafkaUtils.createStream(ssc,"your.com:2181", "group", topicMap).map(_._2)

lines.foreachRDD((rdd: RDD[String], time: Time) => {
      
  val df2 = sqlContext.read.json(rdd)

 
  val df = df2.filter($"msg".contains("mykeyword1") || $"msg".contains("mykeyword2"))
 
  
  val esConfig = Map("es.nodes"->"192.168.1.151","es.port"->"9200")

         df.saveToEs("elastic/target",esConfig)
 
    })

Spark, Migrate Data from Solr to MongoDB in Batches


Most big data projects are migrations. And most utility packages do not work as in their samples because
of version differences and library conflicts.

We decided to use Stratio/Spark-MongoDB for the Spark to MongoDB migration.

We had to move data from old Solr files to Mongo. When we tried to move all the data at once we got exceptions
which implied size problems (sorry, I forgot the exact exceptions), so we needed a method for writing the items in batches.
We tried different sizes and found that 50000 was working.

Solr also returns how many records there are in total even if we do not fetch them all.
So we defined a function (of course we could have done it in the loop) to check whether we had consumed all of them in batches.

We got some errors using the Stratio library. I found the standard sample code was not compatible with our environment and
found the magic configuration by checking the source code. I omitted a lot of code below, but it gives the idea,
and in fact it is runnable if you build your data frames correctly.
Our environment was Spark 1.6.2.


import scalaj.http._
import com.mongodb.casbah.{WriteConcern => MongodbWriteConcern}
import com.stratio.datasource._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.schema._
import com.stratio.datasource.mongodb.writer._
import com.stratio.datasource.mongodb.config._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.util.Config._
import org.apache.spark.sql.functions._

val wbuilder = MongodbConfigBuilder(Map(Host -> List("192.168.2.150:27018"), Database -> "ManagementDev", Collection ->"arsiv", SamplingRatio -> 1.0, WriteConcern -> "normal", ReadPreference -> "Secondary"))
val writeConfig = wbuilder.build()

var condition=true
var start=0
var pass=0

def checkPaging(total:Long,start:Int):Boolean={
    if((start+50000)<(total)) {
        return true
    }
    else {
        return false
    }
}

do{
    val response1 = Http("http://192.168.2.155:8983/solr/collection1/select?q=*%3A*&sort=tw_id+asc&start="+start+"&rows=50000&fl=AuthorEmail&wt=json")
    .timeout(connTimeoutMs = 1000000, readTimeoutMs = 500000).header("key", "val").method("get")
    .execute().body
    val rdd1 = sc.parallelize(Seq(response1))
 
    val df1 = sqlContext.read.json(rdd1)
    
    val df2=df1.select($"response.docs")
    
    //Lots of omitted codes
    val dcount=df1.select("response.numFound").take(1)(0)(0).asInstanceOf[Long]
    //Lots of omitted codes

    dfy.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("saveCriteria").save("hdfs://your.com/user/Archive") 
    dfx.saveToMongodb(writeConfig)
    
    condition=checkPaging(dcount,start)
    start=start+50000
    pass=pass+1
    println("!!PASS "+pass )
} 
while(condition)

Gradient Descent for Linear Regression


I was searching for something (I do not remember what) and I saw the post below.
GradientDescentExample

It was a perfect post for testing some parameters of gradient descent.
I opened my Databricks notebook and began to play with it. I took the functions from that page but changed them a bit
because of type differences in my code.

Question: check the picture below. We have the points below; is there a formula
that describes this spread?


I used the formula y1 = 5 * x1 + 10 + noise to generate this data.
So our target values are 5 and 10 (or slightly different because of the noise).

noise = np.random.normal(-3, 6, 49)

x1 = np.linspace(0, 50, 49)
y1 = 5 * x1 + 10 + noise
points = zip(x1,y1)

fig3, ax3 = plt.subplots()
#ax3.plot(x1, y1, 'k--')

ax3.plot(x1, y1, 'ro')



display(fig3)


So let's say you made an initial guess:
y = 5 * x + 3



Now we must calculate how good y = 5 * x + 3 is,
using sum( (guess - actual)^2 ) / len (the standard mean squared error formula).

What is the next step? Make a better guess. How do you determine that you are making a better guess?
There must be a function which tells you how your error decreases. The gradient descent step will
help you choose better values for the slope and intercept.
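Written out, the error and the gradient step that the Python functions at the end of the post implement are:

E(b, m) = \frac{1}{N}\sum_{i=1}^{N}\bigl(y_i - (m x_i + b)\bigr)^2

\frac{\partial E}{\partial b} = -\frac{2}{N}\sum_{i=1}^{N}\bigl(y_i - (m x_i + b)\bigr), \qquad
\frac{\partial E}{\partial m} = -\frac{2}{N}\sum_{i=1}^{N} x_i\bigl(y_i - (m x_i + b)\bigr)

b \leftarrow b - \alpha\,\frac{\partial E}{\partial b}, \qquad
m \leftarrow m - \alpha\,\frac{\partial E}{\partial m}

where α is the learning rate; each iteration takes one such step.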

There are some parameters you give to the function: the learning rate and the iteration count.
The learning rate is the hard one to grasp, so I checked various learning rates to see the effect.

for learning_rate = 0.00001
As you see in the graph, the error function diminishes on each run, but over time
the improvement gets smaller.



for learning_rate = 0.001
It seems to learn faster. But be careful: this is a simple example and our error surface is simple, with only one minimum.
If we had a complex function with lots of convex and concave regions, then a learning rate that is too high or too low could skip the global minimum
or get stuck in a local minimum. Check the pictures on the net for these effects; there are lots of nice ones.


You can play with the parameters below and you will obtain very different results depending on them.

num_tests = 10
mycoef = 1
iter_count = 500

learning_rate = 0.001
initial_b = 0 # initial y-intercept guess
initial_m = 0 # initial slope guess


figError, axError = plt.subplots()

fig2, ax2 = plt.subplots()
plt.figure(1)

ax2.plot(x1, y1, 'ro')

errorList = [];


for i in range(num_tests):
  [b, m] = gradient_descent_runner(points, initial_b, initial_m, learning_rate, (i+mycoef)* iter_count)  
  
  
  x = np.linspace(0, 50, 49)
  y = x *m + b
  ax2.plot(x, y, 'k--')
  ax2.text(max(x),max(y),i)

  error = compute_error_for_line_given_points(b, m, points)
  errorList.append( error );
  
  
  axError.plot( i ,error , 'bo')
  print "After {0} iterations b = {1}, m = {2}, error = {3}".format( (i+mycoef)* iter_count, b, m, error)

Below is a result for the parameters
num_tests = 10
mycoef = 1
iter_count = 500

You can play as much as you want and watch the evolution of the error, slope and intercept.
The graph below looks bad because the function performs well from the beginning and the lines overlap.



The part I took from the article:

from numpy import *
import numpy as np
from StringIO import StringIO
import matplotlib.pyplot as plt
import numpy as np

# y = mx + b
# m is slope, b is y-intercept
def compute_error_for_line_given_points(b, m, points):
    totalError = 0
    for i in range(0, len(points)):
        x = points[i][ 0]
        y = points[i][ 1]
        totalError += (y - (m * x + b)) ** 2
    return totalError / float(len(points))

def step_gradient(b_current, m_current, points, learningRate):
    b_gradient = 0
    m_gradient = 0
    N = float(len(points))
    for i in range(0, len(points)):
        x = points[i, 0]
        y = points[i, 1]
        b_gradient += -(2/N) * (y - ((m_current * x) + b_current))
        m_gradient += -(2/N) * x * (y - ((m_current * x) + b_current))
    new_b = b_current - (learningRate * b_gradient)
    new_m = m_current - (learningRate * m_gradient)
    return [new_b, new_m]

def gradient_descent_runner(points, starting_b, starting_m, learning_rate, num_iterations):
    b = starting_b
    m = starting_m
    for i in range(num_iterations):
        b, m = step_gradient(b, m, array(points), learning_rate)
    return [b, m]

Monday, January 9, 2017

R Impute Dataframe( Replace outliers )

There are various methods on the internet for imputing outliers.
Here I give a sample I use, which combines methods I found.
My intention is to selectively apply the imputation to numeric columns.
When
Q1 is the 1st quartile and
Q3 is the 3rd quartile,
the ranges below are outliers by definition:
below Q1 - 1.5×IQR or above Q3 + 1.5×IQR


The code below replaces values with x < min or x > max with the mean value. Depending on your
data, the median could be a better choice.

numcol <- c(1,3,40,50,600)
numcol2 <- c(2,420,400,500,600)
charcol <- c("a","a","b","b","a")


df <- data.frame(a=numcol,b=charcol,c=numcol2)
#select numeric columns to change
columnsToChange <- c("a","c")

df
for(i in columnsToChange){
  Q3 <- quantile(df[,i],0.75, na.rm=TRUE) 
  max <- Q3 + (IQR(df[,i], na.rm=TRUE) * 1.5 )
  
  Q1 <- quantile(df[,i],0.25, na.rm=TRUE)
  min <- Q1 - (IQR(df[,i], na.rm=TRUE) * 1.5 )
  
  message(sprintf("min ,  max  mean  %s %s mean of column %s \n", min,max ,mean(df[,i] )) )
  
  indexesstochange <- which(df[,i] < min | df[,i] > max)
  
  message(sprintf("indexes to change %s \n", indexesstochange ))
 
  df[,i][indexesstochange] <- mean(df[,i])
}
df
It produces the output below. For column a the outlier is the max value at index 5. For column c the outlier is the min value at index 1.
1   1 a   2
2   3 a 420
3  40 b 400
4  50 b 500
5 600 a 600

min ,  max  mean  -67.5 120.5 mean of column 138.8 

indexes to change 5 

min ,  max  mean  250 650 mean of column 384.4 

indexes to change 1 


      a   b     c
1   1.0 a 384.4
2   3.0 a 420.0
3  40.0 b 400.0
4  50.0 b 500.0
5 138.8 a 600.0

Saturday, January 7, 2017

Trying to guess if a car is damaged or not by Logistic Regression

We had car data collected from a website, an advertisement site for used cars.
The car data had the properties below regarding damage.

1) If there is big damage, or any damage the insurance company knows about, people
say the car is damaged.
2) If it is a small thing that can be covered up, or an area that is already repainted and the owner thinks
no one will notice, he does not say it is damaged.

What we have is then
Car model
Year
Price
City
Date of publishing
Last update of advertisement
Days elapsed from publishing(if a car is sold it goes from list)
Elapsed Days for selling

Think of it as having no direct clue in the data. We must generate, extract, invent our own features; a rough sketch of feeding such features into a logistic regression follows at the end of this post.

a) So let's think about how a damaged car's owner thinks.
b) What changes in the advertisement over time if the car has small damage (the owner keeps
editing the advertisement over time)?



1) Number of page views

There is an average number of page views before a car owner deletes the advertisement.
Let's say an undamaged car is sold after roughly 100 page views. If a car is advertised
as undamaged and is still not sold after 100 page views, it may have a problem.

2) Number of price changes

At first the owner thinks he can sell his car at the price of a non-damaged car. After a while he makes some discounts,
probably because some calls make him realize he has to. So we can generate 2 variables from here:
% discount from the first price
# of discounts made.

3) Is the price lower than the average for the same conditions?

A sense of guilt could show up here.

4) Duration on sale

The total duration the car has been on sale.


5) Difference in days from the average sale duration of the same car model.

There is an average sale duration for every car combination, so every elapsed day (or week)
beyond that average duration increases the probability of damage.



6) Number of pictures in the advertisement

A damaged car's owner will probably put no picture or only 1-2 pictures. Fewer pictures could mean a higher probability of damage.
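As a purely illustrative sketch (all column names here are hypothetical, and carsDF is an assumed DataFrame holding the engineered features above as Doubles), this is how such features could feed a logistic regression in Spark ML:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.classification.LogisticRegression

// Hypothetical engineered columns based on the ideas above.
val assembler = new VectorAssembler()
  .setInputCols(Array("pageViews", "priceChangeCount", "discountPct",
                      "belowAvgPrice", "daysOnSale", "daysOverModelAvg", "pictureCount"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setLabelCol("isDamaged")      // 1.0 = declared damaged, 0.0 = declared undamaged
  .setFeaturesCol("features")

// carsDF is an assumed DataFrame containing the columns above.
val model = lr.fit(assembler.transform(carsDF))
println(model.coefficients)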



Effect of Outliers with R

***
Most Spark (or R) samples are for big files containing thousands of lines,
and you do not know the data, so you cannot play with it.
I put here the simplest data set for Spark MLlib so that one can play and understand which metrics
are affected by which parameters.
It is not for seniors, but perfect for beginners who need to calibrate parameters on simple sets.
***
R has a built in dataset as cars.

carssub
   speed dist
1      4    2
2      4   10
3      7    4
4      7   22
5      8   16
6      9   10
7     10   18
8     10   26
9     10   34
10    11   17
11    11   28
12    12   14
13    12   20
14    12   24
15    12   28
16    13   26
17    13   34
18    13   34
19    13   46
20    14   26
21    14   36
22    14   60
23    14   80
24    15   20
25    15   26
26    15   54
27    16   32
28    16   40
29    17   32
30    17   40


We first take a subset of the data.
Then we add outliers (1, 5, 10, 15 and 20 outliers).
Then we see the effect of the outliers.
lm : fits linear models.
abline : adds a line to a plot.

If you look at the picture you will see how a line fits the data when there is no outlier.
When we add only 1 outlier it changes a lot. When 5 are added it gets much worse.


carssub <- cars[1:30, ]  # original data

cars_outliers1 <- data.frame(speed=c(20), dist=c( 218))  # introduce outliers.

cars_outliers5 <- data.frame(speed=c(19,19,20,20,20), dist=c(190, 186, 210, 220, 218))  # introduce outliers.

cars_outliers10 <- data.frame(speed=c(19,19,20,20,20,21,22,23,24,25), dist=c(190, 186, 210, 220, 218,220,224,230,235,240))

cars_outliers15 <- data.frame(speed=c(19,19,20,20,20,21,22,23,24,25,26,27,28,29,30), dist=c(190, 186, 210, 220, 218,220,224,230,235,240,244,245,248,250,252))

cars_outliers20 <- data.frame(speed=c(19,19,20,20,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35), dist=c(190, 186, 210, 220, 218,220,224,230,235,240,244,245,248,250,252,254,256,258,260,262))

cars_total_1 <- rbind(carssub, cars_outliers1)  # data with outliers.
cars_total_5 <- rbind(carssub, cars_outliers5)  # data with outliers.
cars_total_10 <- rbind(carssub, cars_outliers10)  # data with outliers.
cars_total_15 <- rbind(carssub, cars_outliers15)  # data with outliers.
cars_total_20 <- rbind(carssub, cars_outliers20)  # data with outliers.


par(mfrow=c(2, 3))
plot(carssub$speed, carssub$dist, xlim=c(0, 40), ylim=c(0, 300), main="Pure data", xlab="speed", ylab="dist", pch="*", col="red", cex=2)
abline(lm(dist ~ speed, data=carssub), col="blue", lwd=3, lty=2)

# model with 1 outlier added
plot(cars_total_1$speed, cars_total_1$dist, xlim=c(0, 40), ylim=c(0, 300), main="1 outlier added", xlab="speed", ylab="dist", pch="*", col="red", cex=2)
lm1 <- lm(dist ~ speed, data=cars_total_1)
abline(lm1, col="blue", lwd=3, lty=2)
summary(lm1)

plot(cars_total_5$speed, cars_total_5$dist, xlim=c(0, 40), ylim=c(0, 300), main="5 outliers added", xlab="speed", ylab="dist", pch="*", col="red", cex=2)
lm2 <- lm(dist ~ speed, data=cars_total_5)
abline(lm2, col="blue", lwd=3, lty=2)
summary(lm2)

plot(cars_total_10$speed, cars_total_10$dist, xlim=c(0, 40), ylim=c(0, 300), main="10 outlier added", xlab="speed", ylab="dist", pch="*", col="red", cex=2)
lm10 <- lm(dist ~ speed, data=cars_total_10)
abline(lm10, col="blue", lwd=3, lty=2)
summary(lm10)


plot(cars_total_15$speed, cars_total_15$dist, xlim=c(0, 40), ylim=c(0, 300), main="15 outlier added", xlab="speed", ylab="dist", pch="*", col="red", cex=2)
lm15 <- lm(dist ~ speed, data=cars_total_15)
abline(lm15, col="blue", lwd=3, lty=2)
summary(lm15)

plot(cars_total_20$speed, cars_total_20$dist, xlim=c(0, 40), ylim=c(0, 300), main="20 outlier added", xlab="speed", ylab="dist", pch="*", col="red", cex=2)
lm20 <- lm(dist ~ speed, data=cars_total_20)
abline(lm20, col="blue", lwd=3, lty=2)
summary(lm20)



no outliers: dist = speed * 2.9 - 6.8
1 outlier: dist = speed * 6.1 - 40
5 outliers: dist = speed * 11.4 - 95
You can see that the slope and intercept get worse up to 10 outliers, but after that the fit changes shape and the error decreases, because the added points form a large group of their own and are no longer outliers.
Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)  -6.8446     8.7420  -0.783 0.440223    
speed         2.9730     0.7046   4.219 0.000233 ***

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)  -40.015     19.271  -2.076 0.046817 *  
speed          6.131      1.515   4.048 0.000351 ***

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)  -95.203     24.507  -3.885 0.000466 ***
speed         11.437      1.793   6.379 3.18e-07 ***

Call:
lm(formula = dist ~ speed, data = cars_total_1)

Residuals:
    Min      1Q  Median      3Q     Max 
-32.210 -15.883  -5.555   6.641 135.398 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)  -40.015     19.271  -2.076 0.046817 *  
speed          6.131      1.515   4.048 0.000351 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 30.63 on 29 degrees of freedom
Multiple R-squared:  0.361, Adjusted R-squared:  0.339 
F-statistic: 16.38 on 1 and 29 DF,  p-value: 0.0003514


Call:
lm(formula = dist ~ speed, data = cars_total_5)

Residuals:
    Min      1Q  Median      3Q     Max 
-67.220 -27.755  -7.473  19.428  86.471 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept)  -95.203     24.507  -3.885 0.000466 ***
speed         11.437      1.793   6.379 3.18e-07 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 43.88 on 33 degrees of freedom
Multiple R-squared:  0.5522, Adjusted R-squared:  0.5386 
F-statistic: 40.69 on 1 and 33 DF,  p-value: 3.175e-07



Call:
lm(formula = dist ~ speed, data = cars_total_10)

Residuals:
    Min      1Q  Median      3Q     Max 
-81.855 -30.503  -0.082  34.346  77.691 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept) -123.552     20.715  -5.964 6.37e-07 ***
speed         13.965      1.366  10.221 1.85e-12 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 44.15 on 38 degrees of freedom
Multiple R-squared:  0.7333, Adjusted R-squared:  0.7263 
F-statistic: 104.5 on 1 and 38 DF,  p-value: 1.852e-12



Call:
lm(formula = dist ~ speed, data = cars_total_15)

Residuals:
    Min      1Q  Median      3Q     Max 
-78.833 -30.297  -3.225  31.292  71.650 

Coefficients:
             Estimate Std. Error t value Pr(>|t|)    
(Intercept) -114.7213    16.5388  -6.936 1.59e-08 ***
speed         13.2679     0.9684  13.701  < 2e-16 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 42.11 on 43 degrees of freedom
Multiple R-squared:  0.8136, Adjusted R-squared:  0.8093 
F-statistic: 187.7 on 1 and 43 DF,  p-value: < 2.2e-16




Call:
lm(formula = dist ~ speed, data = cars_total_20)

Residuals:
   Min     1Q Median     3Q    Max 
-73.15 -31.29  -6.27  33.74  79.83 

Coefficients:
            Estimate Std. Error t value Pr(>|t|)    
(Intercept) -93.3047    14.5507  -6.412 5.86e-08 ***
speed        11.6738     0.7548  15.466  < 2e-16 ***
---
Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1

Residual standard error: 42.92 on 48 degrees of freedom
Multiple R-squared:  0.8329, Adjusted R-squared:  0.8294 
F-statistic: 239.2 on 1 and 48 DF,  p-value: < 2.2e-16