Как напрямую отправить вывод преобразователя-преобразователя в другой преобразователь-преобразователь, не сохраняя вывод в формате hdf
Problem Solved Eventually проверь мое решение внизу
Недавно я пытался запустить пример рекомендации в chaper6 (листинг 6.1 ~ 6.4) из Mahout в действии. Но я столкнулся с проблемой, и я погуглил, но не могу найти решение.
Вот проблема: у меня есть пара маппер-редуктор
<code>public final class WikipediaToItemPrefsMapper extends Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> { private static final Pattern NUMBERS = Pattern.compile("(\\d+)"); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); Matcher m = NUMBERS.matcher(line); m.find(); VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group())); VarLongWritable itemID = new VarLongWritable(); while (m.find()) { itemID.set(Long.parseLong(m.group())); context.write(userID, itemID); } } } public class WikipediaToUserVectorReducer extends Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> { public void reduce(VarLongWritable userID, Iterable<VarLongWritable> itemPrefs, Context context) throws IOException, InterruptedException { Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); for (VarLongWritable itemPref : itemPrefs) { userVector.set((int) itemPref.get(), 1.0f); } context.write(userID, new VectorWritable(userVector)); } } </code>
The reducer output a userID and a userVector and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}
Затем я хочу использовать другую пару картограф-редуктор для обработки этих данных
<code>public class UserVectorSplitterMapper extends Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> { public void map(VarLongWritable key, VectorWritable value, Context context) throws IOException, InterruptedException { long userID = key.get(); Vector userVector = value.get(); Iterator<Vector.Element> it = userVector.iterateNonZero(); IntWritable itemIndexWritable = new IntWritable(); while (it.hasNext()) { Vector.Element e = it.next(); int itemIndex = e.index(); float preferenceValue = (float) e.get(); itemIndexWritable.set(itemIndex); context.write(itemIndexWritable, new VectorOrPrefWritable(userID, preferenceValue)); } } } </code>
Когда я пытаюсь запустить задание, появляется ошибка
org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable
первый преобразователь-преобразователь записывает вывод в hdf, а второй преобразователь-преобразователь пытается прочитать вывод, преобразователь может преобразовать 98955 в VarLongWritable, но не может преобразовать {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0} в VectorWritable, поэтому мне интересно, есть ли способ заставить первый преобразователь-преобразователь напрямую отправлять выходные данные во вторую пару, а затем нет необходимости делать преобразование данных. Я посмотрел Hadoop в действии и hadoop: полное руководство, кажется, нет такого способа сделать это, какие-либо предложения?
Problem solvedРешение: с помощьюSequenceFileOutputFormat, мы можем вывести и сохранить результат сокращения первого рабочего процесса MapReduce в DFS, затем второй рабочий процесс MapReduce может прочитать временный файл как ввод с помощьюSequenceFileInputFormat Класс в качестве параметра при создании картографа. Поскольку вектор будет сохранен в двоичном файле последовательности, который имеет определенный формат,SequenceFileInputFormat может прочитать его и преобразовать обратно в векторный формат.
Вот пример кода:
<code>confFactory ToItemPrefsWorkFlow = new confFactory (new Path("/dbout"), //input file path new Path("/mahout/output.txt"), //output file path TextInputFormat.class, //input format VarLongWritable.class, //mapper key format Item_Score_Writable.class, //mapper value format VarLongWritable.class, //reducer key format VectorWritable.class, //reducer value format **SequenceFileOutputFormat.class** //The reducer output format ); ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class); ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class); JobConf conf1 = ToItemPrefsWorkFlow.getConf(); confFactory UserVectorToCooccurrenceWorkFlow = new confFactory (new Path("/mahout/output.txt"), new Path("/mahout/UserVectorToCooccurrence"), SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class //UserVectorToCooccurrenceMapper.class, IntWritable.class, IntWritable.class, IntWritable.class, VectorWritable.class, SequenceFileOutputFormat.class ); UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class); UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class); JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf(); JobClient.runJob(conf1); JobClient.runJob(conf2); </code>
If you have any problem with this please feel free to contact me