escalabilidad de chispa: ¿qué estoy haciendo mal?
Estoy procesando datos con chispa y funciona con un día de datos (40G) pero falla conOOM en una semana de datos:
import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
.map(lambda row:(row.id, row.foo))
for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
.reduceByKey(operator.add).saveAsTextFile("myoutput")
El número de ID diferentes es inferior a 10k. Cada identificación es pequeñaint
. El trabajo falla porque demasiados ejecutores fallan con OOM. Cuando el trabajo tiene éxito (en entradas pequeñas),"myoutput"
es de unos 100k.
saveAsTextFile
concollect
(porque realmente quiero hacer algunos cortes y cortes en python antes de guardar), no hubo diferencias en el comportamiento, el mismo error. ¿Es esto de esperar?solía tenerreduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])
en lugar desc.union
- ¿cual es mejor? ¿Hace alguna diferencia?El clúster tiene25 nodos con825GB RAM y224 núcleos entre ellos.
La invocación esspark-submit --master yarn --num-executors 50 --executor-memory 5G
.
Un único RDD tiene ~ 140 columnas y cubre una hora de datos, por lo que una semana es una unión de 168 (= 7 * 24) RDD.