Collect vs Map Operation

I would venture to say that almost every Spark developer has used the collect() operation and is familiar with what it does. When someone asks you whether collect() is a transformation or an action, you would most likely say it is an action without a second thought. But after reading this article, you would probably ask the questioner to be a bit more specific with the question.

I am writing this post to explore an unfamiliar avatar of the collect(...) operation. RDD provides a transformation variant of collect() that can be used much like map(). The following example demonstrates just that.
  /**
    * Adds 10 to even numbers; it is undefined for odd numbers
    */
  val transformEvenNbr: PartialFunction[Int, Int] = {
    case value: Int if ((value % 2) == 0) => value + 10
  }

  val spark = getSpark() // helper that returns a SparkSession
  val baseRdd = spark.sparkContext.parallelize((1 to 10), 1)

  baseRdd.collect[Int](transformEvenNbr).foreach(println(_))

Result
12
14
16
18
20

You can see that collect(...) takes a PartialFunction as its input argument, applies it to the targeted elements, and returns a result RDD. Some of you may have already sensed the reason behind passing a PartialFunction as the argument: the intention is to apply the transformation only to targeted records, unlike the map() operation. In other words, map() applies the given function to all records of the dataset, whereas collect(...) applies the transformation only to the records that satisfy the PartialFunction's domain.
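By the way, this idiom is not unique to Spark: the Scala collections library has the same collect(PartialFunction) method, which is presumably where the RDD variant borrows its name. A minimal sketch you can try in a plain Scala REPL, reusing the same transformEvenNbr from above:

  // Scala standard library: collect applies the PartialFunction
  // only to elements inside its domain, just like the RDD variant
  (1 to 10).collect(transformEvenNbr)
  // result: Vector(12, 14, 16, 18, 20)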

Technically, collect(...) does pretty straightforward work under the hood: it calls filter(...) and then map(...). Hence it is safe to say that this variant of collect(...) is shorthand for filter(...) + map(...). Sharing the Spark source code for your reference:
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  filter(cleanF.isDefinedAt).map(cleanF)
}
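Since the source code confirms that collect(...) is just filter(...) followed by map(...), we can spell the same chain out by hand and get an identical result. A quick sketch reusing baseRdd and transformEvenNbr from the earlier example:

  // Hand-written equivalent of baseRdd.collect(transformEvenNbr):
  // keep only elements inside the PartialFunction's domain, then apply it
  baseRdd
    .filter(transformEvenNbr.isDefinedAt)
    .map(transformEvenNbr)
    .foreach(println(_)) // prints 12, 14, 16, 18, 20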

Interesting, isn't it? Wait, we are not done yet. Some of you might wonder why we should apply filter(...) before map(...), and why we cannot directly pass the PartialFunction to map(...) to get the desired result. The answer is that it depends on your PartialFunction implementation; in most cases it will lead to a runtime error, as follows:

baseRdd.map(transformEvenNbr).foreach(println(_))


19/09/28 19:51:45 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
scala.MatchError: 1 (of class java.lang.Integer)
 at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:254)
 at scala.PartialFunction$$anon$1.apply(PartialFunction.scala:252)
 at SparkLearning.App$$anonfun$1.applyOrElse(App.scala:26)
 at SparkLearning.App$$anonfun$1.applyOrElse(App.scala:25)

This scenario gives a clear hint as to why the filter(...) step inside collect(...) is required: map(...) invokes the function on every record, including the odd numbers that fall outside the PartialFunction's domain, and those trigger the MatchError.
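That said, if you ever need map-style code without the MatchError, Scala offers standard ways to make a PartialFunction total before applying it. Here is a sketch of two such options (plain Scala/Spark APIs, shown as an illustration rather than as this article's original code):

  // Option 1: lift turns the PartialFunction into Int => Option[Int];
  // flatMap then drops the None results produced by odd numbers
  baseRdd.flatMap(x => transformEvenNbr.lift(x)).foreach(println(_))

  // Option 2: applyOrElse supplies a fallback, so odd numbers pass
  // through unchanged instead of throwing a MatchError
  baseRdd.map(x => transformEvenNbr.applyOrElse(x, (v: Int) => v)).foreach(println(_))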
