Asynchroniczny Iterator
Mam następujący kod:
while(slowIterator.hasNext()) {
performLengthTask(slowIterator.next());
}
Ponieważ zarówno iterator, jak i zadanie są wolne, sensowne jest umieszczenie ich w osobnych wątkach. Oto szybka i brudna próba owinięcia Iteratora:
class AsyncIterator<T> implements Iterator<T> {
private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
private AsyncIterator(final Iterator<T> delegate) {
new Thread() {
@Override
public void run() {
while(delegate.hasNext()) {
queue.put(delegate.next()); // try/catch removed for brevity
}
}
}.start();
}
@Override
public boolean hasNext() {
return true;
}
@Override
public T next() {
return queue.take(); // try/catch removed for brevity
}
// ... remove() throws UnsupportedOperationException
}
W tej implementacji brakuje jednak wsparcia dla „hasNext ()”. Oczywiście byłoby dobrze, gdyby metoda hasNext () blokowała się, dopóki nie wie, czy zwrócić true, czy nie. Mógłbym mieć obiekt podglądu w moim AsyncIterator i mógłbym zmienić hasNext (), aby odebrać obiekt z kolejki i mieć next () zwracający ten podgląd. Ale spowodowałoby to, że hasNext () blokowałby się w nieskończoność, jeśli osiągnięty został koniec iteratora delegującego.
Zamiast wykorzystywać ArrayBlockingQueue, mógłbym oczywiście sam wykonać komunikację wątkową:
private static class AsyncIterator<T> implements Iterator<T> {
private final Queue<T> queue = new LinkedList<T>();
private boolean delegateDone = false;
private AsyncIterator(final Iterator<T> delegate) {
new Thread() {
@Override
public void run() {
while (delegate.hasNext()) {
final T next = delegate.next();
synchronized (AsyncIterator.this) {
queue.add(next);
AsyncIterator.this.notify();
}
}
synchronized (AsyncIterator.this) {
delegateDone = true;
AsyncIterator.this.notify();
}
}
}.start();
}
@Override
public boolean hasNext() {
synchronized (this) {
while (queue.size() == 0 && !delegateDone) {
try {
wait();
} catch (InterruptedException e) {
throw new Error(e);
}
}
}
return queue.size() > 0;
}
@Override
public T next() {
return queue.remove();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
Jednak wszystkie dodatkowe synchronizacje, oczekiwania i powiadomienia nie sprawiają, że kod jest bardziej czytelny i łatwo jest gdzieś ukryć warunki wyścigu.
Jakieś lepsze pomysły?
AktualizacjaTak, wiem o wspólnych wzorcach obserwatorów / obserwowalnych. Jednak zwykłe implementacje nie przewidują końca przepływu danych i nie są iteratorami.
W szczególności chcę tutaj iteratora, ponieważ w rzeczywistości wspomniana powyżej pętla istnieje w zewnętrznej bibliotece i chce Iteratora.