Jak bezpośrednio wysłać wyjście reduktora mapującego do innego reduktora mapującego bez zapisywania danych wyjściowych w plikach hdfs

Problem rozwiązany ostatecznie sprawdź moje rozwiązanie na dole

Ostatnio próbuję uruchomić przykład polecenia w chaper6 (lista 6.1 ~ 6.4) z Mahout in Action. Ale napotkałem problem i przeszukałem go, ale nie mogę znaleźć rozwiązania.

Oto problem: mam parę reduktora mapującego

<code>public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}
</code>

Reduktor wyświetla identyfikator użytkownika i wektor użytkownika i wygląda tak: 98955 {590: 1,0 22: 1,0 9059: 1,0 3: 1,0 2: 1,0 1: 1,0}

Następnie chcę użyć innej pary reduktora mapującego do przetwarzania tych danych

<code>public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}
</code>

Kiedy próbuję uruchomić zadanie, rzuca błąd

org.apache.hadoop.io.Text nie może być rzutowany na org.apache.mahout.math.VectorWritable

pierwszy reduktor mapujący zapisuje dane wyjściowe w hdfs, a drugi reduktor mapujący próbuje odczytać dane wyjściowe, program odwzorowujący może rzutować 98955 na VarLongWritable, ale nie może przekonwertować {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0} do VectorWritable, więc zastanawiam się, czy istnieje sposób na to, aby pierwszy reduktor mapujący bezpośrednio wysyłał wyjście do drugiej pary, wtedy nie ma potrzeby konwertowania danych. Sprawdziłem Hadoop w akcji i hadoop: ostateczny przewodnik, wydaje się, że nie ma takiego sposobu, żadnych sugestii?

Problem rozwiązany

Rozwiązanie: UżywającSequenceFileOutputFormat, możemy wygenerować i zapisać mniejszy wynik pierwszego przepływu pracy MapReduce na DFS, a następnie drugi przepływ pracy MapReduce może odczytać plik tymczasowy jako dane wejściowe przy użyciuSequenceFileInputFormat klasa jako parametr podczas tworzenia programu odwzorowującego. Ponieważ wektor byłby zapisany w pliku sekwencji binarnych, który ma określony format,SequenceFileInputFormat może je odczytać i przekształcić z powrotem do formatu wektorowego.

Oto przykładowy kod:

<code>confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             

    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();


    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

    JobClient.runJob(conf1);
    JobClient.runJob(conf2);
</code>

Jeśli masz z tym jakiś problem, skontaktuj się ze mną

questionAnswers(3)

yourAnswerToTheQuestion