Co decyduje o liczbie wątków tworzonych przez Java ForkJoinPool?

O ile zrozumiałemForkJoinPool, ta pula tworzy stałą liczbę wątków (domyślnie: liczba rdzeni) i nigdy nie utworzy więcej wątków (chyba że aplikacja wskazuje na potrzebę użycia tych wątków przy użyciumanagedBlock).

Jednak używającForkJoinPool.getPoolSize() Odkryłem, że w programie, który tworzy 30 000 zadań (RecursiveAction)ForkJoinPool wykonywanie tych zadań średnio wykorzystuje 700 wątków (wątki liczone przy każdym utworzeniu zadania). Zadania nie wykonują operacji we / wy, ale czyste obliczenia; dzwoni jedyna synchronizacja między zadaniamiForkJoinTask.join() i dostępAtomicBooleans, tj. nie ma operacji blokowania wątków.

Odjoin() nie blokuje wywołującego wątku, jak go rozumiem, nie ma powodu, dla którego jakikolwiek wątek w puli miałby kiedykolwiek blokować się, a więc (założyłem) nie powinno być powodu do tworzenia jakichkolwiek dalszych wątków (co oczywiście się dzieje).

Więc dlaczegoForkJoinPool stworzyć tak wiele wątków? Jakie czynniki określają liczbę utworzonych wątków?

Miałem nadzieję, że na to pytanie można odpowiedzieć bez podania kodu, ale tutaj pojawia się na życzenie. Kod ten jest fragmentem programu czterokrotnie większego, zredukowanego do podstawowych części; nie kompiluje się tak, jak jest. W razie potrzeby mogę oczywiście opublikować pełny program.

Program przeszukuje labirynt w poszukiwaniu ścieżki od danego punktu początkowego do danego punktu końcowego, używając wyszukiwania głębokościowego. Gwarantowane jest rozwiązanie. Główna logika jest wcompute() metodaSolverTask: ARecursiveAction który zaczyna się w pewnym danym punkcie i kontynuuje z wszystkimi sąsiadującymi punktami osiągalnymi z bieżącego punktu. Zamiast tworzyć noweSolverTask w każdym punkcie rozgałęzienia (który stworzyłby zbyt wiele zadań), popycha wszystkich sąsiadów, z wyjątkiem jednego do stosu śledzenia wstecznego, który ma być przetworzony później i kontynuuje tylko z jednym sąsiadem, który nie zostanie zepchnięty na stos. Gdy w ten sposób dotrze do ślepego zaułka, punkt ostatnio pchnięty do stosu wycofywania jest wyrzucany, a wyszukiwanie trwa od tego miejsca (odpowiednio wycinając ścieżkę zbudowaną z punktu początkowego zadania). Nowe zadanie jest tworzone, gdy zadanie znajdzie swój stos śledzenia wstecznego większy niż określony próg; od tego czasu zadanie, mimo że nadal wyskakuje ze stosu śledzenia wstecznego, aż do wyczerpania, nie będzie pchać żadnych dalszych punktów do swojego stosu po osiągnięciu punktu rozgałęzienia, ale utworzyć nowe zadanie dla każdego takiego punktu. Tak więc wielkość zadań można dostosować za pomocą progu limitu stosu.

Liczby, które zacytowałem powyżej („30 000 zadań, średnio 700 wątków”), pochodzą z przeszukiwania labiryntu 5000x5000 komórek. Oto podstawowy kod:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}

questionAnswers(4)

yourAnswerToTheQuestion