Чтение файла как отдельной записи в hadoop
У меня огромное нет. из небольших файлов, я хочу использовать CombineFileInputFormat для объединения файлов таким образом, чтобы данные каждого файла были представлены как одна запись в моей работе MR. Я следовалhttp://yaseminavcular.blogspot.in/2011/03/many-small-input-files.html и попытался преобразовать его в новый API
Я сталкиваюсь с 2 проблемами:
а) Я просто тестирую его с 2 маленькими файлами, но все еще работают 2 картографа. Я ожидал 1
б) Каждая строка идет как одна запись, я хочу, чтобы весь файл как одна запись.
Это может быть больно, но, пожалуйста, посмотрите на код ниже. Я все еще наивный в Hadoop
Класс водителя}
public class MRDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
FileSystem fs = new Path(".").getFileSystem(getConf());
fs.printStatistics();
Job job = new Job(getConf());
job.setJobName("Enron MR");
job.setMapperClass(EnronMailReadMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setNumReduceTasks(0);
job.setJarByClass(EnronMailReadMapper.class);
RawCombineFileInputFormat.addInputPath(job, new Path(args[0]));
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 :1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MRDriver(), args);
System.exit(exitCode);
}
Приведенный ниже класс в основном является копией вставки LineRecordReader с модификацией для initialize () & функция nextKeyValue ()}
public class SingleFileRecordReader extends RecordReader {
private static final Log LOG = LogFactory.getLog(SingleFileRecordReader.class);
private long start;
private long pos;
private long end;
private LineReader in;
private int maxLineLength;
private LongWritable key = null;
private Text value = null;
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
fileIn.seek(start);
in = new LineReader(fileIn, job);
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private int maxBytesToConsume(long pos) {
return (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal= pos;
return retVal;
}
public boolean nextKeyValue() throws IOException {
if (key == null) {
key = new LongWritable();
}
key.set(pos);
if (value == null) {
value = new Text();
}
int newSize = 0;
StringBuffer totalValue = new StringBuffer();
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition()