filter()

filter() is a fairly simple operation to understand and use. It is a transformation that returns a new RDD containing only the elements that satisfy the given predicate function. filter() is a narrow, element-wise operation applied to every partition of the RDD, and it is equivalent to the WHERE clause in SQL; a SQL version of the first example is sketched after its result. Let's start with an example to explore some other features of the operation.
val baseRdd = spark.sparkContext.parallelize(Seq(("1", 1), ("1", 2),
 ("1", 2), ("3", 3), ("3", 4), ("1", 5), ("7", 7)))
    
// predicate: keep only the pair ("1", 1)
val filterFun: ((String, Int)) => Boolean = value => value == ("1", 1)

val resultRdd = baseRdd.filter(filterFun)
resultRdd.foreach(println(_))
Result
(1,1)
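Because filter() plays the role of a SQL WHERE clause, the same predicate can also be written against the data registered as a temporary view. The following is a minimal sketch, assuming a SparkSession named spark is in scope; the column names key and value are illustrative.

import spark.implicits._

// register the pairs as a temporary view with illustrative column names
val baseDf = baseRdd.toDF("key", "value")
baseDf.createOrReplaceTempView("base")

// WHERE-clause equivalent of filterFun: keep only the pair ("1", 1)
spark.sql("SELECT key, value FROM base WHERE key = '1' AND value = 1").show()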
DAG (lineage visualization omitted)
Syntax
public RDD<T> filter(scala.Function1<T,Object> f)
As far as syntax is concerned, filter() takes a unary predicate function and applies it to every element across the partitions. It takes no implicit arguments and has no parameters with default values. Since it takes no ordering function, the order of elements in the resulting RDD is not guaranteed. The sketch below shows a few equivalent ways to write the predicate.
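A minimal sketch, reusing baseRdd from the example above; the predicate here (keep every pair whose key is "1") is illustrative:

// anonymous function taking the whole pair
val byLambda = baseRdd.filter(pair => pair._1 == "1")

// pattern-matching anonymous function
val byCase = baseRdd.filter { case (key, _) => key == "1" }

// named method, passed via eta-expansion
def keyIsOne(pair: (String, Int)): Boolean = pair._1 == "1"
val byMethod = baseRdd.filter(keyIsOne)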

filter() converts the parent RDD into a MapPartitionsRDD, which in turn applies the provided function to every partition of the parent RDD. It is important to note that MapPartitionsRDD has an option to preserve partitioning, so the RDD resulting from a filter() operation retains the parent's partitioner if one was set. The following example demonstrates just that.
// re-create the base RDD with a HashPartitioner so the parent has a partitioner
import org.apache.spark.HashPartitioner
val partitionedBase = baseRdd.partitionBy(new HashPartitioner(2))

val resultRdd = partitionedBase.filter(filterFun)
println(resultRdd.partitioner)
resultRdd.foreach(println(_))
Result
Some(org.apache.spark.HashPartitioner@2)
(1,1)
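For contrast, a quick sketch of why partition preservation matters: map() may rewrite keys, so Spark drops the partitioner, whereas filter() leaves elements untouched and keeps it. The names below refer to the snippet above.

// map() may change keys, so the partitioner is discarded
val mapped = partitionedBase.map { case (key, value) => (key, value + 1) }
println(mapped.partitioner)    // None

// filter() cannot change elements, so the HashPartitioner survives
println(resultRdd.partitioner) // Some(org.apache.spark.HashPartitioner@2)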