RxJava - Terminando flujos infinitos
Estoy explorando la programación reactiva y RxJava. Es divertido, pero estoy atascado en un problema para el que no puedo encontrar una respuesta. Mi pregunta básica: ¿cuál es una manera reactiva-apropiada para terminar un Observable que de otra manera se ejecuta infinitamente? También acojo con agrado las críticas y las mejores prácticas reactivas con respecto a mi código.
Como ejercicio, estoy escribiendo una utilidad de cola de archivo de registro. El flujo de líneas en el archivo de registro está representado por unObservable<String>
. Para obtener elBufferedReader
Para continuar leyendo el texto que se agrega al archivo, ignoro lo habitual.reader.readLine() == null
revise la terminación y, en su lugar, interprete que significa que mi hilo debe estar inactivo y esperar más texto del registrador.
Pero mientras pueda terminar el Observer usandotakeUntil
, Necesito encontrar una manera limpia de terminar el observador de archivos que de otra manera se ejecuta infinitamente. Puedo escribir mi propiaterminateWatcher
método / campo, pero eso rompe la encapsulación observable / observador, y me gustaría ser lo más estricto posible con el paradigma reactivo.
Aquí está elObservable<String>
código:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
Aquí está el código del observador que imprime las nuevas líneas como vienen:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
Mis dos preguntas son:
¿Cuál es una manera reactiva consistente de terminar una corriente que de otra manera se ejecuta infinitamente?¿Qué otros errores en mi código te hacen llorar? :)