Sunday, March 5, 2017

Bloom filter, Probabilistic Cache and much more

Probabilistic algorithms can seem wrong at first because they accept some error, but they are remarkably efficient.
And of course, if the probabilistic error is not on a critical path, you do not have to worry about it.
We once had a web service where a customer asked whether the users they sent us already existed.
They sent 10,000-20,000 users in every batch, and did so frequently.

If I hit the database to check whether each user exists, I will run thousands of queries per batch.
If I build an in-memory cache, I need a hash map with as many entries as there are users in the database.
Say I have 50K users: the cache will hold 50K user names.

A Bloom filter does not cache the items themselves. Instead, we compute a hash value for each item
and set the corresponding bit in a bit array, so we occupy far less space and still get very fast lookups.

And if we use several hash functions at once, we get more accurate results, up to an optimum that depends on the size of the bit array.

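This algorithm never misses an item that exists in the cache: if it says an item is not in the cache, that answer is always correct.
If it says an item is in the cache, there is a small probability that it is wrong (a false positive).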

In my scenario this causes no problem beyond an occasional unnecessary database hit.
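The flow described above, where the filter is asked first and the database only when the filter answers "maybe", could be sketched roughly as follows. The class name, the in-memory `Set` standing in for the user table, and the `String.hashCode`-based double hashing are all assumptions made for illustration; they are not the Murmur-based code used later in this post.

```java
import java.util.BitSet;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: a tiny Bloom filter placed in front of a slow lookup.
final class FilteredLookup {
    private final BitSet bits;
    private final int m;                  // number of bits in the filter
    private final int k;                  // number of hash functions
    private final Set<String> database;   // stand-in for the real user table
    int databaseHits = 0;                 // how often we had to ask the "database"

    FilteredLookup(int m, int k, Set<String> database) {
        this.bits = new BitSet(m);
        this.m = m;
        this.k = k;
        this.database = database;
        for (String user : database) {
            for (int i = 0; i < k; i++) bits.set(index(user, i));
        }
    }

    // Derives the i-th bit position from two base hashes (double hashing).
    private int index(String value, int i) {
        int h1 = value.hashCode();
        int h2 = (h1 >>> 16) | 1;         // odd step so the positions differ
        return Math.floorMod(h1 + i * h2, m);
    }

    boolean mightContain(String value) {
        for (int i = 0; i < k; i++) {
            if (!bits.get(index(value, i))) return false;  // definitely absent
        }
        return true;                                       // present, or a false positive
    }

    boolean userExists(String user) {
        if (!mightContain(user)) return false;  // answered without a database call
        databaseHits++;                         // filter said "maybe": confirm it
        return database.contains(user);
    }

    public static void main(String[] args) {
        Set<String> db = new HashSet<>();
        db.add("andy");
        db.add("john");
        FilteredLookup lookup = new FilteredLookup(2176, 7, db);
        System.out.println(lookup.userExists("andy"));    // true
        System.out.println(lookup.userExists("nobody"));  // false
        System.out.println("database hits: " + lookup.databaseHits);
    }
}
```

A false positive only costs one extra `database.contains` call, which is why the error is harmless in this scenario.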

As an example, here is my cache for country names.
It has 227 items.

See the address below to find the best parameters.
http://hur.st/bloomfilter?n=227&p=0.01

For our sample the parameters are below.

n = 227, p = 0.01 (1 in 100) → m = 2,176 (272B), k = 7

With only 272 bytes and 7 hash functions we can get a 1% error rate.
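The numbers above follow from the standard Bloom filter sizing formulas, m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2. A minimal sketch that reproduces them is below; the class and method names are made up for illustration.

```java
// Sizing sketch for a Bloom filter; class and method names are illustrative.
final class BloomParams {

    // Bits needed for n items at false-positive rate p: m = -n * ln(p) / (ln 2)^2
    static int optimalBits(int n, double p) {
        return (int) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    // Number of hash functions for m bits and n items: k = (m / n) * ln 2
    static int optimalHashes(int m, int n) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        int n = 227;        // number of country names
        double p = 0.01;    // target false-positive rate
        int m = optimalBits(n, p);
        int k = optimalHashes(m, n);
        // prints: m = 2176 bits (272 bytes), k = 7
        System.out.println("m = " + m + " bits (" + (m + 7) / 8 + " bytes), k = " + k);
    }
}
```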

import java.util.BitSet;
import java.util.Random;

import com.sangupta.murmur.Murmur1;
import com.sangupta.murmur.Murmur2;
import com.sangupta.murmur.Murmur3;

public class BloomFilterCache {

 public static String[] COUNTRIES = new String[] { "Afghanistan ", "Albania ", "Algeria ", "American Samoa ",
   "Andorra ", "Angola ", "Anguilla ", "Antigua & Barbuda ", "Argentina ", "Armenia ", "Aruba ", "Australia ",
   "Austria ", "Azerbaijan ", "Bahamas, The ", "Bahrain ", "Bangladesh ", "Barbados ", "Belarus ", "Belgium ",
   "Belize ", "Benin ", "Bermuda ", "Bhutan ", "Bolivia ", "Bosnia & Herzegovina ", "Botswana ", "Brazil ",
   "British Virgin Is. ", "Brunei ", "Bulgaria ", "Burkina Faso ", "Burma ", "Burundi ", "Cambodia ",
   "Cameroon ", "Canada ", "Cape Verde ", "Cayman Islands ", "Central African Rep. ", "Chad ", "Chile ",
   "China ", "Colombia ", "Comoros ", "Congo, Dem. Rep. ", "Congo, Repub. of the ", "Cook Islands ",
   "Costa Rica ", "Cote d'Ivoire ", "Croatia ", "Cuba ", "Cyprus ", "Czech Republic ", "Denmark ", "Djibouti ",
   "Dominica ", "Dominican Republic ", "East Timor ", "Ecuador ", "Egypt ", "El Salvador ",
   "Equatorial Guinea ", "Eritrea ", "Estonia ", "Ethiopia ", "Faroe Islands ", "Fiji ", "Finland ", "France ",
   "French Guiana ", "French Polynesia ", "Gabon ", "Gambia, The ", "Gaza Strip ", "Georgia ", "Germany ",
   "Ghana ", "Gibraltar ", "Greece ", "Greenland ", "Grenada ", "Guadeloupe ", "Guam ", "Guatemala ",
   "Guernsey ", "Guinea ", "Guinea-Bissau ", "Guyana ", "Haiti ", "Honduras ", "Hong Kong ", "Hungary ",
   "Iceland ", "India ", "Indonesia ", "Iran ", "Iraq ", "Ireland ", "Isle of Man ", "Israel ", "Italy ",
   "Jamaica ", "Japan ", "Jersey ", "Jordan ", "Kazakhstan ", "Kenya ", "Kiribati ", "Korea, North ",
   "Korea, South ", "Kuwait ", "Kyrgyzstan ", "Laos ", "Latvia ", "Lebanon ", "Lesotho ", "Liberia ", "Libya ",
   "Liechtenstein ", "Lithuania ", "Luxembourg ", "Macau ", "Macedonia ", "Madagascar ", "Malawi ",
   "Malaysia ", "Maldives ", "Mali ", "Malta ", "Marshall Islands ", "Martinique ", "Mauritania ",
   "Mauritius ", "Mayotte ", "Mexico ", "Micronesia, Fed. St. ", "Moldova ", "Monaco ", "Mongolia ",
   "Montserrat ", "Morocco ", "Mozambique ", "Namibia ", "Nauru ", "Nepal ", "Netherlands ",
   "Netherlands Antilles ", "New Caledonia ", "New Zealand ", "Nicaragua ", "Niger ", "Nigeria ",
   "N. Mariana Islands ", "Norway ", "Oman ", "Pakistan ", "Palau ", "Panama ", "Papua New Guinea ",
   "Paraguay ", "Peru ", "Philippines ", "Poland ", "Portugal ", "Puerto Rico ", "Qatar ", "Reunion ",
   "Romania ", "Russia ", "Rwanda ", "Saint Helena ", "Saint Kitts & Nevis ", "Saint Lucia ",
   "St Pierre & Miquelon ", "Saint Vincent and the Grenadines ", "Samoa ", "San Marino ",
   "Sao Tome & Principe ", "Saudi Arabia ", "Senegal ", "Serbia ", "Seychelles ", "Sierra Leone ",
   "Singapore ", "Slovakia ", "Slovenia ", "Solomon Islands ", "Somalia ", "South Africa ", "Spain ",
   "Sri Lanka ", "Sudan ", "Suriname ", "Swaziland ", "Sweden ", "Switzerland ", "Syria ", "Taiwan ",
   "Tajikistan ", "Tanzania ", "Thailand ", "Togo ", "Tonga ", "Trinidad & Tobago ", "Tunisia ", "Turkey ",
   "Turkmenistan ", "Turks & Caicos Is ", "Tuvalu ", "Uganda ", "Ukraine ", "United Arab Emirates ",
   "United Kingdom ", "United States ", "Uruguay ", "Uzbekistan ", "Vanuatu ", "Venezuela ", "Vietnam ",
   "Virgin Islands ", "Wallis and Futuna ", "West Bank ", "Western Sahara ", "Yemen ", "Zambia ",
   "Zimbabwe " };

 private BitSet bitset;

 int[] hashSeeds;

 int noHashFunctions;

 public boolean useSeed = true;

 public int murmurVersion = 1;

 public int getSeed(int i) {
  if (useSeed)
   return hashSeeds[i];
  else
   return i;

 }

 public BloomFilterCache(int slots, int hashFunctions) {
  bitset = new BitSet(slots);
  noHashFunctions = hashFunctions;
  Random r = new Random(System.currentTimeMillis());
  hashSeeds = new int[hashFunctions];
  for (int i = 0; i < hashFunctions; ++i) {
   hashSeeds[i] = r.nextInt();
  }

 }

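 public int getHash(String value, int i) {

  // The second argument is the number of bytes to hash. With 4, only the first
  // four bytes of each value are hashed; pass value.getBytes().length instead
  // to hash the whole string.
  if (murmurVersion == 1)
   return (int) Murmur1.hash(value.getBytes(), 4, getSeed(i));
  else if (murmurVersion == 2)
   return (int) Murmur2.hash(value.getBytes(), 4, getSeed(i));
  else
   return (int) Murmur3.hash_x86_32(value.getBytes(), 4, getSeed(i));
 }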

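 public void add(String value) {

  for (int i = 0; i < noHashFunctions; ++i) {
   int h = getHash(value, i);

   // Note: BitSet.size() is the allocated size (rounded up to a multiple of 64),
   // so it can be larger than the requested number of slots.
   bitset.set(Math.abs(h) % bitset.size(), true);
  }
 }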

 public boolean contains(String value) {

  for (int i = 0; i < noHashFunctions; ++i) {
   int h = getHash(value, i);

   if (!bitset.get(Math.abs(h) % bitset.size())) {
    return false;

   }
  }

  return true;
 }

 public static String generateRandomChars(String candidateChars, int length) {
  StringBuilder sb = new StringBuilder();
  Random random = new Random();
  for (int i = 0; i < length; i++) {
   sb.append(candidateChars.charAt(random.nextInt(candidateChars.length())));
  }

  return sb.toString();
 }

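 // Counts false positives: each probe is a random 3-letter prefix plus an item
 // from testSet. The postFix and prefix parameters are currently unused.
 public int getErrorCount(String[] testSet, String postFix, String prefix) {
  int error = 0;

  for (String s : testSet) {

   if (contains(generateRandomChars("ABCDEFGHIJKLMNOPQRSTUVWXYZ".toLowerCase(), 3) + s + "")) {
    error++;
   }
  }
  return error;
 }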

 public int getErrorCountRandom(int testSize) {
  int error = 0;

  for (int i = 0; i < testSize; i++) {

   String test = generateRandomChars("ABCDEFGHIJKLMNOPQRSTUVWXYZ".toLowerCase(), 10);
   if (contains(test) ) {
    error++;
    //System.err.println(test);
   }

  }
  return error;
 }

 public static void testFor(int slots, int hashFunctions, boolean useSeed, int murmurVersion) {
  BloomFilterCache bf = new BloomFilterCache(slots, hashFunctions);
  bf.useSeed = useSeed;
  bf.murmurVersion = murmurVersion;

  for (String s : COUNTRIES) {
   bf.add(s);
  }

  System.err.println("*****Params : slots = " + slots + " no#hash = " + hashFunctions + " cardinality = "
    + bf.bitset.cardinality() + " useSeed = " + useSeed + "  murmurVersion = " + murmurVersion);
  System.err.println("Query for Japan: " + bf.contains("Japan"));
  System.err.println("Query for Dummy: " + bf.contains("Dummy"));
  System.err.println("Error Count: " + bf.getErrorCount(COUNTRIES, "", ""));
  System.err.println("Error Count Prefix: " + bf.getErrorCount(COUNTRIES, "abc", ""));
  System.err.println("Error Count Postfix:  " + bf.getErrorCount(COUNTRIES, "", "abc"));
  System.err.println("Error Count Post-Pre: " + bf.getErrorCount(COUNTRIES, "abc", "def"));
  System.err.println("Error Count Random: " + bf.getErrorCountRandom(COUNTRIES.length));

 }

 public static void main(String[] args) {


  int[] versions = new int[] { 1, 2, 3 };
  boolean[] useSeeds = new boolean[] { true, false };
  for (int v : versions) {
   System.err.println(" ---------------------- ");
   for (boolean useSeed : useSeeds) {
    testFor(10000, 7, useSeed, v);
    testFor(2224, 2, useSeed, v);
    testFor(2224, 7, useSeed, v);
    testFor(2224, 17, useSeed, v);
    testFor(582, 3, useSeed, v);
   }
  }

 }
}

Below are the results of several tests with different parameters. You can see the effects by changing the parameters.
 ---------------------- 
*****Params : slots = 10000 no#hash = 7 cardinality = 1336 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 369 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 10
Error Count Prefix: 6
Error Count Postfix:  4
Error Count Post-Pre: 4
Error Count Random: 2
*****Params : slots = 2224 no#hash = 7 cardinality = 1067 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 3
Error Count Prefix: 3
Error Count Postfix:  1
Error Count Post-Pre: 2
Error Count Random: 1
*****Params : slots = 2224 no#hash = 17 cardinality = 1755 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 3
Error Count Prefix: 4
Error Count Postfix:  5
Error Count Post-Pre: 3
Error Count Random: 2
*****Params : slots = 582 no#hash = 3 cardinality = 402 useSeed = true  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 61
Error Count Prefix: 57
Error Count Postfix:  53
Error Count Post-Pre: 58
Error Count Random: 54
*****Params : slots = 10000 no#hash = 7 cardinality = 1308 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 371 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 10
Error Count Prefix: 9
Error Count Postfix:  7
Error Count Post-Pre: 7
Error Count Random: 5
*****Params : slots = 2224 no#hash = 7 cardinality = 1034 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 2
Error Count Postfix:  1
Error Count Post-Pre: 2
Error Count Random: 0
*****Params : slots = 2224 no#hash = 17 cardinality = 1721 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 2
Error Count Prefix: 3
Error Count Postfix:  4
Error Count Post-Pre: 3
Error Count Random: 1
*****Params : slots = 582 no#hash = 3 cardinality = 378 useSeed = false  murmurVersion = 1
Query for Japan: true
Query for Dummy: false
Error Count: 49
Error Count Prefix: 42
Error Count Postfix:  47
Error Count Post-Pre: 33
Error Count Random: 45
 ---------------------- 
*****Params : slots = 10000 no#hash = 7 cardinality = 1339 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 382 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 9
Error Count Prefix: 6
Error Count Postfix:  6
Error Count Post-Pre: 3
Error Count Random: 4
*****Params : slots = 2224 no#hash = 7 cardinality = 1069 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 1
Error Count Prefix: 3
Error Count Postfix:  1
Error Count Post-Pre: 1
Error Count Random: 1
*****Params : slots = 2224 no#hash = 17 cardinality = 1780 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 3
Error Count Prefix: 3
Error Count Postfix:  4
Error Count Post-Pre: 3
Error Count Random: 8
*****Params : slots = 582 no#hash = 3 cardinality = 386 useSeed = true  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 50
Error Count Prefix: 43
Error Count Postfix:  48
Error Count Post-Pre: 40
Error Count Random: 53
*****Params : slots = 10000 no#hash = 7 cardinality = 1348 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 383 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 11
Error Count Prefix: 5
Error Count Postfix:  6
Error Count Post-Pre: 8
Error Count Random: 5
*****Params : slots = 2224 no#hash = 7 cardinality = 1063 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 2
Error Count Postfix:  1
Error Count Post-Pre: 1
Error Count Random: 2
*****Params : slots = 2224 no#hash = 17 cardinality = 1749 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 1
Error Count Prefix: 3
Error Count Postfix:  4
Error Count Post-Pre: 1
Error Count Random: 3
*****Params : slots = 582 no#hash = 3 cardinality = 408 useSeed = false  murmurVersion = 2
Query for Japan: true
Query for Dummy: false
Error Count: 60
Error Count Prefix: 61
Error Count Postfix:  49
Error Count Post-Pre: 47
Error Count Random: 60
 ---------------------- 
*****Params : slots = 10000 no#hash = 7 cardinality = 1322 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 359 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 18
Error Count Prefix: 22
Error Count Postfix:  12
Error Count Post-Pre: 13
Error Count Random: 14
*****Params : slots = 2224 no#hash = 7 cardinality = 1027 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  3
Error Count Post-Pre: 8
Error Count Random: 6
*****Params : slots = 2224 no#hash = 17 cardinality = 1727 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 8
Error Count Prefix: 6
Error Count Postfix:  8
Error Count Post-Pre: 3
Error Count Random: 6
*****Params : slots = 582 no#hash = 3 cardinality = 389 useSeed = true  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 52
Error Count Prefix: 63
Error Count Postfix:  49
Error Count Post-Pre: 49
Error Count Random: 56
*****Params : slots = 10000 no#hash = 7 cardinality = 1332 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 0
Error Count Prefix: 0
Error Count Postfix:  0
Error Count Post-Pre: 0
Error Count Random: 0
*****Params : slots = 2224 no#hash = 2 cardinality = 383 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 12
Error Count Prefix: 4
Error Count Postfix:  10
Error Count Post-Pre: 5
Error Count Random: 5
*****Params : slots = 2224 no#hash = 7 cardinality = 1060 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 2
Error Count Prefix: 0
Error Count Postfix:  3
Error Count Post-Pre: 0
Error Count Random: 1
*****Params : slots = 2224 no#hash = 17 cardinality = 1782 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 4
Error Count Prefix: 6
Error Count Postfix:  7
Error Count Post-Pre: 6
Error Count Random: 5
*****Params : slots = 582 no#hash = 3 cardinality = 392 useSeed = false  murmurVersion = 3
Query for Japan: true
Query for Dummy: false
Error Count: 59
Error Count Prefix: 59
Error Count Postfix:  57
Error Count Post-Pre: 60
Error Count Random: 57

Sunday, February 19, 2017

Support Vector Machine Estimation for Japanese Flag (A circle inside a rectangle)

A Support Vector Machine (SVM) can sometimes surprise you with its capability. I wanted to test it on a nonlinear example and suddenly thought: why not model the Japanese flag? I could calculate the points of the rectangle and then the points of the circle inside it. Then I thought, why calculate at all? It is easier to draw a picture and extract the RGB values from it, so I can take my data straight from the picture. For the Japanese flag I used 3 colors. White and red: as in the Japanese flag. Green: the part that I cut from the flag. I want the SVM to predict this part.



I extract the values and dump them into 2 separate CSV files.

 
public static ArrayList<ArrayList<Object>> getImageData() {

		BufferedImage img;
		ArrayList<ArrayList<Object>> result = new ArrayList<>();
		try {
			img = ImageIO.read(new File(IMG));

			int widthSample = 1;
			int heightSample = 2;

			for (int j = 0; j < img.getHeight(); j++) {				
				if (j % heightSample != 0) {
					continue;
				}
				for (int i = 0; i < img.getWidth(); i++) {

					if (i % widthSample != 0) {
						continue;
					}

					String color = getPixelData(img, i, j);

					ArrayList<Object> res = new ArrayList<>();
					res.add(i);
					res.add(j);
					res.add(color);

					result.add(res);

				}
			}

		} catch (IOException e) {
			e.printStackTrace();
		}

		return result;
	}

	private static String getPixelData(BufferedImage img, int x, int y) {
		int argb = img.getRGB(x, y);

		int rgb[] = new int[] { (argb >> 16) & 0xff, // red
				(argb >> 8) & 0xff, // green
				(argb) & 0xff // blue
		};

		String color = colorUtils.getColorNameFromRgb(rgb[0], rgb[1], rgb[2]);

		return color;
	}



With this logic, you can generate shapes in a plane and paint some part of the image green. The green part will be your test data (the region you want the SVM to predict). For example, you can paint 1/4 of a circle or rectangle green to see the result. In the next post I will try stars and the top of a bottle.

library(e1071)  # assumption: svm() and its plot method come from the e1071 package

maindata <- read.csv(file="E:/REDA/japan_flag_data1.csv", header=FALSE, sep=",")


test  <- read.csv(file="E:/REDA/japan_flag_data1_test.csv", header=FALSE, sep=",")

traindata = data.frame(
  x=maindata$V1, y=maindata$V2 ,colpart = as.factor( maindata$V3 )
)

testdf = data.frame(
  x=test$V1, y=test$V2 ,colpart = as.factor( test$V3 )
)


plot(traindata$x, traindata$y , col =  traindata$colpart   )

fitsvm =  svm(colpart ~ ., data=traindata)

plot(fitsvm, traindata, y~x , col=c("red","white")) 




predictdata = predict(fitsvm, newdata = testdf)



points(x=testdf$x, y = testdf$y , pch = 19,col = c("blue", "green")[as.numeric(predictdata)] ,cex=2.6)


The blue and green points are the predictions for the missing part. The blue ones, which are nearer to the center of the circle, are predicted correctly. The green ones are not. We can tune the parameters to improve the estimate of the missing values.

Saturday, January 28, 2017

SAP HANA, Scenario error Multiple definitions of node

We were trying to enrich our scenario by adding data from the SPVR_SCENARIO_EVT table.
This table holds tasks and the people who executed each task.



For example, the query below finds who completed (event "Complete") the task "Assign Related Lawyer".

      select SCOPE_OBJECT_USER_DISPLAY_NAME from  "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    
    where "SCOPE_OBJECT_DEF_NAME" = 'Assign Related Lawyer' and "EVENT_NAME" = 'Complete' 

The problem was that we were adding this query 6 times, once for each role. OPInt gave the following error
after the second join:

The following errors occurred: Inconsistent calculation model (34011)
 Details (Errors):
 - calculationNode (MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT) -> operation (TableDataSource): Multiple definitions of node MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT found.


We created a new script view and joined the same table several times.
The dummy join brings back a row for every scenario even if the related data is null or sparse.

BEGIN 
   var_out = select distinct  
    CAST ( dummyjoin.SCENARIO_INSTANCE_ID AS NVARCHAR) CC_SCENARIO_ID ,
  CAST ( catspec.SCOPE_OBJECT_USER_ID AS NVARCHAR) CC_CATSPEC_USERID ,catspec.SCOPE_OBJECT_USER_DISPLAY_NAME  CC_CATSPEC_DISPLAY,
      CAST (    lawmanager.SCOPE_OBJECT_USER_ID AS NVARCHAR)  CC_LAWYERMGR_USERID,lawmanager.SCOPE_OBJECT_USER_DISPLAY_NAME CC_LAWYERMGR_DISPLAY ,
      CAST (    lawyerjoin.SCOPE_OBJECT_USER_ID AS NVARCHAR) CC_LAWYER_USERID ,lawyerjoin.SCOPE_OBJECT_USER_DISPLAY_NAME      CC_LAWYER_DISPLAY     ,
          intclient.useridagg USERIDAGG ,    intclient.displayagg DISPLAYAGG, 1  MYSUM
 from  
 "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    dummyjoin
 left join  "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    catspec on dummyjoin.SCENARIO_INSTANCE_ID = catspec.SCENARIO_INSTANCE_ID
 and  catspec."SCOPE_OBJECT_DEF_ID" = 'RFX_POC.001.ACT_PUBL.255.ZPUB' and catspec."EVENT_NAME" = 'Complete'
    left join    "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    lawyerjoin on lawyerjoin.SCENARIO_INSTANCE_ID = dummyjoin.SCENARIO_INSTANCE_ID
    and lawyerjoin."SCOPE_OBJECT_DEF_NAME" = 'Review/Revise the Contract for Procurement' and lawyerjoin."EVENT_NAME" = 'Complete' 
 left join "MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"    lawmanager on lawmanager.SCENARIO_INSTANCE_ID = dummyjoin.SCENARIO_INSTANCE_ID
    and lawmanager."SCOPE_OBJECT_DEF_NAME" = 'Assign Related Lawyer' and lawmanager."EVENT_NAME" = 'Complete' 
    
    left join (
    
    select SCENARIO_INSTANCE_ID, STRING_AGG(SCOPE_OBJECT_USER_ID, ';') as useridagg , STRING_AGG(SCOPE_OBJECT_USER_DISPLAY_NAME, ';') as displayagg from
(select distinct SCENARIO_INSTANCE_ID, SCOPE_OBJECT_USER_ID, SCOPE_OBJECT_USER_DISPLAY_NAME from "SYS_PROCESS_VISIBILITY"."MYSCENARO.gen_MYSCENARO::SPVR_MYSCENARO_EVT"
where ( SCOPE_OBJECT_DEF_NAME = 'Analyzing the Questions' or SCOPE_OBJECT_DEF_NAME = 'Create/Revise Technical Evaluation Report' )
 and EVENT_NAME = 'Completed')
 group BY SCENARIO_INSTANCE_ID 
 
     )  intclient on  intclient.SCENARIO_INSTANCE_ID = dummyjoin.SCENARIO_INSTANCE_ID ; 

END 

Residual vs Fitted for investment

While checking which graphs I could draw from a regression model, I realized
that one graph is extremely useful for investment.
Suppose you have (actually, we do have) thousands of used car prices, and you want to buy a car
at the best price as an investment (in the hope of selling it later).

Best car to buy = the one with Max(expected price from regression - actual price)

Suppose our regression line expects a car's price to be 30K but the actual advertised price is 20K.
There are 2 possibilities.
1) The car is damaged.
2) The owner needs money urgently and is selling the car at a very low price.

If the car's price is 40K, I cannot find a logical explanation. Some people
try to sell their used cars for more than a new one costs. Probably
they paid for some extras that they consider very valuable.
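The selection rule above can be written as a small helper that scans the listings and keeps the one with the largest gap between expected and actual price. This is only a sketch: the class name and the price arrays are hypothetical, and in practice the expected prices would come from the fitted regression model.

```java
// Illustrative sketch: rank listings by how far the asking price sits below the model's estimate.
final class BargainFinder {

    // Returns the index of the listing with the largest (expected - actual) gap.
    static int bestBargain(double[] expected, double[] actual) {
        int best = 0;
        for (int i = 1; i < actual.length; i++) {
            if (expected[i] - actual[i] > expected[best] - actual[best]) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        double[] expected = {30000, 30000, 25000};   // hypothetical model estimates
        double[] actual   = {20000, 40000, 24000};   // hypothetical asking prices
        int i = bestBargain(expected, actual);
        System.out.println("Best candidate: listing " + i + ", gap " + (expected[i] - actual[i]));
    }
}
```

The listing found this way still needs a manual check, since a large gap may simply mean the car is damaged.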


import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
  (20000,2011,30000.0),
  (120000,2014,20000.0),
  (60000,2015,25000.0) ,
  (20000,2011,32000.0),
  (120000,2014,21000.0),
  (60000,2015,45000.0)   
  
  )
).toDF("km", "year", "label")

val assembler = new VectorAssembler()
  .setInputCols(Array("km", "year"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
output.select("features", "label").show(false)

import org.apache.spark.ml.regression.LinearRegression

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)


val lrModel = lr.fit(output)

display(lrModel, output, "fittedVsResiduals")


println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")




val trainingSummary = lrModel.summary

println(s"numIterations: ${trainingSummary.totalIterations}")

println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")

trainingSummary.residuals.show()

println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

println(s"r2: ${trainingSummary.r2}")

Coefficients: [-0.19283786035452538,2928.112739446878] Intercept: -5853577.79139608
numIterations: 8
objectiveHistory: [0.5,0.446369757257132,0.352850077757605,0.272318835721877,0.26365142412164966,0.23726105027025182,0.23725993458647637,0.23725993457463043]
+-------------------+
|          residuals|
+-------------------+
|-1000.1704245014116|
|-500.72260739002377|
| -9999.106968107633|
|  999.8295754985884|
| 499.27739260997623|
| 10000.893031892367|
+-------------------+

Residual = Observed value - Predicted value

We must find the cars with the most negative residuals (much cheaper than expected).
The Databricks graph has poor resolution for so few points, so I also wrote an R version.

library(lattice) 
mydata2 = data.frame(
  year = c(2011.0,2012.0,2014.0,2015.0),
  km10000 = c(6.0,7.0,10.0,3.0),
  price1000 = c(200.0,250.0,300.0,400.0)
)



res2.mod1 = lm(price1000 ~  km10000 + year , data = mydata2)
summary(res2.mod1)
fitted(res2.mod1)
xyplot(resid(res2.mod1) ~ fitted(res2.mod1),
       xlab = "Fitted Values",
       ylab = "Residuals",
       main = "Car price based on year and km ",
       par.settings = simpleTheme(col=c("blue","red"),
                                  pch=c(10,3,11), cex=3, lwd=2),
       
       panel = function(x, y, ...)
       {
         panel.grid(h = -1, v = -1)
         panel.abline(h = 0)
         panel.xyplot(x, y, ...)
       }
)       


> fitted(res2.mod1)
       1        2        3        4 
206.1722 240.9232 302.5415 400.3631 
> resid(res2.mod1)
         1          2          3          4 
-6.1721992  9.0767635 -2.5414938 -0.3630705 
> 

Sunday, January 22, 2017

Spark Dataframe Broadcast Join

Suppose you have a person table for your company with 10,000 records, and you have 100 departments.

Problem) Find the average age of people in the Finance department

If we do a join, the data needs to be shuffled. But if we know one table is small (like department), we can mark it as
broadcast so that our big table does not get shuffled.

Also, when finding the average age for a department, all related data will be filtered on the local nodes.
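To see why broadcasting avoids the shuffle, it helps to picture the join as a plain hash lookup: every node holds a full copy of the small table in memory and streams its own slice of the big table past it. The sketch below illustrates that idea in plain Java. It is a conceptual model only, not Spark's API, and the class and method names are invented.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Conceptual sketch of a broadcast (map-side) hash join; this is not Spark's API.
final class BroadcastJoinSketch {

    // Joins each {personName, depid} row against the small lookup table held in memory.
    static List<String> join(String[][] persons, Map<Long, String> departments) {
        List<String> out = new ArrayList<>();
        for (String[] p : persons) {
            String dep = departments.get(Long.parseLong(p[1]));  // local lookup, no shuffle
            if (dep != null) {
                out.add(p[0] + " -> " + dep);                    // inner-join semantics
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> departments = new HashMap<>();   // the small, "broadcast" side
        departments.put(1L, "Finance");
        departments.put(2L, "It");
        String[][] persons = {{"Andy", "1"}, {"John", "1"}, {"Jack", "2"}, {"Max", "2"}};
        join(persons, departments).forEach(System.out::println);
    }
}
```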



import org.apache.spark.sql.functions.broadcast

case class Person(id:Long,name: String, depid: Long)
val personDF = Seq(Person(1,"Andy", 1) ,Person(2,"John", 1),Person(3,"Jack", 2),Person(4,"Max", 2)).toDF()

case class Department(depid:Long,name: String)
val departmentDF = Seq(Department(1,"Finance") ,Department(2,"It")).toDF()

// explain(true) prints the query plan and returns Unit, so there is nothing to assign
personDF.repartition($"name").explain(true)
val combinedDF = personDF.join(departmentDF, Seq("depid"))
val combinedDF2 = personDF.join(broadcast(departmentDF), Seq("depid"))
combinedDF2.take(10)

We can also define our join with SQL syntax.
personDF.createOrReplaceTempView("person")
departmentDF.createOrReplaceTempView("department")

sql("SELECT * FROM person r JOIN department s ON r.depid = s.depid").show()

id|name|depid|depid| name
1|Andy| 1| 1|Finance
2|John| 1| 1|Finance
3|Jack| 2| 2| It
4| Max| 2| 2| It

Spark HashPartitioner for RDD

An RDD is a collection of data distributed across partitions. For many calculations
we do not have to care how the data is distributed. But if our calculation benefits from data locality,
we must ensure it ourselves.
Suppose we have some car data (Honda, Toyota, Ford) and we want to count, average, sum... the cars by model.
If we have 3 partitions across which these models are randomly distributed, the data must be shuffled before reducing.

If we partition by car model and are sure that all Honda data is in the same partition, we can
calculate everything on one node with no need to shuffle.

When I was testing partitioning, I thought it would be based on data cardinality:
since I have 3 distinct keys, their hashes would be distributed over 3 slots.
But that was wrong. In fact the object's hashCode is taken modulo the number of partitions.
So you can get very unpredictable results if you hash over Strings. I advise converting the Strings
to their hashCode and looking at the int value to know the final partition distribution exactly.
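The rule described above, hashCode modulo the number of partitions with the result kept non-negative, can be checked outside Spark with a few lines of Java. The class and method names below are illustrative; `Math.floorMod` is used here as a stand-in for the non-negative modulo.

```java
// Sketch of how a hash partitioner maps a key to a partition:
// hashCode modulo the partition count, kept non-negative.
final class PartitionSketch {

    static int partitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        for (String car : new String[] {"Honda", "Toyota", "Ford"}) {
            System.out.println(car + " hashCode=" + car.hashCode()
                    + " partition=" + partitionFor(car, 3));
        }
    }
}
```

With 3 partitions the three keys happen to land in three different partitions, but that is a coincidence of these particular hash codes, not a guarantee.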

import org.apache.spark.HashPartitioner

val cars = Array("Honda", "Toyota", "Ford")

val carnamePrice = sc.parallelize(for {
    x <- cars
    y <- Array(100,200,300)
} yield (x, y), 8)
val rddEachCar = carnamePrice.partitionBy(new HashPartitioner(3))
val mapped =   rddEachCar.mapPartitionsWithIndex{
                    (index, iterator) => {
                       println("Called in Partition -> " + index)
                       val myList = iterator.toList
                       
                       myList.map(x => x + " -> " + index).iterator
                    }
                 }
mapped.take(10)



Array[String] = Array((Toyota,100) -> 0, (Toyota,200) -> 0, (Toyota,300) -> 0, (Honda,100) -> 1, (Honda,200) -> 1, (Honda,300) -> 1, (Ford,100) -> 2, (Ford,200) -> 2, (Ford,300) -> 2)

println ( "Honda".hashCode() % 3)
println ( "Ford".hashCode() % 3 )
println ( "Toyota".hashCode() % 3 )
1
2
0

Wednesday, January 18, 2017

Impute Outliers In Spark

Imputing data in R is so easy.

fun <- function(x){
    quantiles <- quantile( x, c(.05, .95 ) )
    x[ x < quantiles[1] ] <- quantiles[1]
    x[ x > quantiles[2] ] <- quantiles[2]
    x
}
fun( yourdata )

I tried to write equivalent functions for Spark. Below you will find methods for replacing values.
Q1 is the 1st quartile,
Q3 is the 3rd quartile, and IQR = Q3 - Q1.
left outliers < Q1 – 1.5×IQR
right outliers > Q3 + 1.5×IQR
For one dataset I obtained the values below.

org.apache.spark.util.StatCounter = (count: 13, mean: 131.538462, stdev: 65.441242, max: 320.000000, min: 1.000000) 
rddMin: Double = 1.0 
rddMax: Double = 320.0 
rddMean: Double = 131.53846153846155 
quantiles: Array[Double] = Array(117.0, 150.0) 
Q1: Double = 117.0 
Q3: Double = 150.0 
IQR: Double = 33.0 
lowerRange: Double = 67.5 
upperRange: Double = 199.5
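
The lowerRange and upperRange values above follow directly from Q1 and Q3. A small sketch of that arithmetic, together with a capping helper, is below; the class and method names are hypothetical and only Q1 = 117.0 and Q3 = 150.0 are taken from the output above.

```java
// Illustrative sketch of the IQR fences and a capping helper; names are hypothetical.
final class IqrFences {

    // Values below this fence count as left outliers: Q1 - 1.5 * IQR
    static double lowerFence(double q1, double q3) {
        return q1 - 1.5 * (q3 - q1);
    }

    // Values above this fence count as right outliers: Q3 + 1.5 * IQR
    static double upperFence(double q1, double q3) {
        return q3 + 1.5 * (q3 - q1);
    }

    // Replaces values outside the fences with the nearest quartile (capping).
    static double cap(double value, double q1, double q3) {
        if (value < lowerFence(q1, q3)) return q1;
        if (value > upperFence(q1, q3)) return q3;
        return value;
    }

    public static void main(String[] args) {
        double q1 = 117.0, q3 = 150.0;
        // prints: lower = 67.5, upper = 199.5
        System.out.println("lower = " + lowerFence(q1, q3) + ", upper = " + upperFence(q1, q3));
        // prints: cap(1.0) = 117.0, cap(320.0) = 150.0
        System.out.println("cap(1.0) = " + cap(1.0, q1, q3) + ", cap(320.0) = " + cap(320.0, q1, q3));
    }
}
```
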
So, according to our data set and our logic, we can make the changes below (of course there are more).
left outliers -> Q1
left outliers -> mean
right outliers -> Q3
right outliers -> mean
You may also want to impute values less than Q1 to Q1.

import org.apache.spark.sql.functions._

// Replace values below the threshold with the target; leave the rest unchanged
val imputeLessToTarget: (Double,Double,Double ) => Double = (arg: Double,threshold: Double,target: Double) => { if (arg < threshold) target else arg}
val imputeLessToTargetUDF = udf(imputeLessToTarget)

// Replace values above the threshold with the target; leave the rest unchanged
val imputeMoreToTarget: (Double,Double,Double ) => Double = (arg: Double,threshold: Double,target: Double) => { if (arg > threshold) target else arg}
val imputeMoreToTargetUDF = udf(imputeMoreToTarget)


//Change values less than lowerRange of outliers to Q1 **** Capping
val df5 = df.withColumn("replaced", imputeLessToTargetUDF(col("original") ,lit(lowerRange),lit(Q1) ))
println( "df5 Change values less than lowerRange of outliers to Q1 **** Capping")
df5.show()
//Change values less than lowerRange of outliers to mean
val df6 = df.withColumn("replaced", imputeLessToTargetUDF(col("original") ,lit(lowerRange),lit(rddMean) ))
println( "df6 Change values less than lowerRange of outliers to mean")
df6.show()
//Change values less than Q1 to mean
val df7 = df.withColumn("replaced", imputeLessToTargetUDF(col("original") ,lit(Q1),lit(rddMean) ))
println( "df7 Change values less than Q1 to mean")
df7.show()
//Change values more than upperRange of outliers to Q3 **** Capping
val df8 = df.withColumn("replaced", imputeMoreToTargetUDF(col("original") ,lit(upperRange),lit(Q3) ))
println( "df8 Change values more than upperRange of outliers to Q3 **** Capping")
df8.show()
//Change values more than upperRange of outliers to mean
val df9 = df.withColumn("replaced", imputeMoreToTargetUDF(col("original") ,lit(upperRange),lit(rddMean) ))
println( "df9 Change values more than upperRange of outliers to mean")
df9.show()

Below you can see the output with the newly generated columns. Functions of this type are my toolset for dealing with new data.

df5 Change values less than lowerRange of outliers to Q1 **** Capping 
1.0| 117.0
110.0| 110.0
111.0| 111.0
112.0| 112.0
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 320.0
df6 Change values less than lowerRange of outliers to mean 
1.0|131.53846153846155
110.0| 110.0
111.0| 111.0
112.0| 112.0
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 320.0
df7 Change values less than Q1 to mean 
1.0|131.53846153846155
110.0|131.53846153846155
111.0|131.53846153846155
112.0|131.53846153846155
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 320.0
df8 Change values more than upperRange of outliers to Q3 **** Capping
1.0| 1.0
110.0| 110.0
111.0| 111.0
112.0| 112.0
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0| 150.0
df9 Change values more than upperRange of outliers to mean 
1.0| 1.0
110.0| 110.0
111.0| 111.0
112.0| 112.0
117.0| 117.0
118.0| 118.0
120.0| 120.0
122.0| 122.0
129.0| 129.0
140.0| 140.0
150.0| 150.0
160.0| 160.0
320.0|131.53846153846155