Tworzenie dynamicznej (rosnącej / kurczącej się) puli wątków
Muszę zaimplementować pulę wątków w Javie (java.util.concurrent), której liczba wątków ma wartość minimalną w stanie bezczynności, rośnie do górnej granicy (ale nigdy więcej), gdy zadania są przesyłane do niej szybciej niż kończą wykonywanie i zmniejsza się do dolnej granicy, gdy wszystkie zadania zostaną wykonane i nie zostaną przesłane żadne kolejne zadania.
Jak wdrożyłbyś coś takiego? Wyobrażam sobie, że byłby to dość powszechny scenariusz użycia, ale najwyraźniejjava.util.concurrent.Executors
metody fabryczne mogą tworzyć tylko pule o stałym rozmiarze i pule, które rosną bez ograniczeń, gdy przesyłanych jest wiele zadań. TheThreadPoolExecutor
klasa zapewniacorePoolSize
imaximumPoolSize
parametry, ale jego dokumentacja wydaje się sugerować, że jedyny sposób, aby mieć więcej niżcorePoolSize
wątki w tym samym czasie to użycie ograniczonej kolejki zadań, w którym to przypadku, jeśli osiągnąłeśmaximumPoolSize
wątki, odrzucisz pracę, którą sam sobie poradzisz? Wymyśliłem to:
//pool creation
ExecutorService pool = new ThreadPoolExecutor(minSize, maxSize, 500, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(minSize));
...
//submitting jobs
for (Runnable job : ...) {
while (true) {
try {
pool.submit(job);
System.out.println("Job " + job + ": submitted");
break;
} catch (RejectedExecutionException e) {
// maxSize jobs executing concurrently atm.; re-submit new job after short wait
System.out.println("Job " + job + ": rejected...");
try {
Thread.sleep(300);
} catch (InterruptedException e1) {
}
}
}
}
Czy coś przeoczyłem? Czy jest lepszy sposób, aby to zrobić? Ponadto, w zależności od wymagań, może być problematyczne, że powyższy kod nie skończy się przynajmniej (myślę)(total number of jobs) - maxSize
prace zostały zakończone. Więc jeśli chcesz móc przesłać dowolną liczbę zadań do puli i przejść natychmiast, nie czekając na zakończenie żadnego z nich, nie widzę, jak możesz to zrobić bez dedykowanego wątku „suma zadań”, który zarządza wymagana niezwiązana kolejka do przechowywania wszystkich złożonych zadań. AFAICS, jeśli używasz nieograniczonej kolejki dla samego ThreadPoolExecutor, jego liczba wątków nigdy nie wzrośnie poza corePoolSize.