Lesen Sie ganze Textdateien aus einer Komprimierung in Spark
Ich habe das folgende Problem: Angenommen, ich habe ein Verzeichnis mit komprimierten Verzeichnissen, die mehrere auf HDFS gespeicherte Dateien enthalten. Ich möchte eine RDD erstellen, die aus einigen Objekten des Typs T besteht, d.
context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
// The name of the file
String fileName = fileNameContent._1();
// The content of the file
String content = fileNameContent._2();
// Class T has a constructor of taking the filename and the content of each
// processed file (as two strings)
T t = new T(content, fileName);
return t;
});
Jetzt, wenninputDataPath
ist ein Verzeichnis, das Dateien enthält, die einwandfrei funktionieren, d.
String inputDataPath = "hdfs://some_path/*/*/"; // because it contains subfolders
Aber wenn es eine tgz gibt, die mehrere Dateien enthält, wird der Dateiinhalt fileNameContent._2()
) holt mir eine nutzlose Binärzeichenfolge (ziemlich erwartet). Ich habe einen ... gefunden ähnliche Frage zu SO, aber es ist nicht der gleiche Fall, da die Lösung darin besteht, dass jede Komprimierung nur aus einer Datei besteht, und in meinem Fall gibt es viele andere Dateien, die ich einzeln als ganze Dateien lesen möchte. Ich habe auch ein @ gefundFrag ÜberwholeTextFiles
, aber das funktioniert in meinem Fall nicht.
Irgendwelche Ideen, wie das geht?
BEARBEITEN
Ich habe es mit dem Leser von @ versucHie (versucht den Reader von @ zu testHie, wie in der FunktiontestTarballWithFolders()
), aber wann immer ich @ anru
TarballReader tarballReader = new TarballReader(fileName);
und ich bekommeNullPointerException
:
java.lang.NullPointerException
at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
at utils.TarballReader.<init>(TarballReader.java:61)
at main.SparkMain.lambda$0(SparkMain.java:105)
at main.SparkMain$Lambda$18/1667100242.call(Unknown Source)
at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$anonfun$collect$1$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$anonfun$collect$1$anonfun$12.apply(RDD.scala:927)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Die Linie 105 inMainSpark
ist das, was ich oben in meiner Bearbeitung des Beitrags gezeigt habe, und Zeile 61 vonTarballReader
ist
GZIPInputStream gzip = new GZIPInputStream(in);
was gibt einen Nullwert für den Eingabestreamin
in der oberen Zeile:
InputStream in = this.getClass().getResourceAsStream(tarball);
Bin ich hier auf dem richtigen Weg? Wenn ja, wie gehe ich weiter vor? Warum erhalte ich diesen Nullwert und wie kann ich ihn beheben?