MapReduce Output ArrayWritable

I'm trying to get ArrayWritable output from a simple MapReduce task. I found a few questions about similar problems, but I can't fix the problem in my own code, so I'm hoping for your help. Thanks :)!

Input: a text file containing some sentences.

The output should be:

<Word, <length, number of same words in Textfile>>
 Example: Hello  5  2 

The result I get from my job is:

hello WordLength_V01$IntArrayWritable@221cf05
test WordLength_V01$IntArrayWritable@799e525a

I think the problem is in the IntArrayWritable subclass, but I can't work out the right fix. By the way, we are on Hadoop 2.5. I use the following code to get this result:

Main method:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word length V1");

    // Set Classes
    job.setJarByClass(WordLength_V01.class);
    job.setMapperClass(MyMapper.class);
    // job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);

    // Set Output and Input Parameters
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntArrayWritable.class);

    // Number of Reducers
    job.setNumReduceTasks(1);

    // Set FileDestination
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Mapper:

public static class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    // Initialize Variables
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Map Method
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

        // Use Tokenizer
        StringTokenizer itr = new StringTokenizer(value.toString());

        // Select each word
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());

            // Output Pair
            context.write(word, one);
        }
    }
}

Reducer:

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntArrayWritable> {

    // Initialize Variables
    private IntWritable count = new IntWritable();
    private IntWritable length = new IntWritable();

    // Reduce Method
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        // Count Words
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        count.set(sum);

        // Wordlength
        length.set(key.getLength());

        // Define Output
        IntWritable[] temp = new IntWritable[2];
        IntArrayWritable output = new IntArrayWritable(temp);

        temp[0] = count;
        temp[1] = length;

        // Output
        output.set(temp);
        context.write(key, new IntArrayWritable(output.get()));
    }
}

Subclass:

public static class IntArrayWritable extends ArrayWritable {
    public IntArrayWritable(IntWritable[] intWritables) {
        super(IntWritable.class);
    }

    @Override
    public IntWritable[] get() {
        return (IntWritable[]) super.get();
    }

    @Override
    public void write(DataOutput arg0) throws IOException {
        for(IntWritable data : get()){
            data.write(arg0);
        }
    }
}   
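
A side note on the write() override above, as a hedged sketch rather than a statement about this job: to my knowledge the stock ArrayWritable.write() emits the element count before the elements, and readFields() reads that count back first. An override that writes only the elements would then no longer match readFields() if the value were ever deserialized. The stand-in below uses only java.io streams; LengthPrefixDemo and its method names are hypothetical and are not Hadoop APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class LengthPrefixDemo {
    // Writes the element count first, then each element.
    static byte[] writeWithLength(int[] values) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(values.length);
            for (int v : values) {
                out.writeInt(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Reads the count, then exactly that many elements.
    static int[] readWithLength(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int[] values = new int[in.readInt()];
            for (int i = 0; i < values.length; i++) {
                values[i] = in.readInt();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] roundTrip = readWithLength(writeWithLength(new int[] {5, 2}));
        System.out.println(roundTrip[0] + ", " + roundTrip[1]); // prints "5, 2"
    }
}
```

Because the reader trusts the leading count, the writer and reader have to agree on it; dropping the write() override and inheriting both methods keeps them in sync.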

I used the following links while looking for a solution:

Interface Writable (hadoop.apache.org)
Class ArrayWritable (hadoop.apache.org)
stackoverflow.com (1)
stackoverflow.com (2)

I'm really grateful for any ideas!

-------- Solution --------

New subclass:

public static class IntArrayWritable extends ArrayWritable {

    public IntArrayWritable(IntWritable[] values) {
        super(IntWritable.class, values);
    }

    @Override
    public IntWritable[] get() {
        return (IntWritable[]) super.get();
    }

    @Override
    public String toString() {
        IntWritable[] values = get();
        return values[0].toString() + ", " + values[1].toString();
    }
}
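
Why the toString() override matters: the job does not set an output format, so the default TextOutputFormat applies, and it writes each value by calling toString() on it. Without an override, Java falls back to Object.toString(), which produces the ClassName@hash text seen in the question. The sketch below reproduces that effect in plain Java with no Hadoop dependency; ToStringDemo, PlainPair, PrintablePair and formatLine are hypothetical stand-ins, not Hadoop classes.

```java
class ToStringDemo {
    // Stand-in for a value class that does NOT override toString().
    static class PlainPair {
        final int[] values;

        PlainPair(int... values) {
            this.values = values;
        }
    }

    // Same data, with toString() overridden as in the fixed subclass.
    static class PrintablePair extends PlainPair {
        PrintablePair(int... values) {
            super(values);
        }

        @Override
        public String toString() {
            return values[0] + ", " + values[1];
        }
    }

    // Mimics a text writer that formats a record as key, TAB, value.toString().
    static String formatLine(String key, Object value) {
        return key + "\t" + value;
    }

    public static void main(String[] args) {
        // Prints the key followed by something like ToStringDemo$PlainPair@1b6d3586 (hash varies).
        System.out.println(formatLine("hello", new PlainPair(5, 2)));
        // Prints the key followed by "5, 2".
        System.out.println(formatLine("hello", new PrintablePair(5, 2)));
    }
}
```

The same reasoning explains the constructor change: the original constructor accepted the array but never passed it to super, so there was nothing for get() to return until set() was called.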

New reduce method:

public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

    // Count Words
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }

    count.set(sum);

    // Wordlength
    length.set(key.getLength());

    // Define Output: length first, then count, matching <Word, <length, count>>
    IntWritable[] temp = new IntWritable[2];
    temp[0] = length;
    temp[1] = count;

    context.write(key, new IntArrayWritable(temp));
}
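
To sanity-check the expected values without a cluster, the map and reduce steps can be mirrored in memory. This is only a sketch under stated assumptions: WordLengthLocalCheck and wordStats are hypothetical names, and it uses String.length() where the job uses Text.getLength(). Text.getLength() counts UTF-8 bytes, so the two agree only for ASCII words.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;

class WordLengthLocalCheck {
    // Returns word -> {length, count}, mirroring the job's tokenizing and summing in memory.
    static Map<String, int[]> wordStats(String text) {
        Map<String, int[]> stats = new LinkedHashMap<>();
        StringTokenizer itr = new StringTokenizer(text);
        while (itr.hasMoreTokens()) {
            String word = itr.nextToken();
            // First sighting stores the length with a zero count; every sighting adds one.
            int[] entry = stats.computeIfAbsent(word, w -> new int[] {w.length(), 0});
            entry[1]++;
        }
        return stats;
    }

    public static void main(String[] args) {
        // Prints "Hello<TAB>5, 2" and then "test<TAB>4, 1".
        for (Map.Entry<String, int[]> e : wordStats("Hello test Hello").entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue()[0] + ", " + e.getValue()[1]);
        }
    }
}
```

Like the job itself, this check is case-sensitive, so "Hello" and "hello" are counted as different words.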
