Joining Spark RDDs
Today I will demonstrate how to perform joins on Spark RDDs. We are going to focus on three basic join operations:
1. Join (Inner)
2. Left Outer Join
3. Right Outer Join
Let's take the standard Employee + Department example and create two RDDs: one holding employee data and another holding department data.
Employee Table :
val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))
Array[(Int, String, String)] = Array((101,Sam,Flam), (102,Scot,Rut), (103,Jas,Tez)) // output of EmpRDD.collect()
Department Table :
val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))
Array[(String, String, Int)] = Array((D01,Computer,101), (D02,Electronic,104), (D03,Civil,102)) // output of DeptRDD.collect()
As we know, joining two tables requires a key. So let's reshape each RDD into (key, value) pairs, using the employee ID as the key, before performing the join.
val EmpWithKeyRDD = EmpRDD.keyBy(x => x._1) // Forms a new RDD with key as Eid
val DetpWithKeyRDD = DeptRDD.keyBy(x => x._3) // Key is the Eid stored in the department record
Output After Adding Key:
Array[(Int, (Int, String, String))] = Array((101,(101,Sam,Flam)), (102,(102,Scot,Rut)), (103,(103,Jas,Tez)))
Array[(Int, (String, String, Int))] = Array((101,(D01,Computer,101)), (104,(D02,Electronic,104)), (102,(D03,Civil,102)))
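To make the keying step less magical, here is a minimal sketch showing that keyBy(f) produces the same pairs as map(x => (f(x), x)). The object name KeyBySketch, the helper keyedBothWays, and the local[*] master are assumptions made for this illustration and are not part of the original example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object KeyBySketch {
  type Emp = (Int, String, String)

  // Same employee tuples as in the article.
  val employees: Seq[Emp] = Seq((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez"))

  // Hypothetical helper: key the employees with keyBy and with a plain map,
  // then collect both results so they can be compared on the driver.
  def keyedBothWays(sc: SparkContext): (Seq[(Int, Emp)], Seq[(Int, Emp)]) = {
    val empRDD = sc.parallelize(employees)
    val viaKeyBy = empRDD.keyBy(x => x._1).collect().toSeq
    val viaMap = empRDD.map(x => (x._1, x)).collect().toSeq
    (viaKeyBy, viaMap)
  }

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption so the sketch runs on a single machine.
    val sc = new SparkContext(new SparkConf().setAppName("KeyBySketch").setMaster("local[*]"))
    try {
      val (viaKeyBy, viaMap) = keyedBothWays(sc)
      println(viaKeyBy == viaMap) // prints true
    } finally sc.stop()
  }
}
```

Either form works for the joins that follow; keyBy simply states the intent more directly.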
We are good to go now. Let's perform the join operations on the two RDDs.
1. Inner Join
val EmpDeptJoin = EmpWithKeyRDD.join(DetpWithKeyRDD)
EmpDeptJoin.collect() // to check result
Output :
Array[(Int, ((Int, String, String), (String, String, Int)))] = Array((101,((101,Sam,Flam),(D01,Computer,101))), (102,((102,Scot,Rut),(D03,Civil,102))))
2. Left Outer Join
val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DetpWithKeyRDD)
EmpDeptLeftJoin.collect() // to check result
Output :
Array[(Int, ((Int, String, String), Option[(String, String, Int)]))] = Array((101,((101,Sam,Flam),Some((D01,Computer,101)))), (102,((102,Scot,Rut),Some((D03,Civil,102)))), (103,((103,Jas,Tez),None)))
3. Right Outer Join
val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DetpWithKeyRDD)
EmpDeptRightJoin.collect() // to check result
Output :
Array[(Int, (Option[(Int, String, String)], (String, String, Int)))] = Array((101,(Some((101,Sam,Flam)),(D01,Computer,101))), (104,(None,(D02,Electronic,104))), (102,(Some((102,Scot,Rut)),(D03,Civil,102))))
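Notice that the outer joins wrap the side that may be missing in an Option (Some or None). Here is a rough sketch of one way to unwrap it with map and getOrElse. The object name OuterJoinOptionSketch, the helpers flattenLeft and flattenRight, the "N/A" placeholder, and the local[*] master are assumptions for this illustration, not part of the original code.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object OuterJoinOptionSketch {
  type Emp = (Int, String, String)
  type Dept = (String, String, Int)

  // Placeholder for a missing side of the join; the value "N/A" is an assumption.
  val Missing = "N/A"

  // Left outer join element -> (key, employee name field, department name or placeholder).
  def flattenLeft(row: (Int, (Emp, Option[Dept]))): (Int, String, String) = row match {
    case (key, (emp, deptOpt)) => (key, emp._2, deptOpt.map(_._2).getOrElse(Missing))
  }

  // Right outer join element -> (key, employee name field or placeholder, department name).
  def flattenRight(row: (Int, (Option[Emp], Dept))): (Int, String, String) = row match {
    case (key, (empOpt, dept)) => (key, empOpt.map(_._2).getOrElse(Missing), dept._2)
  }

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption so the sketch runs on a single machine.
    val sc = new SparkContext(new SparkConf().setAppName("OuterJoinOptionSketch").setMaster("local[*]"))
    try {
      val emp = sc.parallelize(Seq[Emp]((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez"))).keyBy(_._1)
      val dept = sc.parallelize(Seq[Dept](("D01", "Computer", 101), ("D02", "Electronic", 104), ("D03", "Civil", 102))).keyBy(_._3)

      // Sorted by key after collect() because join output order is not guaranteed.
      emp.leftOuterJoin(dept).map(flattenLeft).collect().sortBy(_._1).foreach(println)
      // (101,Sam,Computer)
      // (102,Scot,Civil)
      // (103,Jas,N/A)
      emp.rightOuterJoin(dept).map(flattenRight).collect().sortBy(_._1).foreach(println)
      // (101,Sam,Computer)
      // (102,Scot,Civil)
      // (104,N/A,Electronic)
    } finally sc.stop()
  }
}
```

Whether a placeholder string is the right choice depends on what consumes the data; keeping the Option in the result is equally valid.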
We can reformat the data returned by each join. Let's take the output of the inner join and flatten it into a cleaner shape.
val EmpDeptJoinFormatted = EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))
Output :
Array[(Int, String, String, String, String)] = Array((101,Sam,Flam,D01,Computer), (102,Scot,Rut,D03,Civil))
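Explanation of the operation performed by the map function on each element of the RDD:
Each element of the joined RDD has the form (key, (employee, department)). So x._1 is the key, x._2._1._2 and x._2._1._3 are the employee's two name fields, and x._2._2._1 and x._2._2._2 are the department ID and name.
Looking at the diagram, it is clear how each element is reformatted to create the final RDD.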
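If the chains of ._1 and ._2 are hard to follow, the same reshaping can be sketched with pattern matching, which names each field as it is pulled out. The object name FormatJoinSketch, the helper format, the field names (first, last, deptId, deptName), and the local[*] master are assumptions chosen for readability; the original code does not name these fields.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FormatJoinSketch {
  type Emp = (Int, String, String)
  type Dept = (String, String, Int)

  // Destructure (key, (employee, department)) by position and give each piece a name.
  // The underscores skip the employee ID inside both tuples, since the key already holds it.
  def format(row: (Int, (Emp, Dept))): (Int, String, String, String, String) = row match {
    case (id, ((_, first, last), (deptId, deptName, _))) => (id, first, last, deptId, deptName)
  }

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption so the sketch runs on a single machine.
    val sc = new SparkContext(new SparkConf().setAppName("FormatJoinSketch").setMaster("local[*]"))
    try {
      val emp = sc.parallelize(Seq[Emp]((101, "Sam", "Flam"), (102, "Scot", "Rut"), (103, "Jas", "Tez"))).keyBy(_._1)
      val dept = sc.parallelize(Seq[Dept](("D01", "Computer", 101), ("D02", "Electronic", 104), ("D03", "Civil", 102))).keyBy(_._3)

      // Sorted by key after collect() because join output order is not guaranteed.
      emp.join(dept).map(format).collect().sortBy(_._1).foreach(println)
      // (101,Sam,Flam,D01,Computer)
      // (102,Scot,Rut,D03,Civil)
    } finally sc.stop()
  }
}
```

The result matches the positional version; the pattern just documents the shape of each element in the code itself.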
Complete Code
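import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object JoinRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JoinRDD") // assumes the master is supplied by spark-submit
    val sc = new SparkContext(conf)
    val EmpRDD = sc.parallelize(Seq((101,"Sam","Flam"),(102,"Scot","Rut"),(103,"Jas","Tez")))
    val DeptRDD = sc.parallelize(Seq(("D01","Computer",101),("D02","Electronic",104),("D03","Civil",102)))
    // Key both RDDs by employee ID
    val EmpWithKeyRDD = EmpRDD.keyBy(x => x._1)
    val DetpWithKeyRDD = DeptRDD.keyBy(x => x._3)
    // Inner, left outer and right outer joins on the employee ID
    val EmpDeptJoin = EmpWithKeyRDD.join(DetpWithKeyRDD)
    val EmpDeptLeftJoin = EmpWithKeyRDD.leftOuterJoin(DetpWithKeyRDD)
    val EmpDeptRightJoin = EmpWithKeyRDD.rightOuterJoin(DetpWithKeyRDD)
    // Flatten each inner-join element into a single five-field tuple
    val EmpDeptJoinFormatted = EmpDeptJoin.map(x => (x._1,x._2._1._2,x._2._1._3,x._2._2._1,x._2._2._2))
    EmpDeptJoinFormatted.collect().foreach(println)
    sc.stop()
  }
}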
