escalabilidad de chispa: ¿qué estoy haciendo mal?

Estoy procesando datos con chispa y funciona con un día de datos (40G) pero falla conOOM en una semana de datos:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

El número de ID diferentes es inferior a 10k. Cada identificación es pequeñaint. El trabajo falla porque demasiados ejecutores fallan con OOM. Cuando el trabajo tiene éxito (en entradas pequeñas),"myoutput" es de unos 100k.

¿Qué estoy haciendo mal?Traté de reemplazarsaveAsTextFile concollect (porque realmente quiero hacer algunos cortes y cortes en python antes de guardar), no hubo diferencias en el comportamiento, el mismo error. ¿Es esto de esperar?solía tenerreduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...]) en lugar desc.union - ¿cual es mejor? ¿Hace alguna diferencia?

El clúster tiene25 nodos con825GB RAM y224 núcleos entre ellos.

La invocación esspark-submit --master yarn --num-executors 50 --executor-memory 5G.

Un único RDD tiene ~ 140 columnas y cubre una hora de datos, por lo que una semana es una unión de 168 (= 7 * 24) RDD.

Respuestas a la pregunta(2)

Su respuesta a la pregunta