RxJava повторить при странном поведении

Я играю с RxJavaretryWhen оператор. В интернете об этом очень мало. Единственное, что заслуживает упоминания:этот, Это также недостаточно для изучения различных вариантов использования, которые я хотел бы понять. Я также добавил асинхронное выполнение и повторил попытку, чтобы сделать ее более реалистичной.

Моя установка проста: у меня есть классChuckNorrisJokesRepository это возвращает случайное количество шуток Чака Норриса из файла JSON. Мой тестируемый классChuckNorrisJokesService что показано ниже. Интересующие меня варианты использования следующие:

Успешно с 1-й попытки (без попыток)Сбой после 1 попыткиПытается повторить 3 раза, но успешно на 2-м, следовательно, не повторяет 3-й разУспешно на 3-й повтор

Заметка: Проект доступен на моемGitHub.

ChuckNorrisJokesService.java:

@Slf4j
@Builder
public class ChuckNorrisJokesService {
    @Getter
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());

    private final Scheduler scheduler;
    private final ChuckNorrisJokesRepository jokesRepository;
    private final CountDownLatch latch;
    private final int numRetries;
    private final Map<String, List<String>> threads;

    public static class ChuckNorrisJokesServiceBuilder {
        public ChuckNorrisJokesService build() {
            if (scheduler == null) {
                scheduler = Schedulers.io();
            }

            if (jokesRepository == null) {
                jokesRepository = new ChuckNorrisJokesRepository();
            }

            if (threads == null) {
                threads = new ConcurrentHashMap<>();
            }

            requireNonNull(latch, "CountDownLatch must not be null.");

            return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
        }
    }

    public void setRandomJokes(int numJokes) {
        mergeThreadNames("getRandomJokes");

        Observable.fromCallable(() -> {
            log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
            mergeThreadNames("fromCallable");
            latch.countDown();

            List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
            log.debug("fromCallable - after call. Latch: {}.", latch.getCount());

            return randomJokes;
        }).retryWhen(errors ->
                errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
                    log.debug("retryWhen. retryCount: {}.", retryCount);
                    mergeThreadNames("retryWhen");

                    return Observable.timer(retryCount, TimeUnit.SECONDS);
                }))
                .subscribeOn(scheduler)
                .subscribe(j -> {
                            log.debug("onNext. Latch: {}.", latch.getCount());
                            mergeThreadNames("onNext");

                            jokes.set(new Jokes("success", j));
                            latch.countDown();
                        },
                        ex -> {
                            log.error("onError. Latch: {}.", latch.getCount(), ex);
                            mergeThreadNames("onError");
                        },
                        () -> {
                            log.debug("onCompleted. Latch: {}.", latch.getCount());
                            mergeThreadNames("onCompleted");

                            latch.countDown();
                        }
                );
    }

    private void mergeThreadNames(String methodName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
                (value, newValue) -> {
                    value.addAll(newValue);

                    return value;
                });
    }
}

Для краткости я покажу тестовый пример Спока только для первого варианта использования. См мойGitHub для других тестовых случаев.

def "succeeds on 1st attempt"() {
    setup:
    CountDownLatch latch = new CountDownLatch(2)
    Map<String, List<String>> threads = Mock(Map)
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
            .latch(latch)
            .threads(threads)
            .build()

    when:
    service.setRandomJokes(3)
    latch.await(2, TimeUnit.SECONDS)

    Jokes jokes = service.jokes.get()

    then:
    jokes.status == 'success'
    jokes.count() == 3

    1 * threads.merge('getRandomJokes', *_)
    1 * threads.merge('fromCallable', *_)
    0 * threads.merge('retryWhen', *_)
    1 * threads.merge('onNext', *_)
    0 * threads.merge('onError', *_)
    1 * threads.merge('onCompleted', *_)
}

Это не с:

Too few invocations for:

1 * threads.merge('fromCallable', *_)   (0 invocations)
1 * threads.merge('onNext', *_)   (0 invocations)

Я ожидаю, чтоfromCallable называется один раз, это удается,onNext вызывается один раз, затемonCompleted, Что мне не хватает?

П.С .: Полное раскрытие - я также разместил этот вопрос наRxJava GitHub.

Ответы на вопрос(1)

Решение Вопроса

лена ReactiveX Дэвида Карнока.

retryWhen это сложный, возможно, даже глючный оператор. Официальный документ и хотя бы один ответ здесь используютrange оператор, который завершается немедленно, если нет повторных попыток. См мойобсуждение с Дэвидом Карнок.

Код доступен на моемGitHub дополнить следующими тестами:

Успешно с 1-й попытки (без попыток)Сбой после 1 попыткиПытается повторить 3 раза, но успешно на 2-м, следовательно, не повторяет 3-й разУспешно на 3-й повтор
 schwiz16 мар. 2017 г., 19:21
@Abhijit Ваша ссылка на github не работает
 Yaroslav Stavnichiy17 янв. 2017 г., 09:58
Так в чем же причина этого «странного поведения»?
 Abhijit Sarkar17 янв. 2017 г., 10:12
@YaroslavStavnichiy Вы можете перейти по ссылке в моем ответе для деталей, но вкратцеrange завершено сразу же, таким образомonNext и идти прямо кonCompleted, Как я уже сказал в своем ответе, я с тех пор переписал код, чтобы не использоватьrange.
 Abhijit Sarkar17 дек. 2018 г., 19:21
@schwiz Исправлено ..
 Yaroslav Stavnichiy17 янв. 2017 г., 10:21
Я спросил, потому что я не видел, что в вашем ответе, возможно, вам следует обновить его. И почему именноrange завершить немедленно? Это потому чтоnumRetries был установлен на ноль (вы забыли установить его)?
 Abhijit Sarkar17 янв. 2017 г., 21:13
@YaroslavStavnichiy В моем ответе все в порядке "оператор диапазона, который потерпит неудачу, если не будет повторных попыток". Теперь я перефразировал это утверждение, чтобы прояснить его. ДляnumRetries=0Это было преднамеренно (используйте вариант № 1 в моем вопросе).numRetries=0 означает «Я не хочу повторять попытку, я возьму результат первого вызова или потерплю неудачу.

Ваш ответ на вопрос