Odczyt pliku jako pojedynczego rekordu w hadoop

Mam ogromną liczbę małych plików i chcę użyć CombineFileInputFormat, aby je połączyć w taki sposób, żeby każdy plik danych był pojedynczym rekordem w moim zadaniu MR. Wzorowałem się na http://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html i próbowałem przenieść ten kod na nowe API.

Mam 2 problemy:

a) Testuję to na zaledwie 2 małych plikach, a mimo to nadal uruchamiane są 2 mappery. Spodziewałem się jednego.

b) Każda linia przychodzi jako pojedynczy rekord, chcę, aby cały plik był pojedynczym rekordem.

Lektura może być bolesna, ale proszę spojrzeć na poniższy kod. Wciąż jestem początkujący w Hadoopie.

Klasa sterownika (driver)

public class MRDriver  extends Configured implements Tool {

public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 :1;  

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);


Poniższa klasa to w większości kod skopiowany (kopiuj-wklej) z LineRecordReader, ze zmodyfikowanymi metodami initialize() i nextKeyValue()

public class SingleFileRecordReader extends RecordReader<LongWritable, Text> {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

      in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    this.pos = start;

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);

  private long getFilePosition() throws IOException {
    long retVal= pos;
    return retVal;

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    if (value == null) {
      value = new Text();
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      if (newSize == 0) {
      pos += newSize;
      if (newSize < maxLineLength) {

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
        value = new Text(totalValue.toString());
      return true;

  public LongWritable getCurrentKey() {
    return key;

  public Text getCurrentValue() {
    return value;

   * Get the progress within the split
  public float getProgress() throws IOException {
    if (start == end) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
        (getFilePosition() - start) / (float)(end - start));

  public synchronized void close() throws IOException {
    try {
      if (in != null) {
    } finally {



Inne pliki

public class RawCombineFileInputFormat extends CombineFileInputFormat <LongWritable,Text>{

public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, TaskAttemptContext context) throws IOException {
    return new CombineFileRecordReader< LongWritable, Text >((CombineFileSplit) split, context, MultiFileRecordReader.class);



public class MultiFileRecordReader extends RecordReader < LongWritable, Text > {

 private CombineFileSplit split;
 private TaskAttemptContext context;
 private int index;
 private RecordReader< LongWritable, Text > rr;

public MultiFileRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
    this.split = split;
    this.context = context;
    this.index = index;
    this.rr = new SingleFileRecordReader();
public void initialize(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException {
    this.split = (CombineFileSplit) split;
      this.context = context;

      if (null == rr) {
       rr = new SingleFileRecordReader();

      FileSplit fileSplit = new FileSplit(this.split.getPath(index), 
      this.rr.initialize(fileSplit, this.context);


public boolean nextKeyValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.nextKeyValue();

public LongWritable getCurrentKey() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentKey();

public Text getCurrentValue() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getCurrentValue();

public float getProgress() throws IOException, InterruptedException {
    // TODO Auto-generated method stub
    return this.rr.getProgress();

public void close() throws IOException {
    if (rr != null) {
           rr = null;


