Lesen Sie ganze Textdateien aus einer Komprimierung in Spark

Ich habe das folgende Problem: Angenommen, ich habe ein Verzeichnis mit komprimierten Verzeichnissen, die mehrere auf HDFS gespeicherte Dateien enthalten. Ich möchte eine RDD erstellen, die aus einigen Objekten des Typs T besteht, d.

context = new JavaSparkContext(conf);
JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<T> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String content = fileNameContent._2();

    // Class T has a constructor of taking the filename and the content of each
    // processed file (as two strings)
    T t = new T(content, fileName);

    return t;
});

Jetzt, wenninputDataPath ist ein Verzeichnis, das Dateien enthält, die einwandfrei funktionieren, d.

String inputDataPath =  "hdfs://some_path/*/*/"; // because it contains subfolders

Aber wenn es eine tgz gibt, die mehrere Dateien enthält, wird der Dateiinhalt fileNameContent._2()) holt mir eine nutzlose Binärzeichenfolge (ziemlich erwartet). Ich habe einen ... gefunden ähnliche Frage zu SO, aber es ist nicht der gleiche Fall, da die Lösung darin besteht, dass jede Komprimierung nur aus einer Datei besteht, und in meinem Fall gibt es viele andere Dateien, die ich einzeln als ganze Dateien lesen möchte. Ich habe auch ein @ gefundFrag ÜberwholeTextFiles, aber das funktioniert in meinem Fall nicht.

Irgendwelche Ideen, wie das geht?

BEARBEITEN

Ich habe es mit dem Leser von @ versucHie (versucht den Reader von @ zu testHie, wie in der FunktiontestTarballWithFolders()), aber wann immer ich @ anru

TarballReader tarballReader = new TarballReader(fileName);

und ich bekommeNullPointerException:

java.lang.NullPointerException
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:83)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:77)
    at java.util.zip.GZIPInputStream.<init>(GZIPInputStream.java:91)
    at utils.TarballReader.<init>(TarballReader.java:61)
    at main.SparkMain.lambda$0(SparkMain.java:105)
    at main.SparkMain$Lambda$18/1667100242.call(Unknown Source)
    at org.apache.spark.api.java.JavaPairRDD$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$anonfun$collect$1$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$anonfun$collect$1$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Die Linie 105 inMainSpark ist das, was ich oben in meiner Bearbeitung des Beitrags gezeigt habe, und Zeile 61 vonTarballReader ist

GZIPInputStream gzip = new GZIPInputStream(in);

was gibt einen Nullwert für den Eingabestreamin in der oberen Zeile:

InputStream in = this.getClass().getResourceAsStream(tarball);

Bin ich hier auf dem richtigen Weg? Wenn ja, wie gehe ich weiter vor? Warum erhalte ich diesen Nullwert und wie kann ich ihn beheben?

Antworten auf die Frage(4)

Ihre Antwort auf die Frage