Wysyłanie listy z mapy Hadoop Zmniejsz zadanie za pomocą niestandardowego zapisu
Próbuję utworzyć prostą mapę, zmniejszając zadanie, zmieniając przykład liczby słów podany przez hadoop.
Próbuję wymyślić listę zamiast liczby słów. Przykład ze słowem daje następujący wynik
hello 2
world 2
Próbuję go wyprowadzić jako listę, która będzie stanowić podstawę przyszłych prac
hello 1 1
world 1 1
Myślę, że jestem na dobrej drodze, ale mam problem z pisaniem listy. Zamiast tego otrzymuję
Hello foo.MyArrayWritable@61250ff2
World foo.MyArrayWritable@483a0ab1
Oto mój MyArrayWritable. Włożyłem sys wwrite(DataOuptut arg0)
ale nigdy nie wypuszcza niczego, więc myślę, że ta metoda może nie zostać wywołana i nie wiem dlaczego.
class MyArrayWritable extends ArrayWritable{
public MyArrayWritable(Class<? extends Writable> valueClass, Writable[] values) {
super(valueClass, values);
}
public MyArrayWritable(Class<? extends Writable> valueClass) {
super(valueClass);
}
@Override
public IntWritable[] get() {
return (IntWritable[]) super.get();
}
@Override
public void write(DataOutput arg0) throws IOException {
for(IntWritable i : get()){
i.write(arg0);
}
}
}
EDYCJA - dodanie więcej kodu źródłowego
public class WordCount {
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, MyArrayWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
ArrayList<IntWritable> list = new ArrayList<IntWritable>();
for (IntWritable val : values) {
list.add(val);
}
context.write(key, new MyArrayWritable(IntWritable.class, list.toArray(new IntWritable[list.size()])));
}
}
public static void main(String[] args) throws Exception {
if(args == null || args.length == 0)
args = new String[]{"./wordcount/input","./wordcount/output"};
Path p = new Path(args[1]);
FileSystem fs = FileSystem.get(new Configuration());
fs.exists(p);
fs.delete(p, true);
Configuration conf = new Configuration();
Job job = new Job(conf, "wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}