Чтение изображений из HDFS с помощью mapreduce

Пожалуйста, помогите мне в этом коде. Я пытаюсь прочитать изображения из HDFS. Я использую WholeFileInputFormat. с WholeFileRecordreader. Нет ошибок времени компиляции. Но код дает ошибки времени выполнения. Вывод говорит: не может создать экземпляр данного класса WholeFileInputFormat. Я написал этот код в соответствии с комментариями на Как читать несколько файлов изображений в качестве входных данных из hdfs в map-Reduce? Пожалуйста, помогите мне в этом коде. Он содержит 3 класса. Как его отладить? Или любым другим способом?

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class map2 extends Configured implements Tool {


    public static class MapClass extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {


        private Text input_image = new Text();
        private Text input_vector = new Text();


        @Override
        public void map(NullWritable key,BytesWritable value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {

            System.out.println("CorrelogramIndex Method:");
        String featureString;
        int MAXIMUM_DISTANCE = 16;
        AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;



        byte[] identifier=value.getBytes();

             BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier)); 

            AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);

            vd.extract(bimg);

            featureString = vd.getStringRepresentation();
            double[] bytearray = vd.getDoubleHistogram();

            System.out.println("image: " + identifier + " " + featureString);




            System.out.println(" ------------- ");


            input_image.set(identifier);
            input_vector.set(featureString);
            output.collect(input_image, input_vector);




        }
    }



    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            String out_vector = "";

            while (values.hasNext()) {
                out_vector += (values.next().toString());
            }
            output.collect(key, new Text(out_vector));
        }
    }

    static int printUsage() {
        System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }


    @Override
    public int run(String[] args) throws Exception {



        JobConf conf = new JobConf(getConf(), map2.class);
        conf.setJobName("image_mapreduce");

            conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);


        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(MapClass.class);

        conf.setReducerClass(Reduce.class);






        List<String> other_args = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                switch (args[i]) {
                    case "-m":
                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                        break;
                    case "-r":
                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                        break;
                    default:
                        other_args.add(args[i]);
                        break;
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }

        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }




        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new map2(), args);
    System.exit(res);
    }
}
 -----------------------------------------------------------------------------------
//WholeFileInputFormat

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat<NullWritable, BytesWritable> 
        extends FileInputFormat<NullWritable, BytesWritable> {

    //  @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    //@Over,ride

    public WholeFileRecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }




 @Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
   JobConf job, Reporter reporter)
   throws IOException;
}

    -------------------------------------------------------------------------------
   //WholeInputFileRecorder

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;

 class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {   //recordreader

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getJobConf();
    }

    @Override
    public boolean next(NullWritable k, BytesWritable v) throws IOException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
@Override
    public NullWritable createKey() {
    return NullWritable.get();
    }

    @Override
    public BytesWritable createValue() {
    return value;
    }

 @Override
 public long getPos() throws IOException {
     throw new UnsupportedOperationException("Not supported yet.");
 }

 @Override
 public void close() throws IOException {
     throw new UnsupportedOperationException("Not supported yet.");
 }

 @Override
 public float getProgress() throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
 }
    }

Ответы на вопрос(1)

Ваш ответ на вопрос