Leyendo imágenes de HDFS usando mapreduce

Por favor ayúdame en este código. Estoy tratando de reiar imágenes de HDFS. Estoy usando WholeFileInputFormat. con WholeFileRecordreader. No hay errores de tiempo de compilación. Pero el código está dando errores de tiempo de ejecución. La salida está diciendo: no se puede crear la instancia de la clase WholeFileInputFormat. He escrito este código de acuerdo a los comentarios en¿Cómo leer varios archivos de imagen como entrada de hdfs en map-reduce? Por favor, ayúdame en este código. Contiene 3 clases. ¿Cómo depurarlo? ¿O de otra manera?

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class map2 extends Configured implements Tool {

    public static class MapClass extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {

        private Text input_image = new Text();
        private Text input_vector = new Text();

        public void map(NullWritable key,BytesWritable value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {

            System.out.println("CorrelogramIndex Method:");
        String featureString;
        int MAXIMUM_DISTANCE = 16;
        AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;

        byte[] identifier=value.getBytes();

             BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier)); 

            AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);


            featureString = vd.getStringRepresentation();
            double[] bytearray = vd.getDoubleHistogram();

            System.out.println("image: " + identifier + " " + featureString);

            System.out.println(" ------------- ");

            output.collect(input_image, input_vector);


    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            String out_vector = "";

            while (values.hasNext()) {
                out_vector += (values.next().toString());
            output.collect(key, new Text(out_vector));

    static int printUsage() {
        System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
        return -1;

    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(getConf(), map2.class);






        List<String> other_args = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                switch (args[i]) {
                    case "-m":
                    case "-r":
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();

        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();

        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        return 0;

    public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new map2(), args);

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat<NullWritable, BytesWritable> 
        extends FileInputFormat<NullWritable, BytesWritable> {

    //  @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;

    public WholeFileRecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;

public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
   JobConf job, Reporter reporter)
   throws IOException;


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;

 class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {   //recordreader

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getJobConf();

    public boolean next(NullWritable k, BytesWritable v) throws IOException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
            processed = true;
            return true;
        return false;
    public NullWritable createKey() {
    return NullWritable.get();

    public BytesWritable createValue() {
    return value;

 public long getPos() throws IOException {
     throw new UnsupportedOperationException("Not supported yet.");

 public void close() throws IOException {
     throw new UnsupportedOperationException("Not supported yet.");

 public float getProgress() throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");

