От чего зависит количество потоков, создаваемых Java ForkJoinPool?

Насколько я понялForkJoinPoolэтот пул создает фиксированное количество потоков (по умолчанию: количество ядер) и никогда не будет создавать больше потоков (если приложение не указывает на необходимость в них с помощьюmanagedBlock).

Однако, используяForkJoinPool.getPoolSize() Я обнаружил, что в программе, которая создает 30 000 задач (RecursiveAction), тоForkJoinPool при выполнении этих задач в среднем используется 700 потоков (число потоков учитывается при каждом создании задачи). Задачи не выполняют ввод-вывод, а просто вычисляют; единственная межзадачная синхронизация вызываетForkJoinTask.join() и доступAtomicBooleans, то есть нет операций по блокировке потоков.

посколькуjoin() не блокирует вызывающий поток, насколько я понимаю, нет причины, по которой какой-либо поток в пуле должен когда-либо блокироваться, и поэтому (я предполагал) не должно быть никаких причин для создания каких-либо дополнительных потоков (что, очевидно, происходит, тем не менее).

Итак, почемуForkJoinPool создать столько потоков? Какие факторы определяют количество созданных потоков?

Я надеялся, что на этот вопрос можно будет ответить без размещения кода, но здесь он приходит по запросу. Этот код является выдержкой из программы, в четыре раза превышающей размер, уменьшенный до основных частей; он не компилируется как есть. При желании я, конечно, могу выложить полную программу тоже.

Программа ищет в лабиринте путь от заданной начальной точки до заданной конечной точки, используя поиск в глубину. Решение гарантированно существует. Основная логика вcompute() методSolverTask: ARecursiveAction который начинается в некоторой заданной точке и продолжается со всеми соседними точками, достижимыми из текущей точки. Вместо того, чтобы создавать новыйSolverTask в каждой точке ветвления (которая создала бы слишком много задач) он выталкивает всех соседей, кроме одного, в стек обратного отслеживания для последующей обработки и продолжает работу только с одним соседом, не помещенным в стек. Как только он таким образом достигает тупика, точка, недавно добавленная к стеку обратного отслеживания, отбрасывается, и поиск продолжается оттуда (соответственно сокращая путь, построенный из начальной точки taks). Новая задача создается, когда задача обнаруживает, что ее стек отслеживания больше определенного порога; с этого времени задача, продолжая извлекаться из своего стека обратного отслеживания до тех пор, пока она не будет исчерпана, не будет выдвигать дальнейшие точки в свой стек при достижении точки ветвления, но будет создавать новую задачу для каждой такой точки. Таким образом, размер задач может быть скорректирован с использованием порогового предела стека.

Числа, которые я цитировал выше («30 000 задач, в среднем 700 потоков») взяты из поиска в лабиринте 5000x5000 ячеек. Итак, вот основной код:

class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;

/**
 * @return Tries to compute a path through the maze from local start to end
 * and returns that (or null if no such path found)
 */
@Override
public ArrayDeque<Point>  compute() {
    // Is this task still accepting new branches for processing on its own,
    // or will it create new tasks to handle those?
    boolean stillAcceptingNewBranches = true;
    Point current = localStart;
    ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>();  // Path from localStart to (including) current
    ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
    // Used as a stack: Branches not yet taken; solver will backtrack to these branching points later

    Direction[] allDirections = Direction.values();

    while (!current.equals(end)) {
        pathFromLocalStart.addLast(current);
        // Collect current's unvisited neighbors in random order: 
        ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);  
        for (Direction directionToNeighbor: allDirections) {
            Point neighbor = current.getNeighbor(directionToNeighbor);

            // contains() and hasPassage() are read-only methods and thus need no synchronization
            if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
                neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
        }
        // Process unvisited neighbors
        if (neighborsToVisit.size() == 1) {
            // Current node is no branch: Continue with that neighbor
            current = neighborsToVisit.getFirst().getPoint();
            continue;
        }
        if (neighborsToVisit.size() >= 2) {
            // Current node is a branch
            if (stillAcceptingNewBranches) {
                current = neighborsToVisit.removeLast().getPoint();
                // Push all neighbors except one on the backtrack stack for later processing
                for(PointAndDirection neighborAndDirection: neighborsToVisit) 
                    backtrackStack.push(neighborAndDirection);
                if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
                    stillAcceptingNewBranches = false;
                // Continue with the one neighbor that was not pushed onto the backtrack stack
                continue;
            } else {
                // Current node is a branch point, but this task does not accept new branches any more: 
                // Create new task for each neighbor to visit and wait for the end of those tasks
                SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
                int t = 0;
                for(PointAndDirection neighborAndDirection: neighborsToVisit)  {
                    SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
                    task.fork();
                    subTasks[t++] = task;
                }
                for (SolverTask task: subTasks) {
                    ArrayDeque<Point> subTaskResult = null;
                    try {
                        subTaskResult = task.join();
                    } catch (CancellationException e) {
                        // Nothing to do here: Another task has found the solution and cancelled all other tasks
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (subTaskResult != null) { // subtask found solution
                        pathFromLocalStart.addAll(subTaskResult);
                        // No need to wait for the other subtasks once a solution has been found
                        return pathFromLocalStart;
                    }
                } // for subTasks
            } // else (not accepting any more branches) 
        } // if (current node is a branch)
        // Current node is dead end or all its neighbors lead to dead ends:
        // Continue with a node from the backtracking stack, if any is left:
        if (backtrackStack.isEmpty()) {
            return null; // No more backtracking avaible: No solution exists => end of this task
        }
        // Backtrack: Continue with cell saved at latest branching point:
        PointAndDirection pd = backtrackStack.pop();
        current = pd.getPoint();
        Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
        // DEBUG System.out.println("Backtracking to " +  branchingPoint);
        // Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
        while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
            // DEBUG System.out.println("    Going back before " + pathSoFar.peekLast());
            pathFromLocalStart.removeLast();
        }
        // continue while loop with newly popped current
    } // while (current ...
    if (!current.equals(end)) {         
        // this task was interrupted by another one that already found the solution 
        // and should end now therefore:
        return null;
    } else {
        // Found the solution path:
        pathFromLocalStart.addLast(current);
        return pathFromLocalStart;
    }
} // compute()
} // class SolverTask

@SuppressWarnings("serial")
public class ParallelMaze  {

// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;

/**
 * Atomically marks this point as visited unless visited before
 * @return whether the point was visited for the first time, i.e. whether it could be marked
 */
boolean visit(Point p) {
    return  visited[p.getX()][p.getY()].compareAndSet(false, true);
}

public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool();
    ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
    // Start initial task
    long startTime = System.currentTimeMillis();
     // since SolverTask.compute() expects its starting point already visited, 
    // must do that explicitly for the global starting point:
    maze.visit(maze.start);
    maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
    // One solution is enough: Stop all tasks that are still running
    pool.shutdownNow();
    pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
    long endTime = System.currentTimeMillis();
    System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " + 
            width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}

Ответы на вопрос(4)

Ваш ответ на вопрос