RDD se divide y agrega en nuevos RDD
Tengo un RDD de(String,String,Int)
.
He hecho el siguiente código. El problema es que el número de elementos en el paso 2 es muy grande para una sola clave yreduceByKey(x++y)
toma mucho tiempo.
//Input
val data = Array(
("c1","a1",1), ("c1","b1",1), ("c2","a1",1),("c1","a2",1), ("c1","b2",1),
("c2","a2",1), ("c1","a1",1), ("c1","b1",1), ("c2","a1",1))
val rdd = sc.parallelize(data)
val r1 = rdd.map(x => ((x._1, x._2), (x._3)))
val r2 = r1.reduceByKey((x, y) => x + y ).map(x => ((x._1._1), (x._1._2, x._2)))
// This is taking long time.
val r3 = r2.mapValues(x => ArrayBuffer(x)).reduceByKey((x, y) => x ++ y)
// from the list I will be doing grouping.
val r4 = r3.map(x => (x._1 , x._2.toList.sorted.grouped(2).toList))
El problema es que "c1" tiene muchas entradas únicas como b1, b2 .... millones yreduceByKey
está matando el tiempo porque todos los valores van a un solo nodo. ¿Hay alguna manera de lograr esto de manera más eficiente?
// output
Array((c1,List(List((a1,2), (a2,1)), List((b1,2), (b2,1)))), (c2,List(List((a1,2), (a2,1)))))