RxJava - Endlose Streams beenden
Ich erforsche reaktive Programmierung und RxJava. Es macht Spaß, aber ich habe ein Problem, auf das ich keine Antwort finden kann. Meine grundsätzliche Frage: Was ist ein reaktionsgerechter Weg, um ein ansonsten unendlich laufendes Observable zu beenden? Ich begrüße auch Kritik und reaktive Best Practices in Bezug auf meinen Code.
Als Übung schreibe ich ein Protokolldatei-Tail-Dienstprogramm. Der Zeilenstrom in der Protokolldatei wird durch ein dargestelltObservable<String>
. Um das zu bekommenBufferedReader
Um den der Datei hinzugefügten Text weiter zu lesen, ignoriere ich das Üblichereader.readLine() == null
Überprüfen Sie die Beendigung und interpretieren Sie sie stattdessen so, dass mein Thread in den Ruhezustand versetzt und auf weiteren Protokollierungstext wartet.
Aber dabei kann ich den Observer mit beendentakeUntil
Ich muss einen sauberen Weg finden, um den ansonsten unendlich laufenden Datei-Watcher zu beenden. Ich kann meine eigenen schreibenterminateWatcher
method / field, aber das bricht die Observable / Observer-Kapselung - und ich möchte dem reaktiven Paradigma so genau wie möglich folgen.
Hier ist derObservable<String>
Code:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
Hier ist der Observer-Code, mit dem die neuen Zeilen sofort gedruckt werden:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
Meine zwei Fragen sind:
Was ist eine reaktiv-konsistente Methode, um einen ansonsten unendlich laufenden Stream zu beenden?Welche anderen Fehler in meinem Code bringen Sie zum Weinen? :)