Jak bezpośrednio wysłać wyjście reduktora mapującego do innego reduktora mapującego bez zapisywania danych wyjściowych w plikach hdfs
Problem rozwiązany ostatecznie sprawdź moje rozwiązanie na dole
Ostatnio próbuję uruchomić przykład polecenia w chaper6 (lista 6.1 ~ 6.4) z Mahout in Action. Ale napotkałem problem i przeszukałem go, ale nie mogę znaleźć rozwiązania.
Oto problem: mam parę reduktora mapującego
<code>public final class WikipediaToItemPrefsMapper extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> { private static final Pattern NUMBERS = Pattern.compile("(\\d+)"); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); Matcher m = NUMBERS.matcher(line); m.find(); VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group())); VarLongWritable itemID = new VarLongWritable(); while (m.find()) { itemID.set(Long.parseLong(m.group())); context.write(userID, itemID); } } } public class WikipediaToUserVectorReducer extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> { public void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context) throws IOException, InterruptedException { Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); for (VarLongWritable itemPref : itemPrefs) { userVector.set((int) itemPref.get(), 1.0f); } context.write(userID, new VectorWritable(userVector)); } } </code>
Reduktor wyświetla identyfikator użytkownika i wektor użytkownika i wygląda tak: 98955 {590: 1,0 22: 1,0 9059: 1,0 3: 1,0 2: 1,0 1: 1,0}
Następnie chcę użyć innej pary reduktora mapującego do przetwarzania tych danych
<code>public class UserVectorSplitterMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> { public void map(VarLongWritable key, VectorWritable value, Context context) throws IOException, InterruptedException { long userID = key.get(); Vector userVector = value.get(); Iterator<Vector.Element> it = userVector.iterateNonZero(); IntWritable itemIndexWritable = new IntWritable(); while (it.hasNext()) { Vector.Element e = it.next(); int itemIndex = e.index(); float preferenceValue = (float) e.get(); itemIndexWritable.set(itemIndex); context.write(itemIndexWritable, new VectorOrPrefWritable(userID, preferenceValue)); } } } </code>
Kiedy próbuję uruchomić zadanie, rzuca błąd
org.apache.hadoop.io.Text nie może być rzutowany na org.apache.mahout.math.VectorWritable
pierwszy reduktor mapujący zapisuje dane wyjściowe w hdfs, a drugi reduktor mapujący próbuje odczytać dane wyjściowe, program odwzorowujący może rzutować 98955 na VarLongWritable, ale nie może przekonwertować {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0} do VectorWritable, więc zastanawiam się, czy istnieje sposób na to, aby pierwszy reduktor mapujący bezpośrednio wysyłał wyjście do drugiej pary, wtedy nie ma potrzeby konwertowania danych. Sprawdziłem Hadoop w akcji i hadoop: ostateczny przewodnik, wydaje się, że nie ma takiego sposobu, żadnych sugestii?
Problem rozwiązanyRozwiązanie: UżywającSequenceFileOutputFormat, możemy wygenerować i zapisać mniejszy wynik pierwszego przepływu pracy MapReduce na DFS, a następnie drugi przepływ pracy MapReduce może odczytać plik tymczasowy jako dane wejściowe przy użyciuSequenceFileInputFormat klasa jako parametr podczas tworzenia programu odwzorowującego. Ponieważ wektor byłby zapisany w pliku sekwencji binarnych, który ma określony format,SequenceFileInputFormat może je odczytać i przekształcić z powrotem do formatu wektorowego.
Oto przykładowy kod:
<code>confFactory ToItemPrefsWorkFlow = new confFactory (new Path("/dbout"), //input file path new Path("/mahout/output.txt"), //output file path TextInputFormat.class, //input format VarLongWritable.class, //mapper key format Item_Score_Writable.class, //mapper value format VarLongWritable.class, //reducer key format VectorWritable.class, //reducer value format **SequenceFileOutputFormat.class** //The reducer output format ); ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class); ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class); JobConf conf1 = ToItemPrefsWorkFlow.getConf(); confFactory UserVectorToCooccurrenceWorkFlow = new confFactory (new Path("/mahout/output.txt"), new Path("/mahout/UserVectorToCooccurrence"), SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class //UserVectorToCooccurrenceMapper.class, IntWritable.class, IntWritable.class, IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class ); UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class); UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class); JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf(); JobClient.runJob(conf1); JobClient.runJob(conf2); </code>
Jeśli masz z tym jakiś problem, skontaktuj się ze mną