AsifYar

Divide an RDD into sub-parts

Problem Definition

Divide a Spark RDD into N sub-parts (N is defined by the user) and run the same function on each sub-part, with each sub-part holding a different subset of the data.

Explanation

I have an RDD. I want to divide it into sub-parts; let's call them partitions. On each partition I then want to perform the same task for multiple iterations.
The following code illustrates this:

import scala.util.Random

// One element of the population; dim is the length of its X and Y vectors.
case class shape(dim: Int) {
  val random = new Random()
  var X: Array[Double] = Array.fill(dim)(random.nextDouble() * (100 - 10) + 11) // uniform in [11, 101)
  var Y: Array[Double] = Array.fill(dim)(math.random)
  var Loss: Double = math.random
  var NewLoss: Double = math.random
}


val N = 1000 // in real runs N will be in the millions
val d = 100 // in real runs d will be in the millions
val nP = 4 // number of sub-parts (partitions) the RDD is divided into
val iterations = 1000 // in real runs this will be in the millions or billions
val list = List.fill(N)(new shape(d))

// Compute the initial Loss of each element
list.foreach { x =>
  x.Loss = SphereFunc(x.X)
}

val rdd = sc.parallelize(list, nP) // split the list into nP partitions
var partitioned = rdd.persist()

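for (iter <- 1 to iterations) {
  partitioned = partitioned.mapPartitionsWithIndex { (k, iterator) =>
    val li = iterator.toList
    // X of the element with the lowest Loss in this partition
    val localBest = li.minBy(_.Loss).X

    // Update Y from the offset between X and the local best, then step X by Y
    li.foreach { j =>
      j.Y = ((j.X, localBest).zipped.map(_ - _).map(_ * math.random), j.Y).zipped.map(_ + _)
      j.X = (j.X, j.Y).zipped.map(_ + _)
    }

    // For elements whose Loss is below a random draw, reset X to a point near the local best
    li.filter(math.random > _.Loss).foreach { j =>
      j.X = localBest.map(_ + math.random)
    }

    // Re-evaluate the loss at the new positions
    li.foreach { j => j.NewLoss = SphereFunc(j.X) }

    // Keep the new loss only if it is lower than the old one and passes a random gate
    li.filter(j => math.random < j.NewLoss && j.NewLoss < j.Loss).foreach { j =>
      j.Loss = j.NewLoss
    }

    li.iterator
  }
}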

// Sphere function: the sum of the squared coordinates
def SphereFunc(xs: Array[Double]): Double = {
  xs.foldLeft(0.0)((acc, x) => acc + x * x)
}
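
As a quick sanity check of the loss used above, here is a minimal standalone sketch of the sphere function, f(x) = x_1^2 + ... + x_n^2. The object name `SphereCheck` and the method name `sphere` are hypothetical and only used for illustration; it takes a `Seq[Double]` so it can be tried without the rest of the code.

```scala
object SphereCheck {
  // Sum of squared coordinates, the same fold as SphereFunc in the post.
  def sphere(xs: Seq[Double]): Double =
    xs.foldLeft(0.0)((acc, x) => acc + x * x)

  def main(args: Array[String]): Unit = {
    // 1*1 + 2*2 + 3*3
    println(sphere(List(1.0, 2.0, 3.0))) // prints 14.0
    // The empty vector has loss 0.0, the fold's start value.
    println(sphere(Nil)) // prints 0.0
  }
}
```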

In this code I create an RDD and, in each iteration, call mapPartitionsWithIndex to access the data of each partition and update its elements.

The problem with this code is that each iteration calls mapPartitionsWithIndex, which performs the required operation on every element of the partitions and then creates a new RDD. As I understand it, the partitions therefore do not stay the same: in each iteration the RDD is divided into partitions again. I am also concerned that this will cause a lot of shuffling.
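
One way to test this concern on a small local run is to record which elements sit in which partition before and after a few chained mapPartitionsWithIndex passes, and to print the RDD lineage. The sketch below assumes Spark is on the classpath and uses a local master; the object `PartitionLayoutCheck` and the helper `layout` are hypothetical names, and plain integers stand in for the `shape` elements.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object PartitionLayoutCheck {
  // Hypothetical helper: partition index -> elements currently held there.
  def layout(rdd: RDD[Int]): Map[Int, List[Int]] =
    rdd.mapPartitionsWithIndex((idx, it) => Iterator(idx -> it.toList))
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]") // assumption: a local test run, not the real cluster
      .appName("partition-layout-check")
      .getOrCreate()
    val sc = spark.sparkContext

    var rdd: RDD[Int] = sc.parallelize(1 to 20, 4)
    val before = layout(rdd)

    // Three chained per-partition passes, mirroring the loop in the post.
    for (_ <- 1 to 3) {
      rdd = rdd.mapPartitionsWithIndex((_, it) => it.map(identity))
    }
    val after = layout(rdd)

    println(s"same layout before and after: ${before == after}")
    // A shuffle would appear as a ShuffledRDD stage in this lineage.
    println(rdd.toDebugString)

    spark.stop()
  }
}
```

mapPartitionsWithIndex is a narrow transformation, so each output partition is computed only from the matching input partition. What does grow with every iteration in the original loop is the lineage, since each pass adds another RDD on top of the previous one.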

Instead, I want to create the partitions from the RDD once at the start and then perform the desired operation on those partitions until the user-defined number of iterations completes.
Here I am calling mapPartitionsWithIndex once per iteration. Is it possible to call mapPartitionsWithIndex only once and then iterate over the partitions for the given number of iterations?
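
One possible shape for this is to move the iteration loop inside the function that handles a single partition, so that the function receives the partition's iterator once and runs all iterations on it. The sketch below is a minimal illustration on plain Scala collections so that it runs without a cluster. Everything in it is an assumption made for illustration: `Particle` is a simplified immutable stand-in for `shape`, `evolve` and `sphere` are hypothetical names, and the update rule is a simplified one, not the rule from the code above.

```scala
import scala.util.Random

object LoopInsidePartition {
  // Simplified, immutable stand-in for the post's shape class (hypothetical).
  final case class Particle(x: Vector[Double], loss: Double)

  def sphere(xs: Seq[Double]): Double =
    xs.foldLeft(0.0)((acc, v) => acc + v * v)

  // Runs ALL iterations on the data of one partition.
  // The partition is materialized once, then updated in a local loop.
  def evolve(items: Iterator[Particle], iterations: Int, seed: Long): Iterator[Particle] = {
    val rng = new Random(seed)
    var local = items.toVector
    if (local.isEmpty) Iterator.empty // minBy would throw on an empty partition
    else {
      for (_ <- 1 to iterations) {
        val best = local.minBy(_.loss).x
        local = local.map { p =>
          // Simplified rule: step a random fraction of the way to the local best.
          val candidate = p.x.zip(best).map { case (xi, bi) => xi + rng.nextDouble() * (bi - xi) }
          val newLoss = sphere(candidate)
          // Keep the move only if it lowers the loss.
          if (newLoss < p.loss) Particle(candidate, newLoss) else p
        }
      }
      local.iterator
    }
  }

  def main(args: Array[String]): Unit = {
    val rng = new Random(42)
    val data = Vector.fill(40)(Vector.fill(5)(rng.nextDouble() * 90 + 10))
      .map(x => Particle(x, sphere(x)))

    // Emulate nP = 4 partitions locally with fixed-size groups.
    val partitions = data.grouped(10).toVector
    val result = partitions.zipWithIndex.flatMap { case (part, idx) =>
      evolve(part.iterator, 100, idx.toLong)
    }

    println(s"total loss before: ${data.map(_.loss).sum}")
    println(s"total loss after:  ${result.map(_.loss).sum}")
  }
}
```

In Spark, the same function could be passed in a single call, for example `rdd.mapPartitionsWithIndex((idx, it) => evolve(it, iterations, idx), preservesPartitioning = true)`. The trade-off is that each partition must fit in executor memory, and partitions cannot exchange information (such as a global best) between iterations.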

How can I achieve that? I have to run the experiments on a cluster with more than 100 cores.
