RxJava - Terminando Fluxos Infinitos

Eu estou explorando a programação reativa e o RxJava. É divertido, mas estou preso a um problema para o qual não encontro uma resposta. Minha pergunta básica: qual é uma maneira apropriada de reagir para encerrar um Observável de outra forma infinitamente ativa? Também agradeço críticas e melhores práticas reativas em relação ao meu código.

Como exercício, estou escrevendo um utilitário de cauda de arquivo de log. O fluxo de linhas no arquivo de log é representado por umObservable<String>. Para obter oBufferedReader para continuar lendo o texto que é adicionado ao arquivo, eu ignoro o habitualreader.readLine() == null verificação de terminação e, em vez disso, interpretá-lo para significar que meu thread deve dormir e aguardar por mais texto do logger.

Mas enquanto eu posso terminar o Observer usandotakeUntilPreciso encontrar uma maneira limpa de encerrar o observador de arquivos que, de outra forma, seria executado infinitamente. Eu posso escrever meu próprioterminateWatcher método / campo, mas que quebra o encapsulamento Observable / Observer - e eu gostaria de permanecer tão rígido quanto possível ao paradigma reativo.

Aqui está oObservable<String> código:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

Aqui está o código do Observer que imprime as novas linhas à medida que elas aparecem:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

Minhas duas perguntas são:

O que é uma maneira reativa consistente de terminar um fluxo de execução infinitamente diferente?Que outros erros no meu código fazem você chorar? :)

questionAnswers(1)

yourAnswerToTheQuestion