escalabilidade de centelha: o que estou fazendo de errado?
Estou processando dados com spark e ele funciona com um dia de dados (40G), mas falha comOOM em uma semana de dados:
import pyspark
import datetime
import operator
sc = pyspark.SparkContext()
sqc = pyspark.sql.SQLContext(sc)
sc.union([sqc.parquetFile(hour.strftime('.....'))
.map(lambda row:(row.id, row.foo))
for hour in myrange(beg,end,datetime.timedelta(0,3600))]) \
.reduceByKey(operator.add).saveAsTextFile("myoutput")
O número de IDs diferentes é menor que 10k. Cada ID é pequenoint
. O trabalho falha porque muitos executores falham com o OOM. Quando o trabalho for bem-sucedido (em pequenas entradas),"myoutput"
é de cerca de 100k.
saveAsTextFile
comcollect
(porque eu realmente quero fatiar e picar em python antes de salvar), não houve diferença no comportamento, a mesma falha. isso é de se esperar?eu costumava terreduce(lambda x,y: x.union(y), [sqc.parquetFile(...)...])
ao invés desc.union
- qual é melhor? Isso faz alguma diferença?O cluster possui25 nós com825GB RAM e224 núcleos entre eles.
A invocação éspark-submit --master yarn --num-executors 50 --executor-memory 5G
.
Um único RDD possui ~ 140 colunas e abrange uma hora de dados; portanto, uma semana é uma união de 168 (= 7 * 24) RDDs.