RxJava - прекращение бесконечных потоков

Я изучаю реактивное программирование и RxJava. Это весело, но я застрял на проблеме, на которую не могу найти ответ. Мой основной вопрос: что такое реактивно-подходящий способ прекратить бесконечно запущенную Наблюдаемую? Я также приветствую критику и передовой опыт в отношении моего кода.

В качестве упражнения я пишу утилиту для файла журнала. Поток строк в файле журнала представленObservable, Чтобы получитьBufferedReader чтобы продолжить чтение текста, который добавляется в файл, я игнорирую обычныйreader.readLine() == null проверка завершения и вместо этого интерпретировать это, чтобы означать, что мой поток должен спать и ждать больше текста журнала.

Но пока я могу прекратить работу Обозревателя, используяtakeUntilМне нужно найти чистый способ прекратить бесконечно работающий наблюдатель файла. Я могу написать свойterminateWatcher метод / поле, но это нарушает инкапсуляцию Observable / Observer - и яЯ хотел бы оставаться настолько строгим к реактивной парадигме, насколько это возможно.

ЗдесьObservable код:

public class FileWatcher implements OnSubscribeFunc {
    private Path path = . . .;

    @Override
    // The  generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

Вот код Observer, который печатает новые строки по мере их поступления:

public static void main(String... args) {
    . . .
    Observable lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

Мои два вопроса:

Что такое реактивно-согласованный способ завершения бесконечно работающего потока?Какие другие ошибки в моем коде заставляют тебя плакать? :)

Ответы на вопрос(1)

Ваш ответ на вопрос