Iterador asíncrono

Tengo el siguiente código:

while(slowIterator.hasNext()) {
  performLengthTask(slowIterator.next());
}

Debido a que tanto el iterador como la tarea son lentos, tiene sentido colocarlos en hilos separados. Aquí hay un intento rápido y sucio de un envoltorio de iterador:

class AsyncIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);

    private AsyncIterator(final Iterator<T> delegate) {
      new Thread() {
        @Override
        public void run() {
          while(delegate.hasNext()) {
            queue.put(delegate.next()); // try/catch removed for brevity
          }
        }
      }.start();
    }

    @Override
    public boolean hasNext() {
      return true;
    }

    @Override
    public T next() {
        return queue.take(); // try/catch removed for brevity
    }
    // ... remove() throws UnsupportedOperationException
  }

Sin embargo, esta implementación carece de soporte para "hasNext ()". Por supuesto, estaría bien que el método hasNext () se bloquee hasta que sepa si se debe devolver verdadero o no. Podría tener un objeto Peek en mi AsyncIterator y podría cambiar hasNext () para tomar un objeto de la cola y hacer que next () devuelva este Peek. Pero esto haría que hasNext () se bloquee indefinidamente si se ha llegado al final del iterador delegado.

Por supuesto, en lugar de utilizar el ArrayBlockingQueue, yo mismo podría hacer comunicación por hilos:

private static class AsyncIterator<T> implements Iterator<T> {

  private final Queue<T> queue = new LinkedList<T>();
  private boolean delegateDone = false;

  private AsyncIterator(final Iterator<T> delegate) {
    new Thread() {
      @Override
      public void run() {
        while (delegate.hasNext()) {
          final T next = delegate.next();
          synchronized (AsyncIterator.this) {
            queue.add(next);
            AsyncIterator.this.notify();
          }
        }
        synchronized (AsyncIterator.this) {
          delegateDone = true;
          AsyncIterator.this.notify();
        }
      }
    }.start();
  }

  @Override
  public boolean hasNext() {
    synchronized (this) {
      while (queue.size() == 0 && !delegateDone) {
        try {
          wait();
        } catch (InterruptedException e) {
          throw new Error(e);
        }
      }
    }
    return queue.size() > 0;
  }

  @Override
  public T next() {
    return queue.remove();
  }

  @Override
  public void remove() {
    throw new UnsupportedOperationException();
  }
}

Sin embargo, todas las sincronizaciones, esperas y notificaciones adicionales no hacen que el código sea más legible y es fácil ocultar una condición de carrera en algún lugar.

¿Alguna idea mejor?

Actualizar

Sí, sí conozco los patrones comunes de observador / observable. Sin embargo, las implementaciones habituales no prevén el fin del flujo de datos y no son iteradores.

Específicamente quiero un iterador aquí, porque en realidad el bucle mencionado anteriormente existe en una biblioteca externa y quiere un iterador.

Respuestas a la pregunta(2)

Su respuesta a la pregunta