Cómo forzar la evaluación de DataFrame en Spark

A veces (por ejemplo, para pruebas y bechmarking) quiero forzar la ejecución de las transformaciones definidas en un DataFrame. AFAIK llamando a una acción comocount no asegura que todoColumns en realidad se calculanshow solo puede calcular un subconjunto de todosRows (ver ejemplos a continuación)

Mi solución es escribir elDataFrame a HDFS usandodf.write.saveAsTable, pero esto "desordena" mi sistema con tablas que no quiero mantener más.

Entonces, ¿cuál es la mejor manera de activar la evaluación de unDataFrame?

Editar:

Tenga en cuenta que también hay una discusión reciente sobre la lista de desarrolladores de spark:http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html

Hice un pequeño ejemplo que muestra quecount enDataFrame no evalúa todo (probado con Spark 1.6.3 y spark-master =local[2]):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeEx,ception;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception

Usando la misma lógica, aquí un ejemplo queshow no evalúa todas las filas:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception

Edición 2: Para Eliasah: The Exception dice esto:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$1.apply(<console>:68)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$anonfun no asegura que todo$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$anonfun no asegura que todo$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
.
.
.
.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute no asegura que todo1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute no asegura que todo1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:74)
.
.
.
.

Respuestas a la pregunta(4)

Su respuesta a la pregunta