Использование семафора во вложенном потоке параллельных потоков Java 8 может привести к DEADLOCK. Это ошибка?

Рассмотрим следующую ситуацию: Мы используем параллельный поток Java 8 для выполнения параллельного цикла forEach, например,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

Количество параллельных потоков контролируется системным свойством "java.util.concurrent.ForkJoinPool.common.parallelism" и обычно равно количеству процессоров.

Теперь предположим, что мы хотели бы ограничить количество параллельных выполнений для конкретного произведения - например, потому что эта часть интенсивно использует память, а ограничение памяти подразумевает ограничение параллельных выполнений.

Очевидный и элегантный способ ограничить параллельное выполнение - использовать семафор (рекомендуетсяВот), например, следующий фрагмент кода ограничивает число параллельных выполнений 5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });

Это работает просто отлично!

Однако: Использование любого другого параллельного потока внутри рабочего (в/* WORK DONE HERE */) может привести ктупик.

Для меня это неожиданное поведение.

Объяснение: Поскольку потоки Java используют пул ForkJoin, внутренний forEach разветвляется, и соединение, кажется, ждет вечно. Однако такое поведение все еще неожиданно. Обратите внимание, что параллельные потоки даже работают, если вы установите"java.util.concurrent.ForkJoinPool.common.parallelism" до 1.

Также обратите внимание, что он может быть не прозрачным, если есть внутренняя параллель forEach.

Вопрос: Соответствует ли это поведение спецификации Java 8 (в этом случае это будет означать, что использование семафоров внутри рабочих в параллельных потоках запрещено) или это ошибка?

Для удобства: ниже приведен полный тестовый пример. Любые комбинации двух логических значений работают, кроме «true, true», что приводит к тупику.

Разъяснение: Чтобы прояснить суть, позвольте мне подчеркнуть один аспект: тупик не возникает наacquire семафора. Обратите внимание, что код состоит из

приобрести семафорзапустить кодвыпустить семафор

и тупик возникает в 2. если этот кусок кода использует ДРУГОЙ параллельный поток. Тогда тупик возникает внутри ДРУГОГО потока. Как следствие, оказывается, что нельзя использовать вложенные параллельные потоки и операции блокировки (например, семафор) вместе!

Обратите внимание, что задокументировано, что параллельные потоки используют ForkJoinPool и что ForkJoinPool и Semaphore принадлежат одному и тому же пакету -java.util.concurrent (так что можно ожидать, что они хорошо взаимодействуют).

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
 *
 * Created on 03.05.2014
 */
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
 * This is a test of Java 8 parallel streams.
 * 
 * The idea behind this code is that the Semaphore concurrentExecutions
 * should limit the parallel executions of the outer forEach (which is an
 * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
 * the parallel executions of the outer forEach should be limited due to a
 * memory constrain).
 * 
 * Inside the execution block of the outer forEach we use another parallel stream
 * to create an inner forEach. The number of concurrent
 * executions of the inner forEach is not limited by us (it is however limited by a
 * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
 * 
 * Problem: If the semaphore is used AND the inner forEach is active, then
 * the execution will be DEADLOCKED.
 * 
 * Note: A practical application is the implementation of the parallel
 * LevenbergMarquardt optimizer in
 * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
 * In one application the number of tasks in the outer and inner loop is very large (>1000)
 * and due to memory limitation the outer loop should be limited to a small (5) number
 * of concurrent executions.
 * 
 * @author Christian Fries
 */
public class ForkJoinPoolTest {

    public static void main(String[] args) {

        // Any combination of the booleans works, except (true,true)
        final boolean isUseSemaphore    = true;
        final boolean isUseInnerStream  = true;

        final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
        final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
        final int       concurrentExecusionsLimitInOuterLoop = 5;
        final int       concurrentExecutionsLimitForStreams = 10;

        final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

            if(isUseSemaphore) {
                concurrentExecutions.acquireUninterruptibly();
            }

            try {
                System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                if(isUseInnerStream) {
                    runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                }
                else {
                    try {
                        Thread.sleep(10*numberOfTasksInInnerLoop);
                    } catch (Exception e) {
                    }
                }
            }
            finally {
                if(isUseSemaphore) {
                    concurrentExecutions.release();
                }
            }
        });

        System.out.println("D O N E");
    }

    /**
     * Runs code in a parallel forEach using streams.
     * 
     * @param numberOfTasksInInnerLoop Number of tasks to execute.
     */
    private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
        IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
            try {
                Thread.sleep(10);
            } catch (Exception e) {
            }
        });
    }
}

Ответы на вопрос(3)

Ваш ответ на вопрос