map(), flatMap() vs mapValues(), flatMapValues()
map() and flatMap() are transformation operations and are narrow in nature, i.e. no data shuffling takes place between partitions. They take a function as an argument, apply it to each element, and return a new RDD. These are among the most widely used operations in the Spark RDD API. A familiar use case is creating a paired RDD from an unpaired RDD; in other words, they can be used on both paired and unpaired RDDs.
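For instance, here is a minimal sketch (with made-up sample data) of using map() to build a paired RDD out of an unpaired RDD of strings:

// Hypothetical sample data: an unpaired RDD of CSV-like strings.
val lines = sc.parallelize(Seq("apple,1", "banana,2", "cherry,3"))

// map() turns each String element into a (key, value) tuple, yielding a paired RDD.
val pairedRDD = lines.map { line =>
  val parts = line.split(",")
  (parts(0), parts(1).toInt)
}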
map() and flatMap() can be used for ETL purposes because the input type and the return type can differ: for example, a String as input and an Integer as the return type. In other words, maps can be used to transform the given dataset as required (into a new, transformed RDD; remember that RDDs are read-only in nature).
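As a small sketch of this type change (the sample data is assumed), map() can take a String and return an Int, while flatMap() additionally flattens each input element into zero or more output elements:

val words = sc.parallelize(Seq("spark", "rdd api"))

// map(): String in, Int out; exactly one output element per input element.
val lengths = words.map(word => word.length)        // RDD[Int]: 5, 7

// flatMap(): String in, zero or more Strings out, flattened into one RDD.
val tokens = words.flatMap(line => line.split(" ")) // RDD[String]: "spark", "rdd", "api"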
As mentioned, they can be applied to both paired and unpaired RDDs. If you are aware of partitioners, you would naturally ask: "do these maps take advantage of partitioning when applied to paired RDDs?". The answer is no. Let's see why.
val baseRDD = sc.parallelize(1 to 100)
println(baseRDD.map(f => 1).partitioner)
Result
None
The above code prints None, meaning the RDD is not associated with any partitioner. You may think, "that is fine if my data is unpaired, but what if my data is paired?". You got it right: in the case of paired (key-value) data, having a partitioner (such as a HashPartitioner) would greatly help if you are doing any aggregation. But why is this not implemented? The reason is that the function passed to these maps could alter the key, and Spark does not inspect the function, so it cannot guarantee that the resulting RDD has a partitioner.
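To illustrate (a sketch with arbitrary data): even when the function leaves the key untouched, map() still drops the partitioner, because Spark cannot verify that the key is preserved.

import org.apache.spark.HashPartitioner

val paired = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(2))
println(paired.partitioner) // Some(org.apache.spark.HashPartitioner@2)

// The key is returned unchanged, but Spark cannot prove that, so the partitioner is lost.
println(paired.map { case (k, v) => (k, v) }.partitioner) // None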
So this may make you think that map is efficient only on unpaired RDDs and not on paired RDDs. In fact, that is partially true from an operational perspective. So what is the alternative? The alternative is mapValues() and flatMapValues().
Unlike the regular maps, they guarantee that the resulting RDD retains the partitioner of its parent RDD, if the parent has one.
val data = for {
  x <- 1 to 6
  y <- 1 to 10
} yield (x, y)

val baseRDD = sc.parallelize(data)
val resultRDD = baseRDD.groupByKey().mapValues(values => values.reduce((a, b) => a + b))
println(resultRDD.partitioner)
Result
Some(org.apache.spark.HashPartitioner@2)
The above code prints the partitioner because mapValues() is applied to the result of groupByKey(), which uses a HashPartitioner by default. The following code, however, prints None, since baseRDD has no partitioner.
val resultRDD = baseRDD.mapValues { x => x + 1 }
println(resultRDD.partitioner)
Result
None
So mapValues() and flatMapValues() come in handy when you deal with paired RDDs. You might have noticed that these operations do not let you alter the key, which is what allows Spark to guarantee that the resulting RDD has a partitioner if the parent RDD has one: it simply inherits the partitioner from its parent RDD.
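The examples above use mapValues(); here is a minimal sketch of flatMapValues() (again with arbitrary sample data) showing the same partitioner inheritance:

import org.apache.spark.HashPartitioner

val partitioned = sc.parallelize(Seq((1, "a b"), (2, "c"))).partitionBy(new HashPartitioner(2))

// flatMapValues() transforms and flattens only the values, so the partitioner is inherited.
val flattened = partitioned.flatMapValues(value => value.split(" "))
println(flattened.partitioner) // Some(org.apache.spark.HashPartitioner@2)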
Consider a scenario where you perform reduceByKey(), which is a wide transformation:
import org.apache.spark.HashPartitioner

val baseRDD = sc.parallelize(data).partitionBy(new HashPartitioner(2))
val resultRDD = baseRDD.mapValues { x => x + 1 }.reduceByKey((accum, current) => accum + current)
resultRDD.foreach(println)
The above code significantly reduces data shuffling and improves performance because the RDD is already partitioned. So it is better to use mapValues() and flatMapValues() on paired RDDs than map() and flatMap().
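One way to see the difference (a sketch reusing data and the HashPartitioner import from above) is to compare lineages with toDebugString: the mapValues() variant keeps the partitioner, so reduceByKey() adds no shuffle stage, whereas the map() variant forces a full shuffle.

val pre = sc.parallelize(data).partitionBy(new HashPartitioner(2))

// mapValues() keeps the partitioner: reduceByKey() reuses it, so no extra shuffle stage.
println(pre.mapValues(x => x + 1).reduceByKey(_ + _).toDebugString)

// map() drops the partitioner: reduceByKey() must shuffle the data again.
println(pre.map { case (k, v) => (k, v + 1) }.reduceByKey(_ + _).toDebugString)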