RDD se divide y agrega en nuevos RDD

Tengo un RDD de(String,String,Int).

Quiero reducirlo en base a las dos primeras cadenasY luego, en función de la primera cadena, quiero agrupar el (cadena, int) y ordenarlosDespués de ordenar, necesito agruparlos en pequeños grupos, cada uno con n elementos.

He hecho el siguiente código. El problema es que el número de elementos en el paso 2 es muy grande para una sola clave yreduceByKey(x++y) toma mucho tiempo.

//Input
val data = Array(
  ("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1), 
  ("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))

val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))

// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y) 

// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList)) 

El problema es que "c1" tiene muchas entradas únicas como b1, b2 .... millones yreduceByKey está matando el tiempo porque todos los valores van a un solo nodo. ¿Hay alguna manera de lograr esto de manera más eficiente?

// output
 Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))

Respuestas a la pregunta(1)

Su respuesta a la pregunta