¿Qué determina el número de subprocesos que crea un Java ForkJoinPool?

Por lo que yo había entendidoForkJoinPool, ese grupo crea un número fijo de subprocesos (predeterminado: número de núcleos) y nunca creará más subprocesos (a menos que la aplicación indique la necesidad de usarlosmanagedBlock).

Sin embargo, utilizandoForkJoinPool.getPoolSize() Descubrí que en un programa que crea 30,000 tareas (RecursiveAction), laForkJoinPool la ejecución de esas tareas utiliza 700 subprocesos en promedio (subprocesos contabilizados cada vez que se crea una tarea). Las tareas no hacen E / S, sino pura computación; la única sincronización entre tareas está llamandoForkJoinTask.join() y accediendoAtomicBooleans, es decir, no hay operaciones de bloqueo de hilo.

Ya quejoin() no bloquea el subproceso de llamada tal como lo entiendo, no hay ninguna razón por la que un subproceso en el grupo deba bloquearse, por lo que (suponía) no debería haber ninguna razón para crear más subprocesos (lo que obviamente está sucediendo).

Entonces, ¿por quéForkJoinPool crear tantos hilos? ¿Qué factores determinan el número de hilos creados?

Esperaba que esta pregunta pudiera ser respondida sin publicar un código, pero aquí viene cuando se solicita. Este código es un extracto de un programa de cuatro veces el tamaño, reducido a las partes esenciales; no compila como es. Si lo desea, también puedo publicar el programa completo, por supuesto.

El programa busca en un laberinto una ruta desde un punto de inicio dado hasta un punto final determinado mediante la búsqueda en profundidad. Una solución está garantizada para existir. La lógica principal está en elcompute() método deSolverTask: UNARecursiveAction que comienza en un punto dado y continúa con todos los puntos adyacentes accesibles desde el punto actual. En lugar de crear un nuevoSolverTask en cada punto de bifurcación (lo que crearía demasiadas tareas), empuja a todos los vecinos, excepto a uno, a una pila de seguimiento hacia atrás para que se procese más tarde y continúa con solo un vecino no empujado a la pila. Una vez que llega a un callejón sin salida de esa manera, el punto más recientemente empujado a la pila de seguimiento de retroceso se hace estallar, y la búsqueda continúa desde allí (recortando el camino construido desde el punto de inicio de los taks en consecuencia). Una nueva tarea se crea una vez que una tarea encuentra que su pila de seguimiento es más grande que un cierto umbral; a partir de ese momento, la tarea, mientras continúa saliendo de su pila de seguimiento hasta que se agota, no empujará más puntos a su pila cuando alcance un punto de bifurcación, sino que creará una nueva tarea para cada uno de esos puntos. Por lo tanto, el tamaño de las tareas se puede ajustar utilizando el umbral de límite de pila.

Los números que cité arriba ("30,000 tareas, 700 hilos en promedio") provienen de la búsqueda de un laberinto de 5000x5000 celdas. Entonces, aquí está el código esencial:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}

Respuestas a la pregunta(4)

Su respuesta a la pregunta