Получение имени файла / FileData в качестве ввода ключа / значения для Map при запуске задания Hadoop MapReduce

Я прошел через вопросКак получить имя файла / содержимое файла в качестве ввода ключа / значения для MAP при запуске задания Hadoop MapReduce? Вот. Хотя это объясняет концепцию, я не могу успешно преобразовать его в код.

По сути, я хочу, чтобы имя файла было ключом, а данные файла - значением. Для этого я написал обычайRecordReader как рекомендовано в вышеупомянутом вопросе. Но я не могне понимаю, как получить имя файла в качестве ключа в этом классе. Также при написании кастомаFileInputFormat класс, я не могне понимаю, как вернуть обычайRecordReader Я писал ранее.

RecordReader код является:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CustomRecordReader extends RecordReader {

    private static final String LINE_SEPARATOR = System.getProperty("line.separator");

    private StringBuffer valueBuffer = new StringBuffer("");
    private Text key = new Text();
    private Text value = new Text();
    private RecordReader recordReader;

    public SPDRecordReader(RecordReader recordReader) {
        this.recordReader = recordReader;
    }

    @Override
    public void close() throws IOException {
        recordReader.close();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return recordReader.getProgress();
    }

    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
        recordReader.initialize(arg0, arg1);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (valueBuffer.equals("")) {
            while (recordReader.nextKeyValue()) {
                valueBuffer.append(recordReader.getCurrentValue());
                valueBuffer.append(LINE_SEPARATOR);
            }
            value.set(valueBuffer.toString());
            return true;
        }
        return false;
    }

}

И неполныйFileInputFormat класс это:

import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class CustomFileInputFormat extends FileInputFormat {

    @Override
    protected boolean isSplitable(FileSystem fs, Path filename) {
        return false;
    }

    @Override
    public RecordReader getRecordReader(InputSplit arg0, JobConf arg1,
            Reporter arg2) throws IOException {
        return null;
    }
}

Ответы на вопрос(1)

Ваш ответ на вопрос