Spark java.lang.StackOverflowError

Я использую спарк, чтобы вычислить рейтинг страниц отзывов пользователей, но я продолжаю получать Sparkjava.lang.StackOverflowError когда я запускаю свой код на большом наборе данных (40 тыс. записей). хотя при выполнении кода на небольшом количестве записей он работает нормально.

Пример ввода:

product/productId: B00004CK40   review/userId: A39IIHQF18YGZA   review/profileName: C. A. M. Salas  review/helpfulness: 0/0 review/score: 4.0   review/time: 1175817600 review/summary: Reliable comedy review/text: Nice script, well acted comedy, and a young Nicolette Sheridan. Cusak is in top form.

Код:

public void calculatePageRank() {
    sc.clearCallSite();
    sc.clearJobGroup();

    JavaRDD < String > rddFileData = sc.textFile(inputFileName).cache();
    sc.setCheckpointDir("pagerankCheckpoint/");

    JavaRDD < String > rddMovieData = rddFileData.map(new Function < String, String > () {

        @Override
        public String call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            String movieId = data[0].split(":")[1].trim();
            String userId = data[1].split(":")[1].trim();
            return movieId + "\t" + userId;
        }
    });

    JavaPairRDD<String, Iterable<String>> rddPairReviewData = rddMovieData.mapToPair(new PairFunction < String, String, String > () {

        @Override
        public Tuple2 < String, String > call(String arg0) throws Exception {
            String[] data = arg0.split("\t");
            return new Tuple2 < String, String > (data[0], data[1]);
        }
    }).groupByKey().cache();


    JavaRDD<Iterable<String>> cartUsers = rddPairReviewData.map(f -> f._2());
      List<Iterable<String>> cartUsersList = cartUsers.collect();
      JavaPairRDD<String,String> finalCartesian = null;
      int iterCounter = 0;
      for(Iterable<String> out : cartUsersList){
          JavaRDD<String> currentUsersRDD = sc.parallelize(Lists.newArrayList(out));
          if(finalCartesian==null){
              finalCartesian = currentUsersRDD.cartesian(currentUsersRDD);
          }
          else{
              finalCartesian = currentUsersRDD.cartesian(currentUsersRDD).union(finalCartesian);
              if(iterCounter % 20 == 0) {
                  finalCartesian.checkpoint();
              }
          }
      }
      JavaRDD<Tuple2<String,String>> finalCartesianToTuple = finalCartesian.map(m -> new Tuple2<String,String>(m._1(),m._2()));

      finalCartesianToTuple = finalCartesianToTuple.filter(x -> x._1().compareTo(x._2())!=0);
      JavaPairRDD<String, String> userIdPairs = finalCartesianToTuple.mapToPair(m -> new Tuple2<String,String>(m._1(),m._2()));

      JavaRDD<String> userIdPairsString = userIdPairs.map(new Function < Tuple2<String, String>, String > () {

        //Tuple2<Tuple2<MovieId, userId>, Tuple2<movieId, userId>>
          @Override
          public String call (Tuple2<String, String> t) throws Exception {
            return t._1 + " " + t._2;
          }
      });

    try {

//calculate pagerank using this https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
        JavaPageRank.calculatePageRank(userIdPairsString, 100);
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    sc.close();

}

Ответы на вопрос(3)

Spark больше не может отслеживать происхождение. Включите контрольную точку в цикле for, чтобы проверять ваш rdd каждые 10 итераций или около того. Контрольная точка решит проблему. Не забудьте очистить каталог контрольных точек после.

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

когда ваш DAG становится большим и в вашем коде происходит слишком много уровней преобразований. JVM не сможет удерживать операции для выполнения отложенного выполнения, когда действие выполняется в конце.

Контрольная точка является одним из вариантов. Я бы предложил реализовать spark-sql для такого рода агрегатов. Если ваши данные структурированы, попробуйте загрузить их в рамки данных и выполнить группировку и другие функции mysql для достижения этой цели.

которые помогут вам значительно улучшить производительность кода в вашем вопросе.

Кэширование: Кэширование должно использоваться в тех наборах данных, к которым нужно обращаться снова и снова для одних и тех же / разных операций (итерационные алгоритмы.

Примером является СДР.count - чтобы сказать вам количество строк в файле, файл должен быть прочитан. Так что если ты пишешь RDD.count, в этот момент файл будет прочитан, строки будут подсчитаны, и счет будет возвращен.

Что если ты позвонишь в RDD?count снова? То же самое: файл будет прочитан и снова подсчитан. Так что же значит СДР.cache делать? Теперь, если вы запускаете RDD.count в первый раз файл будет загружен, кэширован и подсчитан. Если вы позвоните в RDD.count во второй раз операция будет использовать кеш. Он просто возьмет данные из кэша и посчитает строки, без повторного вычисления.

Узнайте больше о кешированииВот.

В вашем примере кода вы не используете повторно все, что вы кэшировали. Таким образом, вы можете удалить.cache оттуда.

Распараллеливание: В примере кода вы распараллелили каждый отдельный элемент в вашем RDD, который уже является распределенной коллекцией. Я предлагаю вам объединитьrddFileData, rddMovieData а такжеrddPairReviewData шаги, чтобы это произошло за один раз.

Избавляться от.collect так как это возвращает результаты обратно водителю и, возможно, фактическую причину вашей ошибки.

Ваш ответ на вопрос