El uso de un semáforo dentro de una acción de flujo paralelo anidado de Java 8 puede DEADLOCK. ¿Es esto un error?

Considere la siguiente situación: estamos utilizando una secuencia paralela Java 8 para realizar un ciclo paralelo para cada ciclo, por ejemplo,

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

El número de subprocesos paralelos está controlado por la propiedad del sistema "java.util.concurrent.ForkJoinPool.common.parallelism" y generalmente es igual al número de procesadores.

Ahora suponga que nos gusta limitar el número de ejecuciones paralelas para un trabajo específico, p. Ej. porque esa parte consume mucha memoria y la restricción de memoria implica un límite de ejecuciones paralelas.

Una forma obvia y elegante de limitar las ejecuciones paralelas es usar un semáforo (sugeridoaquí), por ejemplo, la siguiente porción de código limita el número de ejecuciones paralelas a 5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });

¡Esto funciona bien!

Sin embargo: Usar cualquier otra secuencia paralela dentro del trabajador (en/* WORK DONE HERE */) puede resultar en unpunto muerto.

Para mí este es un comportamiento inesperado.

Explicación: Dado que las transmisiones Java utilizan un grupo ForkJoin, forEach interno se bifurca y la unión parece estar esperando para siempre. Sin embargo, este comportamiento aún es inesperado. Tenga en cuenta que las transmisiones paralelas incluso funcionan si configura"java.util.concurrent.ForkJoinPool.common.parallelism" a 1.

Tenga en cuenta también que puede no ser transparente si hay un paralelo interno para cada uno.

Pregunta: ¿Es este comportamiento de acuerdo con la especificación Java 8 (en ese caso implicaría que está prohibido el uso de semáforos dentro de flujos paralelos de trabajadores) o es un error?

Para mayor comodidad: a continuación se muestra un caso de prueba completo. Cualquier combinación de los dos booleanos funciona, excepto "verdadero, verdadero", que da como resultado el punto muerto.

Aclaración: Para aclarar el punto, permítanme enfatizar un aspecto: el punto muerto no ocurre en elacquire del semáforo Tenga en cuenta que el código consta de

adquirir semáforoejecuta un códigoliberar el semáforo

y el punto muerto se produce en 2. si ese fragmento de código está utilizando OTRO flujo paralelo. Entonces el punto muerto ocurre dentro de esa OTRA transmisión. Como consecuencia, parece que no está permitido usar flujos paralelos anidados y operaciones de bloqueo (como un semáforo) juntos.

Tenga en cuenta que está documentado que las transmisiones paralelas usan un ForkJoinPool y que ForkJoinPool y Semaphore pertenecen al mismo paquete:java.util.concurrent (entonces uno esperaría que interoperen bien).

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
 *
 * Created on 03.05.2014
 */
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
 * This is a test of Java 8 parallel streams.
 * 
 * The idea behind this code is that the Semaphore concurrentExecutions
 * should limit the parallel executions of the outer forEach (which is an
 * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
 * the parallel executions of the outer forEach should be limited due to a
 * memory constrain).
 * 
 * Inside the execution block of the outer forEach we use another parallel stream
 * to create an inner forEach. The number of concurrent
 * executions of the inner forEach is not limited by us (it is however limited by a
 * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
 * 
 * Problem: If the semaphore is used AND the inner forEach is active, then
 * the execution will be DEADLOCKED.
 * 
 * Note: A practical application is the implementation of the parallel
 * LevenbergMarquardt optimizer in
 * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
 * In one application the number of tasks in the outer and inner loop is very large (>1000)
 * and due to memory limitation the outer loop should be limited to a small (5) number
 * of concurrent executions.
 * 
 * @author Christian Fries
 */
public class ForkJoinPoolTest {

    public static void main(String[] args) {

        // Any combination of the booleans works, except (true,true)
        final boolean isUseSemaphore    = true;
        final boolean isUseInnerStream  = true;

        final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
        final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
        final int       concurrentExecusionsLimitInOuterLoop = 5;
        final int       concurrentExecutionsLimitForStreams = 10;

        final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

            if(isUseSemaphore) {
                concurrentExecutions.acquireUninterruptibly();
            }

            try {
                System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                if(isUseInnerStream) {
                    runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                }
                else {
                    try {
                        Thread.sleep(10*numberOfTasksInInnerLoop);
                    } catch (Exception e) {
                    }
                }
            }
            finally {
                if(isUseSemaphore) {
                    concurrentExecutions.release();
                }
            }
        });

        System.out.println("D O N E");
    }

    /**
     * Runs code in a parallel forEach using streams.
     * 
     * @param numberOfTasksInInnerLoop Number of tasks to execute.
     */
    private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
        IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
            try {
                Thread.sleep(10);
            } catch (Exception e) {
            }
        });
    }
}

Respuestas a la pregunta(3)

Su respuesta a la pregunta