RxJava - Terminando flujos infinitos

Estoy explorando la programación reactiva y RxJava. Es divertido, pero estoy atascado en un problema para el que no puedo encontrar una respuesta. Mi pregunta básica: ¿cuál es una manera reactiva-apropiada para terminar un Observable que de otra manera se ejecuta infinitamente? También acojo con agrado las críticas y las mejores prácticas reactivas con respecto a mi código.

Como ejercicio, estoy escribiendo una utilidad de cola de archivo de registro. El flujo de líneas en el archivo de registro está representado por unObservable<String>. Para obtener elBufferedReader&nbsp;Para continuar leyendo el texto que se agrega al archivo, ignoro lo habitual.reader.readLine() == null&nbsp;revise la terminación y, en su lugar, interprete que significa que mi hilo debe estar inactivo y esperar más texto del registrador.

Pero mientras pueda terminar el Observer usandotakeUntil, Necesito encontrar una manera limpia de terminar el observador de archivos que de otra manera se ejecuta infinitamente. Puedo escribir mi propiaterminateWatcher&nbsp;método / campo, pero eso rompe la encapsulación observable / observador, y me gustaría ser lo más estricto posible con el paradigma reactivo.

Aquí está elObservable<String>&nbsp;código:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

Aquí está el código del observador que imprime las nuevas líneas como vienen:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

Mis dos preguntas son:

¿Cuál es una manera reactiva consistente de terminar una corriente que de otra manera se ejecuta infinitamente?¿Qué otros errores en mi código te hacen llorar? :)