MapReduce Output ArrayWritable
I am trying to get ArrayWritable output from a simple MapReduce task. I found a few questions describing a similar problem, but I could not fix the problem in my own code, so I am looking forward to your help. Thank you :)
Input: a text file containing a sentence.
The output should be:
<Word, <length, number of same words in Textfile>>
Example: Hello 5 2
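To make the expected result concrete, here is a minimal plain-Java sketch (no Hadoop involved) that computes the same <length, count> pair per word. The class name WordLengthSketch and the method lengthAndCount are made up for illustration. One assumption to keep in mind: String.length() counts characters, whereas Text.getLength() in the job reports the number of UTF-8 bytes, so the two only agree for ASCII words.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

class WordLengthSketch {
    // Returns word -> {length, count}, keeping first-seen order.
    static Map<String, int[]> lengthAndCount(String text) {
        Map<String, int[]> result = new LinkedHashMap<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            // First sighting: store the length and start the count at zero.
            int[] entry = result.computeIfAbsent(word, w -> new int[] {w.length(), 0});
            entry[1]++;
        }
        return result;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, int[]> e : lengthAndCount("Hello test Hello").entrySet()) {
            // Prints "Hello 5 2" and then "test 4 1".
            System.out.println(e.getKey() + " " + e.getValue()[0] + " " + e.getValue()[1]);
        }
    }
}
```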
The output I get from my job is:
hello WordLength_V01$IntArrayWritable@221cf05
test WordLength_V01$IntArrayWritable@799e525a
I think the problem lies in the IntArrayWritable subclass, but I cannot work out the right correction. By the way, we are on Hadoop 2.5. I use the following code to get this result:
Main method:
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word length V1");
    // Set Classes
    job.setJarByClass(WordLength_V01.class);
    job.setMapperClass(MyMapper.class);
    // job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);
    // Set Output and Input Parameters
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntArrayWritable.class);
    // Number of Reducers
    job.setNumReduceTasks(1);
    // Set FileDestination
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Mapper:
public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {
    // Initialize Variables
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Map Method
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Use Tokenizer
        StringTokenizer itr = new StringTokenizer(value.toString());
        // Select each word
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            // Output Pair
            context.write(word, one);
        }
    }
}
Reducer:
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntArrayWritable> {
    // Initialize Variables
    private IntWritable count = new IntWritable();
    private IntWritable length = new IntWritable();

    // Reduce Method
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Count Words
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        count.set(sum);
        // Wordlength
        length.set(key.getLength());
        // Define Output
        IntWritable[] temp = new IntWritable[2];
        IntArrayWritable output = new IntArrayWritable(temp);
        temp[0] = count;
        temp[1] = length;
        // Output
        output.set(temp);
        context.write(key, new IntArrayWritable(output.get()));
    }
}
Subclass:
public static class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable(IntWritable[] intWritables) {
        super(IntWritable.class);
    }

    @Override
    public IntWritable[] get() {
        return (IntWritable[]) super.get();
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        for (IntWritable data : get()) {
            data.write(arg0);
        }
    }
}
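A side note on the write override in this subclass: as far as I know, ArrayWritable.write emits the array length as an int before the elements, and the inherited readFields reads that length back first. An override that writes only the elements no longer matches readFields. The sketch below reproduces that mismatch with plain java.io streams; LengthPrefixSketch and its methods are hypothetical names, and no Hadoop classes are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

class LengthPrefixSketch {
    // Writes the values, optionally preceded by their count.
    private static byte[] write(int[] values, boolean withLength) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (withLength) {
                out.writeInt(values.length);
            }
            for (int v : values) {
                out.writeInt(v);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] writeWithLength(int[] values) { return write(values, true); }

    static byte[] writeWithoutLength(int[] values) { return write(values, false); }

    // Reader that expects the count first, then that many ints.
    static int[] readWithLength(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int[] values = new int[in.readInt()];
            for (int i = 0; i < values.length; i++) {
                values[i] = in.readInt();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] pair = {2, 5};
        // Matching writer and reader: the pair survives the round trip.
        System.out.println(Arrays.toString(readWithLength(writeWithLength(pair))));
        try {
            // Mismatched writer: the first value is misread as the length.
            readWithLength(writeWithoutLength(pair));
        } catch (UncheckedIOException e) {
            System.out.println("read failed: " + e.getCause());
        }
    }
}
```

With text output this mismatch never shows up, because the value is only rendered via toString(); it would matter once the values are read back in binary form.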
I used the following links while looking for a solution:
Interface Writable (hadoop.apache.org)
Class ArrayWritable (hadoop.apache.org)
stackoverflow.com (1)
stackoverflow.com (2)
I am really grateful for any ideas!
-------- Solution --------
New subclass:
public static class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable(IntWritable[] values) {
        // Hand the values to ArrayWritable instead of dropping them
        super(IntWritable.class, values);
    }

    @Override
    public IntWritable[] get() {
        return (IntWritable[]) super.get();
    }

    // Text output renders the value via toString(), so print both entries
    @Override
    public String toString() {
        IntWritable[] values = get();
        return values[0].toString() + ", " + values[1].toString();
    }
}
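Why the toString override makes the difference: with the default text output, each record is written as key, tab, value, and a value that is not a Text is rendered through its toString() method. Without an override, Object.toString() yields the class name plus a hash, which is the `WordLength_V01$IntArrayWritable@221cf05` from the question. Below is a minimal sketch of that behaviour in plain Java; PlainPair, PrintablePair and asOutputLine are hypothetical stand-ins, not Hadoop classes.

```java
class ToStringSketch {
    // Stand-in for a value class that does not override toString().
    static class PlainPair {
        final int length;
        final int count;

        PlainPair(int length, int count) {
            this.length = length;
            this.count = count;
        }
    }

    // Same data, but with a readable toString().
    static class PrintablePair extends PlainPair {
        PrintablePair(int length, int count) {
            super(length, count);
        }

        @Override
        public String toString() {
            return length + ", " + count;
        }
    }

    // Mimics a text record: key, tab separator, value, both via toString().
    static String asOutputLine(Object key, Object value) {
        return key + "\t" + value;
    }

    public static void main(String[] args) {
        // Ends in something like ToStringSketch$PlainPair@1b6d3586 (hash varies).
        System.out.println(asOutputLine("hello", new PlainPair(5, 2)));
        // Prints "hello", a tab, then "5, 2".
        System.out.println(asOutputLine("hello", new PrintablePair(5, 2)));
    }
}
```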
New reduce method:
public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
    // Count Words
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    count.set(sum);
    // Wordlength
    length.set(key.getLength());
    // Define Output: length first, then count, to match <Word, <length, count>>
    IntWritable[] temp = new IntWritable[2];
    temp[0] = length;
    temp[1] = count;
    context.write(key, new IntArrayWritable(temp));
}
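The other half of the fix is the constructor. ArrayWritable has one constructor that takes only the element class and one that also takes the values. The original subclass accepted an array but called the first one, so the array was silently discarded; the new version forwards it. Here is a small sketch of that difference, using a hypothetical stand-in base class (ArrayHolder) instead of the real ArrayWritable, with Integer in place of IntWritable:

```java
import java.util.Arrays;

class ConstructorSketch {
    // Hypothetical stand-in for ArrayWritable with its two constructors.
    static class ArrayHolder {
        private final Class<?> valueClass;
        private Object[] values;

        ArrayHolder(Class<?> valueClass) {
            this.valueClass = valueClass;
        }

        ArrayHolder(Class<?> valueClass, Object[] values) {
            this(valueClass);
            this.values = values;
        }

        Class<?> getValueClass() {
            return valueClass;
        }

        Object[] get() {
            return values;
        }
    }

    // Mirrors the original subclass: the argument never reaches the base class.
    static class DroppingArray extends ArrayHolder {
        DroppingArray(Integer[] values) {
            super(Integer.class);
        }
    }

    // Mirrors the fixed subclass: the argument is forwarded.
    static class ForwardingArray extends ArrayHolder {
        ForwardingArray(Integer[] values) {
            super(Integer.class, values);
        }
    }

    public static void main(String[] args) {
        Integer[] pair = {5, 2};
        System.out.println(Arrays.toString(new DroppingArray(pair).get()));   // prints "null"
        System.out.println(Arrays.toString(new ForwardingArray(pair).get())); // prints "[5, 2]"
    }
}
```

With the forwarding constructor, the extra set(...) call and the second wrapper object from the original reducer are no longer needed, which is why the new reduce method can write new IntArrayWritable(temp) directly.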