Leyendo imágenes de HDFS usando mapreduce

Por favor ayúdame en este código. Estoy tratando de reiar imágenes de HDFS. Estoy usando WholeFileInputFormat. con WholeFileRecordreader. No hay errores de tiempo de compilación. Pero el código está dando errores de tiempo de ejecución. La salida está diciendo: no se puede crear la instancia de la clase WholeFileInputFormat. He escrito este código de acuerdo a los comentarios en¿Cómo leer varios archivos de imagen como entrada de hdfs en map-reduce? Por favor, ayúdame en este código. Contiene 3 clases. ¿Cómo depurarlo? ¿O de otra manera?

import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class map2 extends Configured implements Tool {


    public static class MapClass extends MapReduceBase
            implements Mapper<NullWritable, BytesWritable, Text, Text> {


        private Text input_image = new Text();
        private Text input_vector = new Text();


        @Override
        public void map(NullWritable key,BytesWritable value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {

            System.out.println("CorrelogramIndex Method:");
        String featureString;
        int MAXIMUM_DISTANCE = 16;
        AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;



        byte[] identifier=value.getBytes();

             BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier)); 

            AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);

            vd.extract(bimg);

            featureString = vd.getStringRepresentation();
            double[] bytearray = vd.getDoubleHistogram();

            System.out.println("image: " + identifier + " " + featureString);




            System.out.println(" ------------- ");


            input_image.set(identifier);
            input_vector.set(featureString);
            output.collect(input_image, input_vector);




        }
    }



    public static class Reduce extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            String out_vector = "";

            while (values.hasNext()) {
                out_vector += (values.next().toString());
            }
            output.collect(key, new Text(out_vector));
        }
    }

    static int printUsage() {
        System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }


    @Override
    public int run(String[] args) throws Exception {



        JobConf conf = new JobConf(getConf(), map2.class);
        conf.setJobName("image_mapreduce");

            conf.setInputFormat(WholeFileInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);


        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(MapClass.class);

        conf.setReducerClass(Reduce.class);






        List<String> other_args = new ArrayList<>();
        for (int i = 0; i < args.length; ++i) {
            try {
                switch (args[i]) {
                    case "-m":
                        conf.setNumMapTasks(Integer.parseInt(args[++i]));
                        break;
                    case "-r":
                        conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                        break;
                    default:
                        other_args.add(args[i]);
                        break;
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }

        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }




        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new map2(), args);
    System.exit(res);
    }
}
 -----------------------------------------------------------------------------------
//WholeFileInputFormat

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;

public class WholeFileInputFormat<NullWritable, BytesWritable> 
        extends FileInputFormat<NullWritable, BytesWritable> {

    //  @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;
    }
    //@Override

    public WholeFileRecordReader createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }




 @Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
   JobConf job, Reporter reporter)
   throws IOException;
}

    -------------------------------------------------------------------------------
   //WholeInputFileRecorder

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;

 class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> {   //recordreader

    private FileSplit fileSplit;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();
    private boolean processed = false;

    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getJobConf();
    }

    @Override
    public boolean next(NullWritable k, BytesWritable v) throws IOException {
        if (!processed) {
            byte[] contents = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
            FSDataInputStream in = null;
            try {
                in = fs.open(file);
                IOUtils.readFully(in, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(in);
            }
            processed = true;
            return true;
        }
        return false;
    }
@Override
    public NullWritable createKey() {
    return NullWritable.get();
    }

    @Override
    public BytesWritable createValue() {
    return value;
    }

 @Override
 public long getPos() throws IOException {
     throw new UnsupportedOperationException("Not supported yet.");
 }

 @Override
 public void close() throws IOException {
     throw new UnsupportedOperationException("Not supported yet.");
 }

 @Override
 public float getProgress() throws IOException {
    throw new UnsupportedOperationException("Not supported yet.");
 }
    }

Respuestas a la pregunta(1)

Su respuesta a la pregunta