Как реализовать ExecutorService для выполнения задач на основе ротации?

Я используюjava.util.concurrent.ExecutorService сфиксированный пул потоков выполнить список задач. Мой список задач обычно составляет около 80-150, и я ограничил количество потоков, работающих в любое время, до 10, как показано ниже:

<code>ExecutorService threadPoolService = Executors.newFixedThreadPool(10);

for ( Runnable task : myTasks ) 
{     
    threadPoolService.submit(task); 
}
</code>

Мой вариант использования требует, чтобы даже выполненное задание было повторно отправлено вExecutorService но это должно быть выполнено / принято снова только тогда, когда всеalready представленные задания обслуживаются / завершаются. То есть, в основном, представленные задачи должны выполняться на основе ротации. Следовательно, не будет ниthreadPoolService.shutdown() или жеthreadPoolService.shutdownNow() позвони в этом случае.

У меня вопрос, как мне реализоватьExecutorService обслуживание задач на основе ротации?

Ответы на вопрос(4)

что все задачи выполнены, и повторно отправить их, как только это произойдет, например, вот так:

    List<Future> futures = new ArrayList<>();
    for (Runnable task : myTasks) {
        futures.add(threadPoolService.submit(task));
    }
    //wait until completion of all tasks
    for (Future f : futures) {
        f.get();
    }
    //restart
    ......

EDIT
Кажется, вы хотите повторно отправить задачу, как только она будет выполнена. Вы могли бы использоватьExecutorCompletionService который позволяет получать задачи по мере их выполнения, - см. ниже простой пример с 2 задачами, которые повторно отправляются несколько раз, как только они завершены. Пример вывода:

Task 1 submitted pool-1-thread-1
Task 2 submitted pool-1-thread-2
Task 1 completed pool-1-thread-1
Task 1 submitted pool-1-thread-3
Task 2 completed pool-1-thread-2
Task 1 completed pool-1-thread-3
Task 2 submitted pool-1-thread-4
Task 1 submitted pool-1-thread-5
Task 1 completed pool-1-thread-5
Task 2 completed pool-1-thread-4

public class Test1 {

    public final ConcurrentMap<String, String> concurrentMap = new ConcurrentHashMap<>();
    public final AtomicInteger retries = new AtomicInteger();
    public final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int count = 0;
        List<Runnable> myTasks = new ArrayList<>();
        myTasks.add(getRunnable(1));
        myTasks.add(getRunnable(2));
        ExecutorService threadPoolService = Executors.newFixedThreadPool(10);
        CompletionService<Runnable> ecs = new ExecutorCompletionService<Runnable>(threadPoolService);
        for (Runnable task : myTasks) {
            ecs.submit(task, task);
        }
        //wait until completion of all tasks
        while(count++ < 3) {
            Runnable task = ecs.take().get();
            ecs.submit(task, task);
        }
        threadPoolService.shutdown();
    }

    private static Runnable getRunnable(final int i) {
        return new Runnable() {

            @Override
            public void run() {
                System.out.println("Task " + i + " submitted " + Thread.currentThread().getName() + "  ");
                try {
                    Thread.sleep(500 * i);
                } catch (InterruptedException ex) {
                    System.out.println("Interrupted");
                }
                System.out.println("Task " + i + " completed " + Thread.currentThread().getName() + "  ");
            }
        };
    }
}
 31 авг. 2013 г., 14:55
Здорово! Это то, что я ищу. Спасибо.
 28 апр. 2012 г., 11:38
Смотрите мои правки - я думаю, это то, что вы имели в виду.
 Gnanam28 апр. 2012 г., 08:48
В моем случае мне нужно повторно подавать каждую отдельную задачу по мере ее завершения, но неwait until completion of all tasks, Есть идеи / комментарии?

work queue используется для примераExecutorService, Итак, я предлагаю:

First choose an implementation of java.util.concurrent.BlockingQueue (an example) that provides a circular queue functionality. NOTE, the reason BlockingQueue has been chosen is that to wait until the next task is provided to queue; so, in case of circular + blocking queue, you should be careful how to provide the same behavior and functionality.

Instead of using Executors.new... to create a new ThreadPoolExecutor use a direct constructor such as

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)

Таким образом, если вы не приказываете исполнителюshutdownбудет пытаться получить следующую задачу из очереди для выполнения из своейwork queue который являетсяcircular Контейнер для задач.

Решение Вопроса

где вы можете поместить задание обратно в конец очереди.

public class TaskRepeatingThreadPoolExecutor extends ThreadPoolExecutor {

    public TaskRepeatingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        this.submit(r);
    }
}

Вам, конечно, придется проделать немного больше работы, чтобы создать его самостоятельно без помощиExecutorServiceудобный способ фабрики, но конструкторы достаточно просты, чтобы работать.

 Gnanam28 апр. 2012 г., 08:46
Спасибо за это предложение и дало мне знать о существованииThreadPoolExecutor учебный класс. У меня есть вопрос, что / как / почему мне нужно сдатьBlockingQueue<Runnable> workQueue конструктору. В случаеExecutorServiceЯ использовал, чтобы представить все мои задачи, как этоthreadPoolService.submit(task), Не могу понять об этомBlockingQueue<Runnable> workQueue, Надеюсь, ты сможешь заставить меня понять это.
 05 июн. 2016 г., 06:29
Я не уверен, что этот метод работает, после того, как Execute вызывается с FutureTask (Runnable - Wrapper вокруг фактической задачи), который имеет внутреннее состояние, которое отслеживает состояние завершения. Если он передан, он просто возвращается без выполнения фактической задачи.
 Gnanam30 апр. 2012 г., 12:46
Просто прочитайте оBlockingQueue<Runnable> и думал обновить его здесь.BlockingQueue в основном используется для хранения работы / задачи, которая отправляется исполнителю, когда все потоки в пуле заняты выполнением задач.

которое полностью использует функциональность, существующую в стандартных утилитах параллелизма библиотеки. Он используетCyclicBarrier с классом декоратора задач и барьерным действием, которое повторно передает все задачи:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Rotation {

    private static final class RotationDecorator implements Runnable {
        private final Runnable          task;
        private final CyclicBarrier barrier;


        RotationDecorator( Runnable task, CyclicBarrier barrier ) {
            this.task = task;
            this.barrier = barrier;
        }


        @Override
        public void run() {
            this.task.run();
            try {
                this.barrier.await();
            } catch(InterruptedException e) {
                ; // Consider better exception handling
            } catch(BrokenBarrierException e) {
                ; // Consider better exception handling
            }
        }
    }


    public void startRotation( List<Runnable> tasks ) {
        final ExecutorService threadPoolService = Executors.newFixedThreadPool( 10 );
        final List<Runnable> rotatingTasks = new ArrayList<Runnable>( tasks.size() );
        final CyclicBarrier barrier = new CyclicBarrier( tasks.size(), new Runnable() {
            @Override
            public void run() {
                Rotation.this.enqueueTasks( threadPoolService, rotatingTasks );
            }
        } );
        for(Runnable task : tasks) {
            rotatingTasks.add( new RotationDecorator( task, barrier ) );
        }
        this.enqueueTasks( threadPoolService, rotatingTasks );
    }


    private void enqueueTasks( ExecutorService service, List<Runnable> tasks ) {
        for(Runnable task : tasks) {
            service.submit( task );
        }
    }

}

Ваш ответ на вопрос