RxJava - Endlose Streams beenden

Ich erforsche reaktive Programmierung und RxJava. Es macht Spaß, aber ich habe ein Problem, auf das ich keine Antwort finden kann. Meine grundsätzliche Frage: Was ist ein reaktionsgerechter Weg, um ein ansonsten unendlich laufendes Observable zu beenden? Ich begrüße auch Kritik und reaktive Best Practices in Bezug auf meinen Code.

Als Übung schreibe ich ein Protokolldatei-Tail-Dienstprogramm. Der Zeilenstrom in der Protokolldatei wird durch ein dargestelltObservable<String>. Um das zu bekommenBufferedReader Um den der Datei hinzugefügten Text weiter zu lesen, ignoriere ich das Üblichereader.readLine() == null Überprüfen Sie die Beendigung und interpretieren Sie sie stattdessen so, dass mein Thread in den Ruhezustand versetzt und auf weiteren Protokollierungstext wartet.

Aber dabei kann ich den Observer mit beendentakeUntilIch muss einen sauberen Weg finden, um den ansonsten unendlich laufenden Datei-Watcher zu beenden. Ich kann meine eigenen schreibenterminateWatcher method / field, aber das bricht die Observable / Observer-Kapselung - und ich möchte dem reaktiven Paradigma so genau wie möglich folgen.

Hier ist derObservable<String> Code:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

Hier ist der Observer-Code, mit dem die neuen Zeilen sofort gedruckt werden:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

Meine zwei Fragen sind:

Was ist eine reaktiv-konsistente Methode, um einen ansonsten unendlich laufenden Stream zu beenden?Welche anderen Fehler in meinem Code bringen Sie zum Weinen? :)

Antworten auf die Frage(1)

Ihre Antwort auf die Frage