Hadoop Map Reduce For Google веб-график
В качестве задания нам была дана задача создания функций уменьшения карты, которые будут выводить для каждого узла n в списке веб-графиков Google список узлов, которые можно перейти с узла n за 3 прыжка. (Фактические данные можно найти здесь:http://snap.stanford.edu/data/web-Google.html) Вот пример того, как элементы в списке будут:
1 2
1 3
2 4
3 4
3 5
4 1
4 5
4 6
5 6
Из приведенного выше примера граф будет такой
В приведенном выше упрощенном примере пути для примера узла 1 представляют собой α [1 -> 2 -> 4 -> 1], [1 -> 2 -> 4 -> 5], [1 -> 2 -> 4 -> 6], [1 -> 3 -> 4 -> 1], [1 -> 3 -> 4 -> 5], [1 -> 3 -> 4 -> 6] και [1 -> 3 -> 5 -> 6] и, следовательно, уменьшение карты выведет для узла 1 вершины 1,5,6 ((а) каждая вершина должна быть посчитана только один раз, и (б) мы включаем текущую вершину только тогда, когда существует круговой путь длина 3, как в примере [1 -> 2 -> 4 -> 1] и [1 -> 3 -> 4 -> 1].
Я очень растерялся, потому что считаю, что это требует знания теории графов и алгоритмов, и нас ничему не научили.
Я буду очень признателен, если кто-нибудь может дать мне правильное направление для начала. (Я изучил теорию кратчайшего пути и тому подобное, но я не уверен, будет ли она полезна для этого конкретного упражнения)
Заранее спасибо, и хорошего сезона отпусков.
РЕДАКТИРОВАТЬ
Я пытаюсь создать список смежности, но, хотя я ожидаю, что на выходе будет "vertexID" "узел1 узел2 узел3 узел4 ...", я вижу, что в выходном файле мой редуктор разбивает список для каждого идентификатора вершины на пары по три.
например, если у меня есть вершина A, которая соединяется с Z, X, C, V, B, N, M, G, H, J, K, L I, она выводит это как
A Z, X, C
A V, B, N
A M, G, H
A J, K, L
ниже мой картер и редуктор
public class AdjacentsListDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf);
job.setJobName("Test driver");
job.setJarByClass(AdjacentsListDriver.class);
String[] arg0 = new GenericOptionsParser(conf, args).getRemainingArgs();
if (arg0.length != 2) {
System.err.println("Usage: hadoop jar <my_jar> <input_dir> <output_dir>");
System.exit(1);
}
Path in = new Path(arg0[0]);
Path out = new Path(arg0[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
job.setMapperClass(ListMapper.class);
job.setReducerClass(ListReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new AdjacentsListDriver(), args);
System.exit(res);
}
}
/**
* @author George
* Theoretically this takes a key(vertexID) and maps all nodes that are connected to it in one hop....
*
*/
public class ListMapper extends Mapper<LongWritable, Text, Text, Text> {
private Text vertexID = new Text();
//private LongWritable vertice= new LongWritable(0);
private Text vertice=new Text();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line,"\n");
StringTokenizer itrInside;
//vertice=new LongWritable(Long.valueOf(value.toString()).longValue());
while (itr.hasMoreTokens()) {
if(itr.countTokens() > 2){
}//ignore first line ??
else{
itrInside=new StringTokenizer(itr.toString());
vertexID.set(itr.nextToken());
while(itrInside.hasMoreTokens()){
vertice.set(itrInside.nextToken());
context.write(vertexID, value);
}
}
}
}
}
@override
public class ListReducer extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String vertices="";
for (Text value : values) {
if(vertices=="")
vertices+=value.toString();
else
vertices=vertices+","+value.toString();
}
Text value=new Text();
value.set(vertices);
context.write(key, value);
}
}