Die Verwendung eines Semaphors in einer verschachtelten Java 8-Aktion für parallele Streams kann zu DEADLOCK führen. Ist das ein Bug?

Betrachten Sie die folgende Situation: Wir verwenden einen parallelen Java 8-Stream, um eine parallele forEach-Schleife auszuführen, z.

IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})

Die Anzahl der parallelen Threads wird von der Systemeigenschaft "java.util.concurrent.ForkJoinPool.common.parallelism" gesteuert und entspricht normalerweise der Anzahl der Prozessoren.

Angenommen, wir möchten die Anzahl der parallelen Ausführungen für eine bestimmte Arbeit begrenzen, z. weil dieser Teil speicherintensiv ist und Speicherbeschränkungen eine Begrenzung paralleler Ausführungen implizieren.

Eine naheliegende und elegante Möglichkeit, die parallele Ausführung einzuschränken, ist die Verwendung eines Semaphors (empfohlen)Hier), beispielsweise begrenzt der folgende Codeteil die Anzahl der parallelen Ausführungen auf 5:

        final Semaphore concurrentExecutions = new Semaphore(5);
        IntStream.range(0,20).parallel().forEach(i -> {

            concurrentExecutions.acquireUninterruptibly();

            try {
                /* WORK DONE HERE */
            }
            finally {
                concurrentExecutions.release();
            }
        });

Das funktioniert gut!

Jedoch: Verwenden eines anderen parallelen Streams innerhalb des Workers (at/* WORK DONE HERE */) kann zu a führenSackgasse.

Für mich ist das ein unerwartetes Verhalten.

Erläuterung: Da Java-Streams einen ForkJoin-Pool verwenden, gabelt sich der innere forEach und der Join scheint auf immer zu warten. Dieses Verhalten ist jedoch immer noch unerwartet. Beachten Sie, dass parallele Streams auch funktionieren, wenn Sie festlegen"java.util.concurrent.ForkJoinPool.common.parallelism" bis 1.

Beachten Sie auch, dass es möglicherweise nicht transparent ist, wenn für jeden eine innere Parallele vorhanden ist.

Frage: Stimmt dieses Verhalten mit der Java 8-Spezifikation überein (in diesem Fall bedeutet dies, dass die Verwendung von Semaphoren in Workern mit parallelen Streams verboten ist), oder handelt es sich um einen Fehler?

Der Einfachheit halber: Nachfolgend finden Sie einen vollständigen Testfall. Alle Kombinationen der beiden Booleschen Werte funktionieren, mit Ausnahme von "true, true", was zu einem Deadlock führt.

Klärung: Lassen Sie mich zur Verdeutlichung einen Aspekt hervorheben: Der Deadlock tritt am nicht aufacquire des Semaphors. Beachten Sie, dass der Code aus besteht

Semaphor erwerbenFühren Sie einen Code ausSemaphor freigeben

und der Deadlock tritt bei 2 auf, wenn dieses Codestück einen anderen parallelen Stream verwendet. Dann tritt der Deadlock in diesem ANDEREN Stream auf. Infolgedessen ist es anscheinend nicht erlaubt, verschachtelte parallele Streams und Blockierungsoperationen (wie ein Semaphor) zusammen zu verwenden!

Beachten Sie, dass dokumentiert ist, dass parallele Streams einen ForkJoinPool verwenden und dass ForkJoinPool und Semaphore zum selben Paket gehören -java.util.concurrent (so würde man erwarten, dass sie gut zusammenarbeiten).

/*
 * (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
 *
 * Created on 03.05.2014
 */
package net.finmath.experiments.concurrency;

import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;

/**
 * This is a test of Java 8 parallel streams.
 * 
 * The idea behind this code is that the Semaphore concurrentExecutions
 * should limit the parallel executions of the outer forEach (which is an
 * <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
 * the parallel executions of the outer forEach should be limited due to a
 * memory constrain).
 * 
 * Inside the execution block of the outer forEach we use another parallel stream
 * to create an inner forEach. The number of concurrent
 * executions of the inner forEach is not limited by us (it is however limited by a
 * system property "java.util.concurrent.ForkJoinPool.common.parallelism").
 * 
 * Problem: If the semaphore is used AND the inner forEach is active, then
 * the execution will be DEADLOCKED.
 * 
 * Note: A practical application is the implementation of the parallel
 * LevenbergMarquardt optimizer in
 * {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
 * In one application the number of tasks in the outer and inner loop is very large (>1000)
 * and due to memory limitation the outer loop should be limited to a small (5) number
 * of concurrent executions.
 * 
 * @author Christian Fries
 */
public class ForkJoinPoolTest {

    public static void main(String[] args) {

        // Any combination of the booleans works, except (true,true)
        final boolean isUseSemaphore    = true;
        final boolean isUseInnerStream  = true;

        final int       numberOfTasksInOuterLoop = 20;              // In real applications this can be a large number (e.g. > 1000).
        final int       numberOfTasksInInnerLoop = 100;             // In real applications this can be a large number (e.g. > 1000).
        final int       concurrentExecusionsLimitInOuterLoop = 5;
        final int       concurrentExecutionsLimitForStreams = 10;

        final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);

        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
        System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));

        IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {

            if(isUseSemaphore) {
                concurrentExecutions.acquireUninterruptibly();
            }

            try {
                System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());

                if(isUseInnerStream) {
                    runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
                }
                else {
                    try {
                        Thread.sleep(10*numberOfTasksInInnerLoop);
                    } catch (Exception e) {
                    }
                }
            }
            finally {
                if(isUseSemaphore) {
                    concurrentExecutions.release();
                }
            }
        });

        System.out.println("D O N E");
    }

    /**
     * Runs code in a parallel forEach using streams.
     * 
     * @param numberOfTasksInInnerLoop Number of tasks to execute.
     */
    private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
        IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
            try {
                Thread.sleep(10);
            } catch (Exception e) {
            }
        });
    }
}

Antworten auf die Frage(3)

Ihre Antwort auf die Frage