Lendo arquivo como registro único no hadoop

Eu tenho enorme não. de arquivos pequenos, eu quero usar CombineFileInputFormat para mesclar os arquivos de modo que cada arquivo de dados venha como um único registro no meu trabalho de RM. Eu seguihttp://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html e tentou convertê-lo na nova API

Eu estou enfrentando 2 problemas:

a) Estou apenas testando com 2 arquivos pequenos, ainda 2 mappers são disparados. Eu esperava 1

b) Cada linha vem como registro único, eu quero o arquivo inteiro como registro único.

Pode ser doloroso, mas por favor, olhe no código abaixo. Eu ainda sou ingênuo no hadoop

A classe do motorista

public class MRDriver  extends Configured implements Tool {


@Override
public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    fs.printStatistics();
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    job.setMapperClass(EnronMailReadMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(EnronMailReadMapper.class);
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 :1;  
}

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);
    System.exit(exitCode);
}

}

A classe abaixo é principalmente copiar colar de LineRecordReader com modificação para initialize () & nextKeyValue () function

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

      fileIn.seek(start);
      in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    long retVal= pos;
    return retVal;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
        break;
      }
      totalValue.append(value.toString()+"\n");
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
        value = new Text(totalValue.toString());
      return true;
    }
  }

  @Override
  public LongWritable getCurrentKey() {
    return key;
  }

  @Override
  public Text getCurrentValue() {
    return value;
  }

  /**
   * Get the progress within the split
   */
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));
    }
  }

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
        in.close();
      }
    } finally {

    }
  }

}

Outros arquivos

public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{

@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class);
}

}

E

public class MultiFileRecordReader extends RecordReader < LongWritable, Text > {

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< LongWritable, Text > rr;

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
    this.split = split;
    this.context = context;
    this.index = index;
    this.rr = new SingleFileRecordReader();
}
@Override
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    this.split = (CombineFileSplit) split;
      this.context = context;

      if (null == rr) {
       rr = new SingleFileRecordReader();
      }

      FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
                                          this.split.getOffset(index), 
                                          this.split.getLength(index), 
                                          this.split.getLocations());
      this.rr.initialize(fileSplit, this.context);

}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.nextKeyValue();
}

@Override
public LongWritable getCurrentKey() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentKey();
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentValue();
}

@Override
public float getProgress() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getProgress();
}

@Override
public void close() throws IOException {
    if (rr != null) {
           rr.close();
           rr = null;
    }       
}   

}

questionAnswers(1)

yourAnswerToTheQuestion