Here is an example of the difference, as a spark-shell
session:
First, some data – two lines of text:
val rdd = sc.parallelize(Seq("Roses are red", "Violets are blue")) // lines
rdd.collect
res0: Array[String] = Array("Roses are red", "Violets are blue")
Now, map
transforms an RDD of length N into another RDD of length N.
For example, it maps from two lines into two line-lengths:
rdd.map(_.length).collect
res1: Array[Int] = Array(13, 16)
But flatMap
(loosely speaking) transforms an RDD of length N into a collection of N collections, then flattens these into a single RDD of results.
rdd.flatMap(_.split(" ")).collect
res2: Array[String] = Array("Roses", "are", "red", "Violets", "are", "blue")
We have multiple words per line, and multiple lines, but we end up with a single output array of words.
Just to illustrate that, flatMapping from a collection of lines to a collection of words looks like:
["aa bb cc", "", "dd"] => [["aa","bb","cc"],[],["dd"]] => ["aa","bb","cc","dd"]
The input and output RDDs will therefore typically be of different sizes for flatMap.
If we had tried to use map with our split function, we'd have ended up with nested structures (an RDD of arrays of words, with type RDD[Array[String]]), because map must produce exactly one output element per input element:
rdd.map(_.split(" ")).collect
res3: Array[Array[String]] = Array(Array(Roses, are, red), Array(Violets, are, blue))
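If you do end up with this nested shape, a second pass with flatMap flattens it back out; a quick sketch of the equivalence (flatMap behaves like map followed by one level of flattening):
rdd.map(_.split(" ")).flatMap(x => x).collect
// Array(Roses, are, red, Violets, are, blue) -- same as rdd.flatMap(_.split(" "))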
Finally, one useful special case is mapping with a function which might not return an answer, and so returns an Option. We can use flatMap to filter out the elements that return None and extract the values from those that return a Some:
val rdd = sc.parallelize(Seq(1, 2, 3, 4))
def myfn(x: Int): Option[Int] = if (x <= 2) Some(x * 10) else None // answer only for small values
rdd.flatMap(myfn).collect
res4: Array[Int] = Array(10, 20)
(Note here that an Option behaves rather like a list that has either one element or zero elements.)
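That list-like behaviour is easy to see in plain Scala, where an Option converts to a zero-or-one element collection; a small sketch:
Some(5).toList                           // List(5)
None.toList                              // List()
Seq(Some(10), None, Some(20)).flatten    // List(10, 20)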