Hadoop-Fehler bei der Ausführung: Geben Sie im Schlüssel von der Karte einen Fehler ein: erwartet org.apache.hadoop.io.Text, empfangen org.apache.hadoop.io.LongWritable

Ich implementiere einen PageRank-Algorithmus auf Hadoop und wie der Titel besagt, habe ich beim Versuch, den Code auszuführen, den folgenden Fehler festgestellt:

Typenkonflikt im Schlüssel von der Karte: erwartet org.apache.hadoop.io.Text, erhalten org.apache.hadoop.io.LongWritable

In meiner Eingabedatei speichere ich Grafikknoten-IDs als Schlüssel und einige Informationen dazu als Wert. Meine Eingabedatei hat das folgende Format:

1 \ t 3.4,2,5,6,67

4 \ t 4.2,77,2,7,83

... ...

Beim Versuch zu verstehen, was der Fehler besagt, versuche ich, LongWritable als meinen Hauptvariablentyp zu verwenden, wie Sie im folgenden Code sehen können. Das heißt, ich habe:

map <LongWritable, LongWritable, LongWritable, LongWritable>

<LongWritable, LongWritable, LongWritable, LongWritable> reduzieren

aber ich habe auch versucht:

map <Text, Text, Text, Text>

<Text, Text, Text, Text> verkleinern

und auch:

map <LongWritable, Text, LongWritable, Text>

<LongWritable, Text, LongWritable, Text> reduzieren

und ich komme immer mit dem gleichen Fehler. Ich glaube, ich habe Probleme zu verstehen, was die erwarteten und empfangenen Mittel des Fehlers sind. Bedeutet das, dass meine Kartenfunktion LongWritable von meiner Eingabedatei erwartet und Text erhalten hat? Gibt es ein Problem mit dem Format der von mir verwendeten Eingabedatei oder mit den Variablentypen?

Hier ist der vollständige Code. Können Sie mir sagen, was und wo geändert werden soll ?:

import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.lang.Object.*;

import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.log4j.*;
import org.apache.commons.logging.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;



public class Pagerank
{   


public static class PRMap extends Mapper<LongWritable, LongWritable, LongWritable, LongWritable>
{

    public void map(LongWritable lineNum, LongWritable line, OutputCollector<LongWritable, LongWritable> outputCollector, Reporter reporter) throws IOException, InterruptedException
    {
        if (line.toString().length() == 0) {
            return;
        }

        Text key = new Text();
        Text value = new Text();
        LongWritable valuel = new LongWritable();
        StringTokenizer spline = new StringTokenizer(line.toString(),"\t");
        key.set(spline.nextToken());  
        value.set(spline.nextToken());  

        valuel.set(Long.parseLong(value.toString()));
        outputCollector.collect(lineNum,valuel);


        String info = value.toString();
        String splitter[] = info.split(",");

        if(splitter.length >= 3)
        {
            float f = Float.parseFloat(splitter[0]);
            float pagerank = f / (splitter.length - 2);

            for(int i=2;i<splitter.length;i++)
            {
                LongWritable key2 = new LongWritable();
                LongWritable value2 = new LongWritable();
                long l;

                l = Long.parseLong(splitter[i]);
                key2.set(l);
                //key2.set(splitter[i]);
                value2.set((long)f);

                outputCollector.collect(key2, value2);
            }
        }
    }
}

public static class PRReduce extends Reducer<LongWritable,LongWritable,LongWritable,LongWritable>
{
    private Text result = new Text();
    public void reduce(LongWritable key, Iterator<LongWritable> values,OutputCollector<LongWritable, LongWritable> results, Reporter reporter) throws IOException, InterruptedException
    {

        float pagerank = 0;
        String allinone = ",";
        while(values.hasNext())
        {
            LongWritable temp = values.next();
            String converted = temp.toString();
            String[] splitted = converted.split(",");

            if(splitted.length > 1)
            {                   
                for(int i=1;i<splitted.length;i++)
                {
                    allinone = allinone.concat(splitted[i]);
                    if(i != splitted.length - 1)
                        allinone = allinone.concat(",");
                }
            }
            else
            {
                float f = Float.parseFloat(splitted[0]);
                pagerank = pagerank + f;
            }
        }
        String last = Float.toString(pagerank);
        last = last.concat(allinone);

        LongWritable value = new LongWritable();
        value.set(Long.parseLong(last));

        results.collect(key, value);
    }      
}



public static void main(String[] args) throws Exception
{      


    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }

    Job job = new Job(conf, "pagerank_itr0");

    job.setJarByClass(Pagerank.class);      
    job.setMapperClass(Pagerank.PRMap.class);       
    job.setReducerClass(Pagerank.PRReduce.class);    


    job.setOutputKeyClass(LongWritable.class);            
    job.setOutputValueClass(LongWritable.class);            
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    job.waitForCompletion(true);

}
}

Antworten auf die Frage(1)

Ihre Antwort auf die Frage