В Apache Spark я могу легко повторить / вложить SparkContext.parallelize?

Я пытаюсь смоделировать генетическую проблему, которую мы пытаемся решить, постепенно наращивая ее. Я могу успешно запустить примеры PiAverage из Spark examples. Этот пример «бросает дротики» по кругу (10 ^ 6 в нашем случае) и подсчитывает число, которое «приземляется в кругу» для оценки PI

Допустим, я хочу повторить этот процесс 1000 раз (параллельно) и усреднить все эти оценки. Я пытаюсь найти лучший подход, кажется, что будет два вызова для распараллеливания? Вложенные звонки? Разве нет способа объединить карту или уменьшить количество вызовов? Я не вижу этого

Я хочу знать мудрость чего-то вроде идеи ниже. Я думал о том, чтобы отслеживать полученные оценки с помощью аккумулятора. JSC - мой SparkContext, полный код одиночного прогона в конце вопроса, спасибо за любой вклад!

Accumulator<Double> accum = jsc.accumulator(0.0);

// make a list 1000 long to pass to parallelize (no for loops in Spark, right?)
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);

// pass this "dummy list" to parallelize, which then 
// calls a pieceOfPI method to produce each individual estimate  
// accumulating the estimates. PieceOfPI would contain a 
// parallelize call too with the individual test in the code at the end
jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES)));

// get the value of the total of PI estimates and print their average
double totalPi = accum.value();

// output the average of averages
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi / HOW_MANY_ESTIMATES);

Это не похоже на то, что матрица или другие ответы, которые я вижу на SO, дают ответ на этот конкретный вопрос, я провел несколько поисков, но я не вижу, как это сделать без «распараллеливания распараллеливания». Это плохая идея?

(и да, я понимаю, математически, я мог бы просто сделать больше оценок и эффективно получить те же результаты :) Пытаясь построить структуру, которую хочет мой босс, еще раз спасибо!

Я поместил здесь всю свою программу для одного теста, если это поможет, без аккумулятора, который я тестировал. Ядром этого станет PieceOfPI ():

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.Accumulable;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;

public class PiAverage implements Serializable {

public static void main(String[] args) {

    PiAverage pa = new PiAverage();
    pa.go();

}

public void go() {

    // should make a parameter like all these finals should be
    // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
    final int SLICES = 16;

    // how many "darts" are thrown at the circle to get one single Pi estimate
    final int HOW_MANY_DARTS = 1000000;

    // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi
    final int HOW_MANY_ESTIMATES = 1000;

    SparkConf sparkConf = new SparkConf().setAppName("PiAverage")
        .setMaster("local[4]");

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw
    List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS);
    for (int i = 0; i < HOW_MANY_DARTS; i++) {
        throwsList.add(i);
    }

    // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES
    List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES);
    for (int i = 0; i < HOW_MANY_ESTIMATES; i++) {
        numberOfEstimates.add(i);
    }

    JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES);

    long totalPi = dataSet.filter(new Function<Integer, Boolean>() {
        public Boolean call(Integer i) {
            double x = Math.random();
            double y = Math.random();
            if (x * x + y * y < 1) {
                return true;
            } else
                return false;
        }
    }).count();

    System.out.println(
            "The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi / (double)HOW_MANY_DARTS);

    jsc.stop();
    jsc.close();
}
}

Ответы на вопрос(1)

Ваш ответ на вопрос