Spark SQL kann das Schreiben von Parkettdaten mit einer großen Anzahl von Shards nicht abschließen.

Ich versuche, Apache Spark SQL zu verwenden, um JSON-Protokolldaten in S3 auch in Parquet-Dateien in S3 zu speichern. Mein Code ist im Grunde:

import org.apache.spark._
val sqlContext = sql.SQLContext(sc)
val data = sqlContext.jsonFile("s3n://...", 10e-6)
data.saveAsParquetFile("s3n://...")

Dieser Code funktioniert, wenn ich bis zu 2000 Partitionen habe und bei 5000 oder mehr fehlschlägt, unabhängig vom Datenvolumen. Normalerweise könnte man die Partitionen einfach zu einer akzeptablen Zahl zusammenfassen, aber dies ist ein sehr großer Datensatz, und bei 2000 Partitionen stoße ich auf das in diesem @ beschriebene ProbleFrag

14/10/10 00:34:32 INFO scheduler.DAGScheduler: Stage 1 (runJob at ParquetTableOperations.scala:318) finished in 759.274 s
14/10/10 00:34:32 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
14/10/10 00:34:32 INFO spark.SparkContext: Job finished: runJob at ParquetTableOperations.scala:318, took 759.469302077 s
14/10/10 00:34:34 WARN hadoop.ParquetOutputCommitter: could not write summary file for ...
java.io.IOException: Could not read footer: java.lang.NullPointerException
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:190)
        at parquet.hadoop.ParquetFileReader.readAllFootersInParallel(ParquetFileReader.java:203)
        at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:49)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.saveAsHadoopFile(ParquetTableOperations.scala:319)
        at org.apache.spark.sql.parquet.InsertIntoParquetTable.execute(ParquetTableOperations.scala:246)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:409)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:409)
        at org.apache.spark.sql.SchemaRDDLike$class.saveAsParquetFile(SchemaRDDLike.scala:77)
        at org.apache.spark.sql.SchemaRDD.saveAsParquetFile(SchemaRDD.scala:103)
        at $line37.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:39)
        at $line37.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:44)
        at $line37.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:46)
        at $line37.$read$iwC$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:48)
        at $line37.$read$iwC$iwC$iwC$iwC$iwC$iwC.<init>(<console>:50)
        at $line37.$read$iwC$iwC$iwC$iwC$iwC.<init>(<console>:52)
        at $line37.$read$iwC$iwC$iwC$iwC.<init>(<console>:54)
        at $line37.$read$iwC$iwC$iwC.<init>(<console>:56)
        at $line37.$read$iwC$iwC.<init>(<console>:58)
        at $line37.$read$iwC.<init>(<console>:60)
        at $line37.$read.<init>(<console>:62)
        at $line37.$read$.<init>(<console>:66)
        at $line37.$read$.<clinit>(<console>)
        at $line37.$eval$.<init>(<console>:7)
        at $line37.$eval$.<clinit>(<console>)
        at $line37.$eval.$print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
        at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
        at org.apache.spark.repl.SparkILoop$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
        at org.apache.spark.repl.SparkILoop$anonfun$process$1.apply(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop$anonfun$process$1.apply(SparkILoop.scala:902)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
        at org.apache.hadoop.fs.s3native.NativeS3FileSystem$NativeS3FsInputStream.close(NativeS3FileSystem.java:106)
        at java.io.BufferedInputStream.close(BufferedInputStream.java:472)
        at java.io.FilterInputStream.close(FilterInputStream.java:181)
        at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:298)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:180)
        at parquet.hadoop.ParquetFileReader$2.call(ParquetFileReader.java:176)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

Ich starte dies mit spark-1.1.0 auf einem R3.xlarge in ec2. Ich benutze die Spark-Shell-Konsole, um den obigen Code auszuführen. Ich bin in der Lage, nicht triviale Abfragen auf dem @ durchzuführedata SchemaRDD-Objekt anschließend, sodass es kein Ressourcenproblem zu sein scheint. Es ist auch möglich, die resultierende Parkettdatei zu lesen und abzufragen. Aufgrund des Fehlens von Zusammenfassungsdateien dauert dies nur sehr lange.

Antworten auf die Frage(1)

Ihre Antwort auf die Frage