Hadoop Map Reduce For Google веб-график

В качестве задания нам была дана задача создания функций уменьшения карты, которые будут выводить для каждого узла n в списке веб-графиков Google список узлов, которые можно перейти с узла n за 3 прыжка. (Фактические данные можно найти здесь:http://snap.stanford.edu/data/web-Google.html) Вот пример того, как элементы в списке будут:

1 2 
1 3 
2 4 
3 4 
3 5 
4 1 
4 5 
4 6 
5 6 

Из приведенного выше примера граф будет такой

В приведенном выше упрощенном примере пути для примера узла 1 представляют собой α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6] и, следовательно, уменьшение карты выведет для узла 1 вершины 1,5,6 ((а) каждая вершина должна быть посчитана только один раз, и (б) мы включаем текущую вершину только тогда, когда существует круговой путь длина 3, как в примере [1 -> 2 -> 4 -> 1] и [1 -> 3 -> 4 -> 1].

Я очень растерялся, потому что считаю, что это требует знания теории графов и алгоритмов, и нас ничему не научили.

Я буду очень признателен, если кто-нибудь может дать мне правильное направление для начала. (Я изучил теорию кратчайшего пути и тому подобное, но я не уверен, будет ли она полезна для этого конкретного упражнения)

Заранее спасибо, и хорошего сезона отпусков.

РЕДАКТИРОВАТЬ

Я пытаюсь создать список смежности, но, хотя я ожидаю, что на выходе будет "vertexID" "узел1 узел2 узел3 узел4 ...", я вижу, что в выходном файле мой редуктор разбивает список для каждого идентификатора вершины на пары по три.

например, если у меня есть вершина A, которая соединяется с Z, X, C, V, B, N, M, G, H, J, K, L I, она выводит это как

A Z, X, C

A V, B, N

A M, G, H

A J, K, L

ниже мой картер и редуктор

public class AdjacentsListDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {



        Configuration conf = getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("Test driver");
        job.setJarByClass(AdjacentsListDriver.class);

        String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (arg0.length != 2) {
            System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
            System.exit(1);
        }

        Path in = new Path(arg0[0]);
        Path out = new Path(arg0[1]);

        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(ListMapper.class);
        job.setReducerClass(ListReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);    
        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
        System.exit(res);

    }



}





/**
 * @author George
 * Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
 *
 */
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text vertexID = new Text();
    //private LongWritable vertice= new LongWritable(0);
    private Text vertice=new Text();

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line,"\n");
        StringTokenizer itrInside;

        //vertice=new LongWritable(Long.valueOf(value.toString()).longValue());


        while (itr.hasMoreTokens()) {
            if(itr.countTokens() > 2){

            }//ignore first line ??
            else{
                itrInside=new StringTokenizer(itr.toString());
                vertexID.set(itr.nextToken());

                while(itrInside.hasMoreTokens()){               
                    vertice.set(itrInside.nextToken());
                    context.write(vertexID, value);
                }           
            }
        }

    }

}

@override
public class ListReducer extends Reducer<Text, Text, Text, Text>{
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        String vertices="";

        for (Text value : values) {
            if(vertices=="")
                vertices+=value.toString();
            else
                vertices=vertices+","+value.toString();         
        }

        Text value=new Text();
        value.set(vertices);
        context.write(key, value);

    }

}

Ответы на вопрос(1)

Ваш ответ на вопрос