Lesen von Bildern aus HDFS mit mapreduce
Bitte hilf mir in diesem Code. Ich versuche, Bilder von HDFS zu reiaden. Ich verwende WholeFileInputFormat. mit WholeFileRecordreader. Keine Fehler bei der Kompilierung. Der Code gibt jedoch Laufzeitfehler aus. Die Ausgabe lautet: Die Instanz der angegebenen Klasse WholeFileInputFormat kann nicht erstellt werden. Ich habe diesen Code gemäß den Kommentaren auf geschriebenWie lese ich mehrere Bilddateien als Eingabe von HDFS in Map-Reduce? Bitte helfen Sie mir in diesem Code. Es enthält 3 Klassen. Wie man es debuggt? Oder auf andere Weise?
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.imageio.ImageIO;
import net.semanticmetadata.lire.imageanalysis.AutoColorCorrelogram;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class map2 extends Configured implements Tool {
public static class MapClass extends MapReduceBase
implements Mapper<NullWritable, BytesWritable, Text, Text> {
private Text input_image = new Text();
private Text input_vector = new Text();
@Override
public void map(NullWritable key,BytesWritable value,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
System.out.println("CorrelogramIndex Method:");
String featureString;
int MAXIMUM_DISTANCE = 16;
AutoColorCorrelogram.Mode mode = AutoColorCorrelogram.Mode.FullNeighbourhood;
byte[] identifier=value.getBytes();
BufferedImage bimg = ImageIO.read(new ByteArrayInputStream(identifier));
AutoColorCorrelogram vd = new AutoColorCorrelogram(MAXIMUM_DISTANCE, mode);
vd.extract(bimg);
featureString = vd.getStringRepresentation();
double[] bytearray = vd.getDoubleHistogram();
System.out.println("image: " + identifier + " " + featureString);
System.out.println(" ------------- ");
input_image.set(identifier);
input_vector.set(featureString);
output.collect(input_image, input_vector);
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String out_vector = "";
while (values.hasNext()) {
out_vector += (values.next().toString());
}
output.collect(key, new Text(out_vector));
}
}
static int printUsage() {
System.out.println("map2 [-m <maps>] [-r <reduces>] <input> <output>");
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(getConf(), map2.class);
conf.setJobName("image_mapreduce");
conf.setInputFormat(WholeFileInputFormat.class);
conf.setOutputFormat(NullOutputFormat.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(MapClass.class);
conf.setReducerClass(Reduce.class);
List<String> other_args = new ArrayList<>();
for (int i = 0; i < args.length; ++i) {
try {
switch (args[i]) {
case "-m":
conf.setNumMapTasks(Integer.parseInt(args[++i]));
break;
case "-r":
conf.setNumReduceTasks(Integer.parseInt(args[++i]));
break;
default:
other_args.add(args[i]);
break;
}
} catch (NumberFormatException except) {
System.out.println("ERROR: Integer expected instead of " + args[i]);
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from "
+ args[i - 1]);
return printUsage();
}
}
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: "
+ other_args.size() + " instead of 2.");
return printUsage();
}
FileInputFormat.setInputPaths(conf, other_args.get(0));
FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new map2(), args);
System.exit(res);
}
}
-----------------------------------------------------------------------------------
//WholeFileInputFormat
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
public class WholeFileInputFormat<NullWritable, BytesWritable>
extends FileInputFormat<NullWritable, BytesWritable> {
// @Override
protected boolean isSplitable(JobContext context, Path file) {
return false;
}
//@Override
public WholeFileRecordReader createRecordReader(
InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
WholeFileRecordReader reader = new WholeFileRecordReader();
reader.initialize(split, context);
return reader;
}
@Override
public RecordReader<NullWritable, BytesWritable> getRecordReader(InputSplit split,
JobConf job, Reporter reporter)
throws IOException;
}
-------------------------------------------------------------------------------
//WholeInputFileRecorder
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.TaskAttemptContext;
class WholeFileRecordReader implements RecordReader<NullWritable, BytesWritable> { //recordreader
private FileSplit fileSplit;
private Configuration conf;
private BytesWritable value = new BytesWritable();
private boolean processed = false;
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
this.fileSplit = (FileSplit) split;
this.conf = context.getJobConf();
}
@Override
public boolean next(NullWritable k, BytesWritable v) throws IOException {
if (!processed) {
byte[] contents = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
org.apache.hadoop.fs.FileSystem fs = file.getFileSystem(conf);
FSDataInputStream in = null;
try {
in = fs.open(file);
IOUtils.readFully(in, contents, 0, contents.length);
value.set(contents, 0, contents.length);
} finally {
IOUtils.closeStream(in);
}
processed = true;
return true;
}
return false;
}
@Override
public NullWritable createKey() {
return NullWritable.get();
}
@Override
public BytesWritable createValue() {
return value;
}
@Override
public long getPos() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public void close() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public float getProgress() throws IOException {
throw new UnsupportedOperationException("Not supported yet.");
}
}