Чтение файла как отдельной записи в hadoop

У меня огромное нет. из небольших файлов, я хочу использовать CombineFileInputFormat для объединения файлов таким образом, чтобы данные каждого файла были представлены как одна запись в моей работе MR. Я следовалhttp://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html и попытался преобразовать его в новый API

Я сталкиваюсь с 2 проблемами:

а) Я просто тестирую его с 2 маленькими файлами, но все еще работают 2 картографа. Я ожидал 1

б) Каждая строка идет как одна запись, я хочу, чтобы весь файл как одна запись.

Это может быть больно, но, пожалуйста, посмотрите на код ниже. Я все еще наивный в Hadoop

Класс водителя}

public class MRDriver  extends Configured implements Tool {

public int run(String[] args) throws Exception {
    FileSystem fs = new Path(".").getFileSystem(getConf());
    Job job = new Job(getConf());
    job.setJobName("Enron MR");
    RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 :1;  

public static void main(String[] args) throws Exception  {
    int exitCode = ToolRunner.run(new MRDriver(), args);

Приведенный ниже класс в основном является копией вставки LineRecordReader с модификацией для initialize () & функция nextKeyValue ()}

public class SingleFileRecordReader extends RecordReader {
  private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);

  private long start;
  private long pos;
  private long end;
  private LineReader in;
  private int maxLineLength;
  private LongWritable key = null;
  private Text value = null;

  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
    start = split.getStart();
    end = start + split.getLength();
    final Path file = split.getPath();

    // open the file and seek to the start of the split
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(split.getPath());

      in = new LineReader(fileIn, job);
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    this.pos = start;

  private int maxBytesToConsume(long pos) {
    return (int) Math.min(Integer.MAX_VALUE, end - pos);

  private long getFilePosition() throws IOException {
    long retVal= pos;
    return retVal;

  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    if (value == null) {
      value = new Text();
    int newSize = 0;
    StringBuffer totalValue = new StringBuffer();
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition()