Was bestimmt die Anzahl der Threads, die ein Java ForkJoinPool erstellt?

Soweit ich verstanden hatteForkJoinPoolWenn dieser Pool eine feste Anzahl von Threads erstellt (Standard: Anzahl der Kerne), werden niemals mehr Threads erstellt (es sei denn, die Anwendung gibt an, dass dies erforderlich ist, indem sie verwendetmanagedBlock).

Verwenden Sie jedochForkJoinPool.getPoolSize() Ich habe festgestellt, dass in einem Programm, das 30.000 Aufgaben erstellt (RecursiveAction), dasForkJoinPool Für die Ausführung dieser Aufgaben werden durchschnittlich 700 Threads verwendet (Threads werden jedes Mal gezählt, wenn eine Aufgabe erstellt wird). Die Aufgaben erledigen keine E / A-Vorgänge, sondern reine Berechnungen. Die einzige Synchronisation zwischen Tasks ist das AufrufenForkJoinTask.join() und ZugriffAtomicBooleanes gibt keine Thread-Blockierungsoperationen.

Schon seitjoin() blockiert den aufrufenden Thread nicht, wie ich es verstehe, es gibt keinen Grund, warum ein Thread im Pool jemals blockiert werden sollte, und daher sollte (wie ich angenommen hatte) kein Grund bestehen, weitere Threads zu erstellen (was offensichtlich trotzdem passiert).

Warum also?ForkJoinPool so viele Threads erstellen? Welche Faktoren bestimmen die Anzahl der erstellten Threads?

Ich hatte gehofft, dass diese Frage ohne Postleitzahl beantwortet werden kann, aber hier kommt sie auf Anfrage. Dieser Code ist ein Auszug aus einem Programm mit der vierfachen Größe, das auf die wesentlichen Teile reduziert ist. Es wird nicht so kompiliert, wie es ist. Auf Wunsch kann ich natürlich auch das komplette Programm posten.

Das Programm durchsucht ein Labyrinth nach einem Pfad von einem bestimmten Startpunkt zu einem bestimmten Endpunkt unter Verwendung der Tiefensuche. Eine Lösung ist garantiert. Die Hauptlogik ist in dercompute() Methode vonSolverTask: EINRecursiveAction Das beginnt an einem bestimmten Punkt und setzt sich mit allen Nachbarpunkten fort, die vom aktuellen Punkt aus erreichbar sind. Anstatt ein neues zu erstellenSolverTask An jedem Verzweigungspunkt (der viel zu viele Aufgaben verursachen würde) werden alle Nachbarn mit einer Ausnahme auf einen Backtracking-Stapel verschoben, um später verarbeitet zu werden, und es wird nur der eine Nachbar fortgesetzt, der nicht auf den Stapel verschoben wird. Sobald es auf diese Weise eine Sackgasse erreicht, wird der Punkt, der zuletzt auf den Backtracking-Stapel verschoben wurde, gesprungen, und die Suche wird von dort fortgesetzt (wobei der vom Startpunkt des Takes gebaute Pfad entsprechend gekürzt wird). Eine neue Aufgabe wird erstellt, sobald eine Aufgabe einen Rückverfolgungsstapel findet, der einen bestimmten Schwellenwert überschreitet. Von diesem Zeitpunkt an wird die Aufgabe, während sie weiter von ihrem Backtracking-Stapel springt, bis diese erschöpft ist, beim Erreichen eines Verzweigungspunkts keine weiteren Punkte mehr auf ihren Stapel schieben, sondern für jeden solchen Punkt eine neue Aufgabe erstellen. Somit kann die Größe der Aufgaben unter Verwendung der Stapelgrenzschwelle angepasst werden.

Die oben angegebenen Zahlen ("30.000 Aufgaben, durchschnittlich 700 Threads") stammen aus der Suche in einem Labyrinth von 5000 x 5000 Zellen. Also, hier ist der wesentliche Code:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}

Antworten auf die Frage(4)

Ihre Antwort auf die Frage