Unirse a un marco de datos de chispa grande y descomunal

Tengo dos marcos de datos, df1 tiene 6 millones de filas, df2 tiene mil millones.

He probado el estándardf1.join(df2,df1("id")<=>df2("id2")), pero se queda sin memoria.

df1 es demasiado grande para colocarlo en una unión de difusión.

Incluso probé un filtro de floración, pero también era demasiado grande para caber en una transmisión y aún así ser útil.

Lo único que he intentado que no falla es dividir df1 en 300,000 fragmentos de fila y unirme con df2 en un bucle foreach. Pero esto toma un orden de magnitud más largo de lo que probablemente debería (probablemente porque es demasiado grande para caber como una persistencia, lo que hace que rehaga la división hasta ese punto). Volver a combinar los resultados también lleva un tiempo.

¿Cómo has resuelto este problema?

Algunas notas

df1 es un subconjunto de df2.df1=df2.where("fin<1").selectExpr("id as id2").distinct() Estoy interesado en todas las filas en df2 que tienen una identificación que a la vez tiene una aleta <1, lo que significa que no puedo hacerlo como un paso.

Hay alrededor de 200 millones de identificadores únicos en df2.

Aquí hay algunos ajustes de chispa relevantes:

spark.cores.max=1000
spark.executor.memory=15G
spark.akka.frameSize=1024
spark.shuffle.consolidateFiles=false
spark.task.cpus=1
spark.driver.cores=1
spark.executor.cores=1
spark.memory.fraction=0.5
spark.memory.storageFraction=0.3
spark.sql.shuffle.partitions=10000
spark.default.parallelism=10000

El error que obtengo es:

16/03/11 04:36:07 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(11,1,ResultTask,FetchFailed(BlockManagerId(68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199, mapr, 46487),3,176,4750,org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /tmp/mesos/work/slaves/68dcb91c-1b45-437d-ac47-8e8c1e4bc386-S199/frameworks/c754216b-bf80-4d84-97f1-2e907030365e-2545/executors/16/runs/5a5a01c5-205e-4380-94d3-7fa0f6421b85/blockmgr-ea345692-05bb-4f42-9ba1-7b93311fb9d4/0e/shuffle_3_340_0.index (No such file or directory)

y

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 6.3 failed 4 times, most recent failure: Lost task 465.3 in stage 6.3 (TID 114448, mapr): java.lang.OutOfMemoryError: Direct buffer memory

Respuestas a la pregunta(2)

Su respuesta a la pregunta