Sunday, January 22, 2017

Spark DataFrame Broadcast Join

Suppose you have a person table for your company with 10,000 records, and you have 100 departments.

Problem: Find the average age of people in the Finance department.

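A regular join needs the data to be shuffled, so that rows with the same join key end up on the same node. But if we know one table is small (like department), we can mark it for broadcast: Spark then ships a full copy of it to every executor, so our big table does not get shuffled.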
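To make the idea concrete, here is a conceptual sketch in plain Scala (no Spark) of what a broadcast hash join does: the small table is turned into a hash map once, and each partition of the big table probes its own copy of that map locally. The two-partition layout and the name `BroadcastJoinSketch` are illustrative assumptions; Spark's real implementation is considerably more involved.

```scala
// Conceptual, single-JVM model of a broadcast hash join (not Spark code).
object BroadcastJoinSketch {
  case class Person(id: Long, name: String, depid: Long)
  case class Department(depid: Long, name: String)

  // The big table, split into "partitions" as if it lived on two executors.
  val personPartitions: Seq[Seq[Person]] = Seq(
    Seq(Person(1, "Andy", 1), Person(2, "John", 1)),
    Seq(Person(3, "Jack", 2), Person(4, "Max", 2))
  )

  val departments: Seq[Department] = Seq(Department(1, "Finance"), Department(2, "It"))

  // Step 1: build a hash table from the small side once ("the broadcast").
  val departmentById: Map[Long, Department] =
    departments.map(d => d.depid -> d).toMap

  // Step 2: each partition probes the map locally (inner join semantics);
  // no Person row moves to another partition.
  def joinPartition(part: Seq[Person]): Seq[(Person, Department)] =
    part.flatMap(p => departmentById.get(p.depid).map(d => (p, d)))

  val joined: Seq[(Person, Department)] = personPartitions.flatMap(joinPartition)

  def main(args: Array[String]): Unit =
    joined.foreach { case (p, d) => println(s"${p.id} ${p.name} ${d.name}") }
}
```

The point of the model is that only `departmentById` is copied around; the person rows stay where they are.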

Also, when finding the average age for a department, the related person rows are joined and filtered locally on each node.
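The post does not show the average-age query itself, and its `Person` class has no age column. Below is a minimal sketch of how the Finance average could be computed with a broadcast join in a standalone application, assuming a hypothetical `PersonWithAge` case class with made-up ages; `FinanceAverageAge` and `compute` are illustrative names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, broadcast}

// Hypothetical: the post's Person has no age column, so this adds one
// with made-up ages purely for illustration.
case class PersonWithAge(id: Long, name: String, depid: Long, age: Int)
case class Department(depid: Long, name: String)

object FinanceAverageAge {
  def compute(): Double = {
    val spark = SparkSession.builder()
      .appName("FinanceAverageAge")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val personDF = Seq(
      PersonWithAge(1, "Andy", 1, 30),
      PersonWithAge(2, "John", 1, 40),
      PersonWithAge(3, "Jack", 2, 25),
      PersonWithAge(4, "Max", 2, 35)
    ).toDF()
    val departmentDF = Seq(Department(1, "Finance"), Department(2, "It")).toDF()

    // Keep only the Finance department id; this tiny DataFrame is what gets broadcast.
    val finance = departmentDF.filter($"name" === "Finance").select($"depid")

    // Each executor joins its own person partitions against the broadcast copy,
    // so personDF is not shuffled for the join; only the small partial
    // aggregates are combined at the end.
    val avgAge = personDF
      .join(broadcast(finance), Seq("depid"))
      .agg(avg($"age"))
      .first()
      .getDouble(0)

    spark.stop()
    avgAge
  }

  def main(args: Array[String]): Unit =
    println(s"Average age in Finance: ${compute()}") // (30 + 40) / 2 = 35.0
}
```

Filtering the department side before broadcasting keeps the broadcast copy as small as possible; the inner join then also acts as the Finance filter on the person side.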



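// Assumes spark-shell, where spark.implicits._ (needed for toDF and $"col") is already imported.
import org.apache.spark.sql.functions.broadcast

case class Person(id: Long, name: String, depid: Long)
val personDF = Seq(Person(1, "Andy", 1), Person(2, "John", 1), Person(3, "Jack", 2), Person(4, "Max", 2)).toDF()

case class Department(depid: Long, name: String)
val departmentDF = Seq(Department(1, "Finance"), Department(2, "It")).toDF()

// explain() returns Unit, so keep the DataFrame and call explain on it separately.
val partitionedPerson = personDF.repartition($"name")
partitionedPerson.explain(true)

// Regular join: Spark picks the strategy itself (it may still broadcast
// automatically if one side is below spark.sql.autoBroadcastJoinThreshold).
val combinedDF = personDF.join(departmentDF, Seq("depid"))

// Broadcast join: departmentDF is copied to every executor, so personDF is not shuffled.
val combinedDF2 = personDF.join(broadcast(departmentDF), Seq("depid"))
combinedDF2.take(10)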
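One way to check that the broadcast actually happened is to compare the physical plans of the two joins. The sketch below is a standalone version of the example above (`CompareJoinPlans` and `plans` are illustrative names). It disables automatic broadcasting via `spark.sql.autoBroadcastJoinThreshold`, because with the default 10 MB threshold Spark would broadcast tables this small even without the hint, and it turns off adaptive query execution so the plan shown is the one chosen up front. In spark-shell, `combinedDF.explain()` and `combinedDF2.explain()` print the same kind of plan.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

case class Person(id: Long, name: String, depid: Long)
case class Department(depid: Long, name: String)

object CompareJoinPlans {
  def plans(): (String, String) = {
    val spark = SparkSession.builder()
      .appName("CompareJoinPlans")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Only the explicit hint should be able to trigger a broadcast now.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    val personDF = Seq(Person(1, "Andy", 1), Person(2, "John", 1),
      Person(3, "Jack", 2), Person(4, "Max", 2)).toDF()
    val departmentDF = Seq(Department(1, "Finance"), Department(2, "It")).toDF()

    val plain = personDF.join(departmentDF, Seq("depid"))
    val hinted = personDF.join(broadcast(departmentDF), Seq("depid"))

    // executedPlan is the physical plan that explain() prints.
    val result = (plain.queryExecution.executedPlan.toString,
      hinted.queryExecution.executedPlan.toString)
    spark.stop()
    result
  }

  def main(args: Array[String]): Unit = {
    val (plainPlan, hintedPlan) = plans()
    println(plainPlan)  // look for a shuffle (Exchange) on both join inputs
    println(hintedPlan) // look for BroadcastHashJoin and BroadcastExchange
  }
}
```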

We can also write the join in SQL syntax:
personDF.createOrReplaceTempView("person")
departmentDF.createOrReplaceTempView("department")

sql("SELECT * FROM person r JOIN department s ON r.depid = s.depid").show()

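+---+----+-----+-----+-------+
| id|name|depid|depid|   name|
+---+----+-----+-----+-------+
|  1|Andy|    1|    1|Finance|
|  2|John|    1|    1|Finance|
|  3|Jack|    2|    2|     It|
|  4| Max|    2|    2|     It|
+---+----+-----+-----+-------+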
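The SQL query above does not request a broadcast by itself. Since Spark 2.2 (newer than this post), the request can be written as a hint comment inside the query. A minimal standalone sketch, with `SqlBroadcastHint` and `hintedPlan` as illustrative names; automatic broadcasting is disabled so that only the hint can produce the broadcast join:

```scala
import org.apache.spark.sql.SparkSession

case class Person(id: Long, name: String, depid: Long)
case class Department(depid: Long, name: String)

object SqlBroadcastHint {
  def hintedPlan(): String = {
    val spark = SparkSession.builder()
      .appName("SqlBroadcastHint")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Disable automatic broadcasting and adaptive execution so that only the
    // hint can produce a broadcast join in the printed plan.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
    spark.conf.set("spark.sql.adaptive.enabled", "false")

    Seq(Person(1, "Andy", 1), Person(2, "John", 1), Person(3, "Jack", 2), Person(4, "Max", 2))
      .toDF().createOrReplaceTempView("person")
    Seq(Department(1, "Finance"), Department(2, "It"))
      .toDF().createOrReplaceTempView("department")

    // BROADCAST(s) refers to the alias s, i.e. the department table.
    val joined = spark.sql(
      "SELECT /*+ BROADCAST(s) */ * FROM person r JOIN department s ON r.depid = s.depid")
    joined.show()

    val plan = joined.queryExecution.executedPlan.toString
    spark.stop()
    plan
  }

  def main(args: Array[String]): Unit =
    println(hintedPlan()) // look for BroadcastHashJoin in the physical plan
}
```

The hint is the SQL counterpart of wrapping `departmentDF` in `broadcast(...)` in the DataFrame API.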
