Sunday, January 22, 2017

Spark HashPartitioner for RDD

An RDD is a distributed, partitioned collection of data. For many computations we do not need to care how the data is spread across partitions. But if a computation benefits from data locality, we have to ensure that locality ourselves.
Suppose we have some car data (Honda, Toyota, Ford) and we want to count, average, or sum records by model.
If the records are randomly spread over 3 partitions, the data must be shuffled before it can be reduced by model.

If we partition by car model, so that all Honda records are guaranteed to sit in the same partition, each model can be
aggregated on a single node and no shuffle is needed.

When I was testing partitioning, I thought partitions would be assigned based on the cardinality of the data:
since I have 3 distinct keys, their hashes would be spread over 3 slots.
That was wrong. In fact, the partition is the key's hashCode modulo the number of partitions,
so with String keys you can get very unpredictable distributions. I recommend printing the hashCode of your String keys
and looking at the resulting int values to know exactly how the data will be distributed over partitions.
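
Spark's HashPartitioner essentially computes hashCode modulo the number of partitions, but forces the result to be non-negative (a plain Java/Scala % can return a negative value when the hashCode is negative). Below is a minimal sketch of that logic; partitionFor is a hypothetical helper written for illustration, not a Spark API:

// Rough sketch of how a hash partitioner maps a key to a partition.
// partitionFor is a hypothetical helper, not part of Spark's API.
def partitionFor(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  // hashCode can be negative, so shift negative remainders back into [0, numPartitions)
  if (raw < 0) raw + numPartitions else raw
}

// Which partition does each model land in with 3 partitions?
Array("Honda", "Toyota", "Ford").foreach { model =>
  println(model + " -> partition " + partitionFor(model, 3))
}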

import org.apache.spark.HashPartitioner

val cars = Array("Honda", "Toyota", "Ford")

// Generate (model, price) pairs: 3 models x 3 prices = 9 records spread over 8 partitions.
val carnamePrice = sc.parallelize(for {
    x <- cars
    y <- Array(100, 200, 300)
} yield (x, y), 8)

// Repartition by key so that all records of the same model land in the same partition.
val rddEachCar = carnamePrice.partitionBy(new HashPartitioner(3))

// Tag each record with the index of the partition it ended up in.
val mapped = rddEachCar.mapPartitionsWithIndex { (index, iterator) =>
    println("Called in Partition -> " + index)
    val myList = iterator.toList
    myList.map(x => x + " -> " + index).iterator
}
mapped.take(10)


The output confirms that all records for a given model end up in the same partition:

Array[String] = Array((Toyota,100) -> 0, (Toyota,200) -> 0, (Toyota,300) -> 0, (Honda,100) -> 1, (Honda,200) -> 1, (Honda,300) -> 1, (Ford,100) -> 2, (Ford,200) -> 2, (Ford,300) -> 2)

println ( "Honda".hashCode() % 3)
println ( "Ford".hashCode() % 3 )
println ( "Toyota".hashCode() % 3 )
1
2
0
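
Coming back to the count/average/sum use case from the beginning of the post: once the RDD is partitioned by model, a key-based aggregation such as reduceByKey can reuse that partitioner and combine each model's values without another shuffle. A minimal sketch, reusing rddEachCar from above:

// Sum prices per model; since rddEachCar is already hash-partitioned by key,
// reduceByKey can reuse the existing partitioner instead of shuffling again.
val totalPriceByModel = rddEachCar.reduceByKey(_ + _)
totalPriceByModel.collect().foreach(println)
// Expected something like: (Toyota,600), (Honda,600), (Ford,600)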
