mapPartitions()

I think it is safe to say that map() is one of the most popular operations on RDDs, and in this article we will cover the mapPartitions() variant of map() in detail. mapPartitions() takes a higher-order function as an argument, applies that function once per partition, and returns a new RDD. mapPartitions() is a narrow operation, so let's start with an example for better understanding.
val baseRdd = spark.sparkContext.parallelize(Seq(("1", 1), ("1", 2), 
("1", 2), ("3", 3), ("3", 4), ("1", 5), ("7", 7))) 

val mapPartFun: Iterator[(String, Int)] => Iterator[String] = values => {
  println(s"invoked for ${values.toString()}")
  values.map(_._1) // keep only the key from each (key, value) pair
}
val resultRDD = baseRdd.mapPartitions(mapPartFun)
resultRDD.foreach(println(_))
Result
invoked for empty iterator
invoked for non-empty iterator
invoked for non-empty iterator
 3
 1
 3
 1
 1
 1
 7
DAG (figure)
Let me elaborate on the example a bit. I'm running it on a single-node cluster with 3 cores, so the RDD is split into 3 partitions. For the sake of simplicity, mapPartFun takes an Iterator of (K, V) pairs as input and returns an iterator of the keys. Inside mapPartFun you can see a print statement that fires every time the function is invoked. Looking at the result, mapPartFun is invoked just 3 times (once per partition) for an RDD of 7 elements; an equivalent map() operation would have invoked its function 7 times, once per element. In other words, map() is an element-wise operation whereas mapPartitions() is a partition-wise operation, though both are narrow operations.
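For contrast, here is a minimal sketch of the equivalent element-wise version (mapFun and mapResultRDD are my own names, not from the original example). Because map() calls its function once per element, the println here would fire 7 times instead of 3.
val mapFun: ((String, Int)) => String = value => {
  println(s"invoked for $value") // fires once per element
  value._1
}
val mapResultRDD = baseRdd.map(mapFun)
mapResultRDD.foreach(println(_))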

Let's discuss the method signature:
public <U> RDD<U> mapPartitions(
    scala.Function1<scala.collection.Iterator<T>, scala.collection.Iterator<U>> f,
    boolean preservesPartitioning,
    scala.reflect.ClassTag<U> evidence$6)
It takes a unary function and applies it once per partition, returning the transformed records. The next parameter is preservesPartitioning, which specifies whether the resultant RDD should retain the parent RDD's partitioner (if any). This parameter should be false unless the input RDD is a pair RDD and the function doesn't modify the keys. Let's see an example with and without the preservesPartitioning argument.
import org.apache.spark.HashPartitioner

val baseRdd = spark.sparkContext.parallelize(Seq(("1", 1), ("1", 2),
  ("1", 2), ("3", 3), ("3", 4), ("1", 5),
  ("7", 7))).partitionBy(new HashPartitioner(2))

//preservesPartitioning as false
val resultRdd1 = baseRdd.mapPartitions(values => values.map(_._1), false)
// preservesPartitioning not specified (defaults to false)
val resultRdd2 = baseRdd.mapPartitions(values => values.map(_._1))
//preservesPartitioning as true
val resultRdd3 = baseRdd.mapPartitions(values => values.map(_._1), true)

println(resultRdd1.partitioner)
println(resultRdd2.partitioner)
println(resultRdd3.partitioner)
Result
None
None
Some(org.apache.spark.HashPartitioner@2)
The above example demonstrates the significance of the preservesPartitioning argument and proves the following points.

  • When the parent RDD has a partitioner and preservesPartitioning is set to false, the resultant RDD will not have any partitioner. 
  • When the parent RDD has a partitioner and preservesPartitioning is not set, Spark defaults it to false, hence the resultant RDD will not have any partitioner. 
  • When the parent RDD has a partitioner and preservesPartitioning is set to true, the resultant RDD retains the parent's partitioner (see the sketch below). 
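Why would you want to retain the partitioner? Here is a sketch building on the baseRdd above (doubledRdd and reducedRdd are illustrative names): when the function leaves the keys untouched, setting preservesPartitioning to true allows a subsequent key-based operation to reuse the existing partitioning instead of shuffling.
// Key-preserving transformation: only the values change,
// so it is safe to set preservesPartitioning = true.
val doubledRdd = baseRdd.mapPartitions(
  values => values.map { case (k, v) => (k, v * 2) },
  preservesPartitioning = true)

// doubledRdd still carries HashPartitioner(2), so reduceByKey can
// combine values within the existing partitions without a shuffle.
val reducedRdd = doubledRdd.reduceByKey(_ + _)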
We know that the map() operation doesn't retain the partitioner even if the input RDD has one. Conversely, mapPartitions() provides the option to preserve the partitioner, if any. Since the function is invoked once per partition rather than once per element, per-record function-call overhead is avoided and any expensive per-partition setup can be amortized across all the records in a partition; the difference becomes visible on large datasets.
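That amortization is a common reason to reach for mapPartitions(). Below is a minimal sketch with a hypothetical ExpensiveConnection class (not part of Spark) standing in for costly setup such as a database client:
// Hypothetical heavyweight resource; a stand-in for a DB client or HTTP pool.
class ExpensiveConnection {
  def lookup(v: Int): Int = v * 10 // placeholder work
}

val enrichedRdd = baseRdd.mapPartitions { values =>
  // Constructed once per partition on the executor, not once per record.
  val conn = new ExpensiveConnection()
  values.map { case (k, v) => (k, conn.lookup(v)) }
}
With map(), the same setup would run once for every record; here it runs only as many times as there are partitions.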
