(Hadoop) MapReduce - Цепочка заданий - JobControl не останавливается

Мне нужно связать два задания MapReduce. Я использовал JobControl, чтобы установить job2 как зависимую от job1. Работает, выходные файлы созданы !! Но это нене останавливайся! В оболочке он остается в таком состоянии:

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1

Как я могу это остановить? Это моя главная.

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Configuration conf2 = new Configuration();

    Job job1 = new Job(conf, "canzoni");
    job1.setJarByClass(CanzoniOrdinate.class);
    job1.setMapperClass(CanzoniMapper.class);
    job1.setReducerClass(CanzoniReducer.class);
    job1.setOutputKeyClass(Text.class);
    job1.setOutputValueClass(IntWritable.class);

    ControlledJob cJob1 = new ControlledJob(conf);
    cJob1.setJob(job1);
    FileInputFormat.addInputPath(job1, new Path(args[0]));
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp"));


    Job job2 = new Job(conf2, "songsort");
    job2.setJarByClass(CanzoniOrdinate.class);
    job2.setMapperClass(CanzoniSorterMapper.class);
    job2.setSortComparatorClass(ReverseOrder.class);
    job2.setInputFormatClass(KeyValueTextInputFormat.class);
    job2.setReducerClass(CanzoniSorterReducer.class);
    job2.setMapOutputKeyClass(IntWritable.class);
    job2.setMapOutputValueClass(Text.class);
    job2.setOutputKeyClass(Text.class);
    job2.setOutputValueClass(IntWritable.class);

    ControlledJob cJob2 = new ControlledJob(conf2);
    cJob2.setJob(job2);
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*"));
    FileOutputFormat.setOutputPath(job2, new Path(args[1]));

    JobControl jobctrl = new JobControl("jobctrl");
    jobctrl.addJob(cJob1);
    jobctrl.addJob(cJob2);
    cJob2.addDependingJob(cJob1);
    jobctrl.run();


    ////////////////
    // NEW CODE ///   
    //////////////


    // delete jobctrl.run();
    Thread t = new Thread(jobctrl);
    t.start();
    String oldStatusJ1 = null;
    String oldStatusJ2 = null;
    while (!jobctrl.allFinished()) {
      String status =cJob1.toString();
      String status2 =cJob2.toString();
      if (!status.equals(oldStatusJ1)) {
        System.out.println(status);
        oldStatusJ1 = status;
      }
      if (!status2.equals(oldStatusJ2)) {
        System.out.println(status2);
        oldStatusJ2 = status2;
      }     
     }
    System.exit(0);

}}

Ответы на вопрос(4)

Ваш ответ на вопрос