escalabilidade de centelha: o que estou fazendo de errado?

Estou processando dados com spark e ele funciona com um dia de dados (40G), mas falha comOOM em uma semana de dados:

import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
          .map(lambda row:(row.id, row.foo))
          for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
  .reduceByKey(operator.add).saveAsTextFile("myoutput")

O número de IDs diferentes é menor que 10k. Cada ID é pequenoint. O trabalho falha porque muitos executores falham com o OOM. Quando o trabalho for bem-sucedido (em pequenas entradas),"myoutput" é de cerca de 100k.

O que estou fazendo errado?Eu tentei substituirsaveAsTextFile comcollect (porque eu realmente quero fatiar e picar em python antes de salvar), não houve diferença no comportamento, a mesma falha. isso é de se esperar?eu costumava terreduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...]) ao invés desc.union - qual é melhor? Isso faz alguma diferença?

O cluster possui25 nós com825GB RAM e224 núcleos entre eles.

A invocação éspark-submit --master yarn --num-executors 50 --executor-memory 5G.

Um único RDD possui ~ 140 colunas e abrange uma hora de dados; portanto, uma semana é uma união de 168 (= 7 * 24) RDDs.

questionAnswers(2)

yourAnswerToTheQuestion