Очереди работы производителя / потребителя

Я борюсь с лучшим способом реализовать мой конвейер обработки.

Мои продюсеры передают работу в BlockingQueue. Что касается потребителя, я опрашиваю очередь, упаковываю то, что получаю в задаче Runnable, и отправляю ее в ExecutorService.

while (!isStopping())
{
    String work = workQueue.poll(1000L, TimeUnit.MILLISECONDS);
    if (work == null)
    {
        break;
    }
    executorService.execute(new Worker(work));   // needs to block if no threads!
}

Это не идеально; Конечно, у ExecutorService есть своя собственная очередь, поэтому на самом деле происходит то, что я всегда полностью истощаю свою рабочую очередь и заполняю очередь задач, которая медленно завершается по мере выполнения задач.

Я понимаю, что могу ставить задачи в очередь на стороне продюсера, но на самом деле я бы предпочел этого не делать - мне нравится, что косвенная / изолированная очередь моей работы - это глупые строки; на самом деле это не какое-то дело продюсера. Заставить продюсера поставить в очередь Runnable или Callable нарушает абстракцию, IMHO.

Но я хочу, чтобы общая рабочая очередь представляла текущее состояние обработки. Я хочу иметь возможность блокировать производителей, если потребители не поспевают за ними.

Я бы хотел использовать Executors, но я чувствую, что борюсь с их дизайном. Могу ли я частично выпить Kool-ade или я должен проглотить его? Я ошибаюсь, сопротивляясь задачам в очереди? (Я подозреваю, что мог бы настроить ThreadPoolExecutor для использования очереди из 1 задачи и переопределить его метод execute, чтобы блокировать, а не отклонять при заполнении очереди, но это выглядит брутто.)

Предложения?

Ответы на вопрос(3)

Ваш ответ на вопрос