Чтение файла как отдельной записи в hadoop

У меня огромное нет. из небольших файлов, я хочу использовать CombineFileInputFormat для объединения файлов таким образом, чтобы данные каждого файла были представлены как одна запись в моей работе MR. Я следовалhttp://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html и попытался преобразовать его в новый API

Я сталкиваюсь с 2 проблемами:

а) Я просто тестирую его с 2 маленькими файлами, но все еще работают 2 картографа. Я ожидал 1

б) Каждая строка идет как одна запись, я хочу, чтобы весь файл как одна запись.

Это может быть больно, но, пожалуйста, посмотрите на код ниже. Я все еще наивный в Hadoop

Класс водителя}

public class MRDriver  extends Configured implements Tool {


@Override
public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    fs.printStatistics();
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    job.setMapperClass(EnronMailReadMapper.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setNumReduceTasks(0);
    job.setJarByClass(EnronMailReadMapper.class);
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 :1;  
}

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);
    System.exit(exitCode);
}

Приведенный ниже класс в основном является копией вставки LineRecordReader с модификацией для initialize () & функция nextKeyValue ()}

public class SingleFileRecordReader extends RecordReader {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                                    Integer.MAX_VALUE);
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

      fileIn.seek(start);
      in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);
  }

  private long getFilePosition() throws IOException {
    long retVal= pos;
    return retVal;
  }

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() 

Ответы на вопрос(1)

Ваш ответ на вопрос