Mapa do Hadoop Reduce For Google

recebemos como atribuição a tarefa de criar funções de redução de mapa que serão exibidas para cada nó n no google web graph list os nós que você pode ir do nó n em 3 saltos. (Os dados reais podem ser encontrados aqui:http://snap.stanford.edu/data/web-Google.html) Aqui está um exemplo de como serão os itens da lista:

1 2 
1 3 
2 4 
3 4 
3 5 
4 1 
4 5 
4 6 
5 6 

A partir do exemplo acima, um gráfico de exemplo será este

No exemplo simplificado acima, os caminhos, por exemplo, do nó 1 são α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6] e, assim, a redução do mapa produzirá para o nó 1 os vértices 1,5,6 ((a) cada vértice deve ser contado apenas uma vez; e (b) incluiremos o vértice atual somente quando houver um caminho circular de comprimento 3, como é o exemplo [1 -> 2 -> 4 -> 1] e [1 -> 3 -> 4 -> 1].

Estou muito perdido, porque acredito que isso requer conhecimento da teoria dos grafos e algoritmos e não nos ensinaram nada relacionado a isso.

Eu aprecio muito se alguém puder me dar a direção certa para começar. (Eu examinei a teoria do caminho mais curto e coisas do tipo, mas não tenho certeza se será útil para este exercício específico)

Agradecemos antecipadamente e tenha uma boa temporada de festas.

EDITAR

Eu tento criar a lista de adjucência, mas enquanto eu espero que a saída seja "vertexID" "node1 node2 node3 node4 ..." vejo que no arquivo de saída meu redutor divide a lista de cada ID de vértice em pares de três.

por exemplo, se eu tiver o vértice A que se conecta a Z, X, C, V, B, N, M ,, G, H, J, K, L I, ele gera isso como

A Z, X, C

A V, B, N

A M, G, H

A J, K, L

abaixo estão o meu mapeador e redutor

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {



        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);    
        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);

    }



}





/**
 * @author George
 * Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
 *
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    //private LongWritable vertice= new LongWritable(0);
    private Text vertice=new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line,"\n");
        StringTokenizer itrInside;

        //vertice=new LongWritable(Long.valueOf(value.toString()).longValue());


        while (itr.hasMoreTokens()) {
            if(itr.countTokens() > 2){

            }//ignore first line ??
            else{
                itrInside=new StringTokenizer(itr.toString());
                vertexID.set(itr.nextToken());

                while(itrInside.hasMoreTokens()){               
                    vertice.set(itrInside.nextToken());
                    context.write(vertexID, value);
                }           
            }
        }

    }

}

@override
public class ListReducer extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        String vertices="";

        for (Text value : values) {
            if(vertices=="")
                vertices+=value.toString();
            else
                vertices=vertices+","+value.toString();         
        }

        Text value=new Text();
        value.set(vertices);
        context.write(key, value);

    }

}

questionAnswers(1)

yourAnswerToTheQuestion