Datei wird als einzelner Datensatz in Hadoop gelesen

Ich habe riesig nein. Bei kleinen Dateien möchte ich CombineFileInputFormat verwenden, um die Dateien so zusammenzuführen, dass alle Dateidaten in meinem MR-Auftrag als ein einzelner Datensatz vorliegen. Ich bin gefolgthttp://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html und versuchte, es in die neue API zu konvertieren

Ich habe zwei Probleme:

a) Ich teste es nur mit 2 kleinen Dateien, trotzdem werden 2 Mapper abgefeuert. Ich habe mit 1 gerechnet

b) Jede Zeile kommt als Einzeldatensatz, ich möchte die gesamte Datei als Einzeldatensatz.

Es kann schmerzhaft sein, aber schauen Sie bitte in den Code unten. Ich bin immer noch naiv in Hadoop

Die Fahrerklasse

public class MRDriver  extends Configured implements Tool {


@Override
public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    fs.printStatistics();
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    job.setMapperClass(EnronMailReadMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(EnronMailReadMapper.class);
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 :1;  
}

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);
    System.exit(exitCode);
}

}

Die folgende Klasse besteht hauptsächlich aus dem Kopieren und Einfügen von LineRecordReader mit der Änderung der Funktion initialize () & nextKeyValue ()

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

      fileIn.seek(start);
      in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    long retVal= pos;
    return retVal;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      totalValue.append(value.toString()+"\n");
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
        value = new Text(totalValue.toString());
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {

    }
  }

}

Andere Dateien

public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{

@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class);
}

}

Und

public class MultiFileRecordReader extends RecordReader < LongWritable, Text > {

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< LongWritable, Text > rr;

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
    this.split = split;
    this.context = context;
    this.index = index;
    this.rr = new SingleFileRecordReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    this.split = (CombineFileSplit) split;
      this.context = context;

      if (null == rr) {
       rr = new SingleFileRecordReader();
      }

      FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                                          this.split.getOffset(index), 
                                          this.split.getLength(index), 
                                          this.split.getLocations());
      this.rr.initialize(fileSplit, this.context);

}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.nextKeyValue();
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentKey();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentValue();
}

@Override
public float getProgress() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getProgress();
}

@Override
public void close() throws IOException {
    if (rr != null) {
           rr.close();
           rr = null;
    }       
}   

}

Antworten auf die Frage(1)

Ihre Antwort auf die Frage