Получение имени файла / FileData в качестве ввода ключа / значения для Map при запуске задания Hadoop MapReduce
Я прошел через вопросКак получить имя файла / содержимое файла в качестве ввода ключа / значения для MAP при запуске задания Hadoop MapReduce? Вот. Хотя это объясняет концепцию, я не могу успешно преобразовать его в код.
По сути, я хочу, чтобы имя файла было ключом, а данные файла - значением. Для этого я написал обычайRecordReader
как рекомендовано в вышеупомянутом вопросе. Но я не мог понять, как получить имя файла в качестве ключа в этом классе. Также при написании кастомаFileInputFormat
класс, я не мог понять, как вернуть обычайRecordReader
Я писал ранее.
RecordReader
код является:
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
public class CustomRecordReader extends RecordReader<Text, Text> {
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
private StringBuffer valueBuffer = new StringBuffer("");
private Text key = new Text();
private Text value = new Text();
private RecordReader<Text, Text> recordReader;
public SPDRecordReader(RecordReader<Text, Text> recordReader) {
this.recordReader = recordReader;
}
@Override
public void close() throws IOException {
recordReader.close();
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return recordReader.getProgress();
}
@Override
public void initialize(InputSplit arg0, TaskAttemptContext arg1)
throws IOException, InterruptedException {
recordReader.initialize(arg0, arg1);
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (valueBuffer.equals("")) {
while (recordReader.nextKeyValue()) {
valueBuffer.append(recordReader.getCurrentValue());
valueBuffer.append(LINE_SEPARATOR);
}
value.set(valueBuffer.toString());
return true;
}
return false;
}
}
И неполныйFileInputFormat
класс это:
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class CustomFileInputFormat extends FileInputFormat<Text, Text> {
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
return false;
}
@Override
public RecordReader<Text, Text> getRecordReader(InputSplit arg0, JobConf arg1,
Reporter arg2) throws IOException {
return null;
}
}