Union Operation

In this blog post, we are going to look at one of the set operations available on Spark RDDs. Union is not something new to most of us, as we have been working with relational databases for quite a long time.
In general, a union combines two datasets into one; you can think of it as a + operator in some sense. In the SQL world, the result of UNION has no duplicates, while UNION ALL keeps them. On Spark, however, the union() operation combines two datasets and produces a resulting RDD that retains duplicates. union() is a narrow operation on Spark. Let's see it with an example.

val rdd1 = sc.parallelize(Seq("one", "two", "three", "four"), 5)
val rdd2 = sc.parallelize(Seq("one", "two", "six", "five"), 5)
rdd1.union(rdd2).foreach { println(_) }   
Result
  one
  two
  three
  four
  one
  six
  five
  two

As you can see, the result of union() contains the duplicate values one and two, which is not the case with SQL. This tells us that the union() operation on a Spark RDD is equivalent to SQL's UNION ALL. In case you want a SQL-like UNION, you need to apply the distinct() operation on top of union(), as mentioned in the API documentation.

def union(other: RDD[T]): RDD[T]
Return the union of this RDD and another one. Any identical elements will appear multiple times (use .distinct() to eliminate them).

rdd1.union(rdd2).distinct().foreach { println(_) }
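Running this against the RDDs above yields each value exactly once; since the data is processed in parallel, the printed order may vary between runs:

Result
  one
  two
  three
  four
  six
  five

Keep in mind that distinct() itself shuffles the data, so union().distinct() is no longer a purely narrow pipeline.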


We know that Spark is a parallel processing framework that uses partitions to do its job effectively. As mentioned above, union() combines two RDDs into one, and this behaviour naturally raises a question: is the combining limited to the data? It is not. union() is not limited to the data in the RDDs; it sums up their partitions as well. Interesting? Let's see it with an example.

val rdd1 = sc.parallelize(Seq("one", "two", "three", "four"), 5)
val rdd2 = sc.parallelize(Seq("one", "two", "six", "five"), 5)
println(rdd1.union(rdd2).partitions.size)
Result         

10


This shows that union() also combines partitions: the resulting RDD carries the sum of the two input RDDs' partition counts.
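If the doubled partition count is not what you want, one way to shrink it back (a minimal sketch, using the rdd1 and rdd2 from above) is coalesce(), which with the default shuffle = false is also a narrow operation:

// Merge the 10 unioned partitions back down to 5 without a shuffle
val merged = rdd1.union(rdd2).coalesce(5)
println(merged.partitions.size) // 5

But in the case of a HashPartitioner, union() behaves differently.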

import org.apache.spark.HashPartitioner

val baseRDD = sc.parallelize(Array(("u1", "site1"), ("u1", "site1"),
("u1", "site2"), ("u2", "site2"), ("u2", "site2"),
("u3", "site1"), ("u3", "site1"), ("u3", "site1"),
("u4", "site1"), ("u1", "site2"))).partitionBy(new HashPartitioner(5))

val baseRDD2 = sc.parallelize(Array(("u1", "site1"), ("u1", "site1"),
("u1", "site2"), ("u2", "site2"), ("u2", "site2"), ("u3", "site1"),
("u3", "site1"), ("u3", "site1"),
("u4", "site1"), ("u1", "site2"))).partitionBy(new HashPartitioner(4))

println(baseRDD.partitioner)
println(baseRDD2.partitioner)
println(baseRDD.union(baseRDD2).partitioner)
println(baseRDD.union(baseRDD2).partitions.size)


Result 

Some(org.apache.spark.HashPartitioner@5)
Some(org.apache.spark.HashPartitioner@4)
None
9


When the number of partitions is not the same on both RDDs, the resulting RDD does not retain the partitioner, but it still sums up the partition counts (5 + 4 = 9).
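Under the hood (in Spark 1.6), union() chooses between two implementations: a plain UnionRDD when the partitioners differ or are missing, and a partitioner-aware variant when all inputs share the same partitioner. A quick way to see which one you got, using the baseRDD and baseRDD2 defined above:

// Prints "UnionRDD" here, since the two partitioners differ
println(baseRDD.union(baseRDD2).getClass.getSimpleName)

Here is another scenario where it behaves differently: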

val baseRDD = sc.parallelize(Array(("u1", "site1"), ("u1", "site1"),
 ("u1", "site2"), ("u2", "site2"), ("u2", "site2"), ("u3", "site1"), 
("u3", "site1"),("u3", "site1"), ("u4", "site1"), 
("u1", "site2"))).partitionBy(new HashPartitioner(5))

val baseRDD2 = sc.parallelize(Array(("u1", "site1"), ("u1", "site1"), 
("u1", "site2"), ("u2", "site2"), ("u2", "site2"), ("u3", "site1"), 
("u3", "site1"), ("u3", "site1"), ("u4", "site1"), 
("u1", "site2"))).partitionBy(new HashPartitioner(5))

println(baseRDD.partitioner)
println(baseRDD2.partitioner)
println(baseRDD.union(baseRDD2).partitioner)
println(baseRDD.union(baseRDD2).partitions.size)


Result 
Some(org.apache.spark.HashPartitioner@5)
Some(org.apache.spark.HashPartitioner@5)
Some(org.apache.spark.HashPartitioner@5)
5


The above result shows that when both RDDs share the same partitioner (here a HashPartitioner with 5 partitions), the resulting RDD retains that partitioner, and the partition counts are not summed up.
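The retained partitioner is not just cosmetic. As a minimal sketch (again using the two RDDs from above), a key-based operation such as reduceByKey can reuse the existing HashPartitioner and avoid a shuffle:

val unioned = baseRDD.union(baseRDD2)
// Prints "PartitionerAwareUnionRDD", the partitioner-aware variant
println(unioned.getClass.getSimpleName)

// mapValues preserves the partitioner and reduceByKey reuses it,
// so no extra shuffle stage appears in the lineage printed below
val counts = unioned.mapValues(_ => 1).reduceByKey(_ + _)
println(counts.toDebugString)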

Another interesting scenario is when one of the RDDs does not have a partitioner: the resulting RDD then has no partitioner either, but the partition counts are still summed up.

val baseRDD = sc.parallelize(Array(("u1", "site1"), ("u1", "site1"),
("u1", "site2"), ("u2", "site2"), ("u2", "site2"), ("u3", "site1"),
("u3", "site1"), ("u3", "site1"), ("u4", "site1"),
("u1", "site2"))).partitionBy(new HashPartitioner(5))

val baseRDD2 = sc.parallelize(Array(("u1", "site1"), ("u1", "site1"),
("u1", "site2"), ("u2", "site2"), ("u2", "site2"), ("u3", "site1"),
("u3", "site1"), ("u3", "site1"), ("u4", "site1"), ("u1", "site2")))

println(baseRDD.partitioner)
println(baseRDD2.partitioner)
println(baseRDD.union(baseRDD2).partitioner)
println(baseRDD.union(baseRDD2).partitions.size)
Result  

Some(org.apache.spark.HashPartitioner@5)
None
None
9


Note: The default parallelism in my environment is 4, which is why baseRDD2 above ends up with 4 partitions and the union has 5 + 4 = 9.
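If you are not sure what it is in your environment, SparkContext exposes it directly:

// Number of partitions used when sc.parallelize() is called without one
println(sc.defaultParallelism) // 4 in the environment used for this post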


Conclusion 

Even though union() is a narrow operation, we need to take other factors such as the partitioner and the number of partitions into consideration when applying union on larger datasets. This article is written against Spark API version 1.6.1.
