Асинхронный итератор

У меня есть следующий код:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

Поскольку итератор и задача медленны, имеет смысл поместить их в отдельные потоки. Вот быстрая и грязная попытка обертки Iterator:

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    private AsyncIterator(final Iterator<T> delegate) {
      new Thread() {
        @Override
        public void run() {
          while(delegate.hasNext()) {
            queue.put(delegate.next()); // try/catch removed for brevity
          }
        }
      }.start();
    }

    @Override
    public boolean hasNext() {
      return true;
    }

    @Override
    public T next() {
        return queue.take(); // try/catch removed for brevity
    }
    // ... remove() throws UnsupportedOperationException
  }

Однако в этой реализации отсутствует поддержка hasNext (). Конечно, было бы нормально, чтобы метод hasNext () блокировался, пока не узнает, следует ли возвращать true или нет. Я мог бы иметь объект просмотра в моем AsyncIterator, и я мог бы изменить hasNext (), чтобы взять объект из очереди и заставить next () вернуть этот просмотр. Но это приведет к тому, что hasNext () будет блокироваться бесконечно, если достигнут конец итератора делегата.

Вместо того, чтобы использовать ArrayBlockingQueue, я, конечно, мог бы сам осуществлять связь между потоками:

private static class AsyncIterator<T> implements Iterator<T> {

  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify();
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

  @Override
  public boolean hasNext() {
    synchronized (this) {
      while (queue.size() == 0 && !delegateDone) {
        try {
          wait();
        } catch (InterruptedException e) {
          throw new Error(e);
        }
      }
    }
    return queue.size() > 0;
  }

  @Override
  public T next() {
    return queue.remove();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

Однако все дополнительные синхронизации, ожидания и уведомления на самом деле не делают код более читабельным, и где-то легко спрятать условие гонки.

Есть идеи получше?

Обновить

Да, я знаю об общих наблюдателях / наблюдаемых моделях. Однако обычные реализации не предусматривают конец потока данных, и они не являются итераторами.

Я специально хочу итератор здесь, потому что на самом деле вышеупомянутый цикл существует во внешней библиотеке, и он хочет итератор.

Ответы на вопрос(2)

Ваш ответ на вопрос