Как напрямую отправить вывод преобразователя-преобразователя в другой преобразователь-преобразователь, не сохраняя вывод в формате hdf

Problem Solved Eventually проверь мое решение внизу

Недавно я пытался запустить пример рекомендации в chaper6 (листинг 6.1 ~ 6.4) из Mahout в действии. Но я столкнулся с проблемой, и я погуглил, но не могу найти решение.

Вот проблема: у меня есть пара маппер-редуктор

<code>public final class WikipediaToItemPrefsMapper extends
    Mapper<LongWritable, Text, VarLongWritable, VarLongWritable> {

private static final Pattern NUMBERS = Pattern.compile("(\\d+)");

@Override
protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String line = value.toString();
    Matcher m = NUMBERS.matcher(line);
    m.find();
    VarLongWritable userID = new VarLongWritable(Long.parseLong(m.group()));
    VarLongWritable itemID = new VarLongWritable();
    while (m.find()) {
        itemID.set(Long.parseLong(m.group()));
        context.write(userID, itemID);
    }
}
}

public class WikipediaToUserVectorReducer
    extends
    Reducer<VarLongWritable, VarLongWritable, VarLongWritable, VectorWritable> {

public void reduce(VarLongWritable userID,
        Iterable<VarLongWritable> itemPrefs, Context context)
        throws IOException, InterruptedException {
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    for (VarLongWritable itemPref : itemPrefs) {
        userVector.set((int) itemPref.get(), 1.0f);
    }
    context.write(userID, new VectorWritable(userVector));
}
}
</code>

The reducer output a userID and a userVector and it looks like this: 98955 {590:1.0 22:1.0 9059:1.0 3:1.0 2:1.0 1:1.0}

Затем я хочу использовать другую пару картограф-редуктор для обработки этих данных

<code>public class UserVectorSplitterMapper
    extends
    Mapper<VarLongWritable, VectorWritable, IntWritable, VectorOrPrefWritable> {

public void map(VarLongWritable key, VectorWritable value, Context context)
        throws IOException, InterruptedException {
    long userID = key.get();
    Vector userVector = value.get();
    Iterator<Vector.Element> it = userVector.iterateNonZero();
    IntWritable itemIndexWritable = new IntWritable();
    while (it.hasNext()) {
        Vector.Element e = it.next();
        int itemIndex = e.index();
        float preferenceValue = (float) e.get();
        itemIndexWritable.set(itemIndex);
        context.write(itemIndexWritable, 
                new VectorOrPrefWritable(userID, preferenceValue));
    }
}
}
</code>

Когда я пытаюсь запустить задание, появляется ошибка

org.apache.hadoop.io.Text cannot be cast to org.apache.mahout.math.VectorWritable

первый преобразователь-преобразователь записывает вывод в hdf, а второй преобразователь-преобразователь пытается прочитать вывод, преобразователь может преобразовать 98955 в VarLongWritable, но не может преобразовать {590: 1.0 22: 1.0 9059: 1.0 3: 1.0 2: 1.0 1: 1.0} в VectorWritable, поэтому мне интересно, есть ли способ заставить первый преобразователь-преобразователь напрямую отправлять выходные данные во вторую пару, а затем нет необходимости делать преобразование данных. Я посмотрел Hadoop в действии и hadoop: полное руководство, кажется, нет такого способа сделать это, какие-либо предложения?

Problem solved

Решение: с помощьюSequenceFileOutputFormat, мы можем вывести и сохранить результат сокращения первого рабочего процесса MapReduce в DFS, затем второй рабочий процесс MapReduce может прочитать временный файл как ввод с помощьюSequenceFileInputFormat Класс в качестве параметра при создании картографа. Поскольку вектор будет сохранен в двоичном файле последовательности, который имеет определенный формат,SequenceFileInputFormat может прочитать его и преобразовать обратно в векторный формат.

Вот пример кода:

<code>confFactory ToItemPrefsWorkFlow = new confFactory
            (new Path("/dbout"), //input file path
             new Path("/mahout/output.txt"), //output file path
             TextInputFormat.class, //input format
             VarLongWritable.class, //mapper key format
             Item_Score_Writable.class, //mapper value format
             VarLongWritable.class, //reducer key format
             VectorWritable.class, //reducer value format
             **SequenceFileOutputFormat.class** //The reducer output format             

    );
    ToItemPrefsWorkFlow.setMapper( WikipediaToItemPrefsMapper.class);
    ToItemPrefsWorkFlow.setReducer(WikipediaToUserVectorReducer.class);
    JobConf conf1 = ToItemPrefsWorkFlow.getConf();


    confFactory UserVectorToCooccurrenceWorkFlow = new confFactory
            (new Path("/mahout/output.txt"),
             new Path("/mahout/UserVectorToCooccurrence"),
             SequenceFileInputFormat.class, //notice that the input format of mapper of the second work flow is now SequenceFileInputFormat.class
             //UserVectorToCooccurrenceMapper.class,
             IntWritable.class,
             IntWritable.class,
             IntWritable.class,
             VectorWritable.class,
             SequenceFileOutputFormat.class                                      
             );
     UserVectorToCooccurrenceWorkFlow.setMapper(UserVectorToCooccurrenceMapper.class);
     UserVectorToCooccurrenceWorkFlow.setReducer(UserVectorToCooccurrenceReducer.class);
    JobConf conf2 = UserVectorToCooccurrenceWorkFlow.getConf();

    JobClient.runJob(conf1);
    JobClient.runJob(conf2);
</code>

If you have any problem with this please feel free to contact me

Ответы на вопрос(3)

Ваш ответ на вопрос