Асинхронный итератор
У меня есть следующий код:
while(slowIterator.hasNext()) {
performLengthTask(slowIterator.next());
}
Поскольку итератор и задача медленны, имеет смысл поместить их в отдельные потоки. Вот быстрая и грязная попытка обертки Iterator:
class AsyncIterator<T> implements Iterator<T> {
private final BlockingQueue<T> queue = new ArrayBlockingQueue<T>(100);
private AsyncIterator(final Iterator<T> delegate) {
new Thread() {
@Override
public void run() {
while(delegate.hasNext()) {
queue.put(delegate.next()); // try/catch removed for brevity
}
}
}.start();
}
@Override
public boolean hasNext() {
return true;
}
@Override
public T next() {
return queue.take(); // try/catch removed for brevity
}
// ... remove() throws UnsupportedOperationException
}
Однако в этой реализации отсутствует поддержка hasNext (). Конечно, было бы нормально, чтобы метод hasNext () блокировался, пока не узнает, следует ли возвращать true или нет. Я мог бы иметь объект просмотра в моем AsyncIterator, и я мог бы изменить hasNext (), чтобы взять объект из очереди и заставить next () вернуть этот просмотр. Но это приведет к тому, что hasNext () будет блокироваться бесконечно, если достигнут конец итератора делегата.
Вместо того, чтобы использовать ArrayBlockingQueue, я, конечно, мог бы сам осуществлять связь между потоками:
private static class AsyncIterator<T> implements Iterator<T> {
private final Queue<T> queue = new LinkedList<T>();
private boolean delegateDone = false;
private AsyncIterator(final Iterator<T> delegate) {
new Thread() {
@Override
public void run() {
while (delegate.hasNext()) {
final T next = delegate.next();
synchronized (AsyncIterator.this) {
queue.add(next);
AsyncIterator.this.notify();
}
}
synchronized (AsyncIterator.this) {
delegateDone = true;
AsyncIterator.this.notify();
}
}
}.start();
}
@Override
public boolean hasNext() {
synchronized (this) {
while (queue.size() == 0 && !delegateDone) {
try {
wait();
} catch (InterruptedException e) {
throw new Error(e);
}
}
}
return queue.size() > 0;
}
@Override
public T next() {
return queue.remove();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
Однако все дополнительные синхронизации, ожидания и уведомления на самом деле не делают код более читабельным, и где-то легко спрятать условие гонки.
Есть идеи получше?
ОбновитьДа, я знаю об общих наблюдателях / наблюдаемых моделях. Однако обычные реализации не предусматривают конец потока данных, и они не являются итераторами.
Я специально хочу итератор здесь, потому что на самом деле вышеупомянутый цикл существует во внешней библиотеке, и он хочет итератор.