RxJava retryWhen bizarres Verhalten

Ich spiele mit dem RxJava retryWhen Operator. Im Internet wird nur sehr wenig darüber gefunden. Der einzige, der es wert ist, erwähnt zu werden, istDie. Auch das reicht nicht aus, um die verschiedenen Anwendungsfälle zu untersuchen, die ich verstehen möchte. Ich habe auch die asynchrone Ausführung aktiviert und es mit Backoff erneut versucht, um es realistischer zu machen.

Meine Einrichtung ist einfach: Ich habe eine KlasseChuckNorrisJokesRepository, das eine zufällige Anzahl von Chuck Norris-Witzen aus einer JSON-Datei zurückgibt. Meine getestete Klasse istChuckNorrisJokesService was unten gezeigt wird. Folgende Anwendungsfälle interessieren mich:

Erfolg beim ersten Versuch (keine Wiederholungsversuche)Fails after 1 retryVersucht, es dreimal zu wiederholen, ist aber am 2. erfolgreich, daher wird das 3. Mal nicht wiederholtSucceeds on 3rd retry

Hinwei: Das Projekt ist auf meinem @ verfügb GitHub.

ChuckNorrisJokesService.java:

@Slf4j
@Builder
public class ChuckNorrisJokesService {
    @Getter
    private final AtomicReference<Jokes> jokes = new AtomicReference<>(new Jokes());

    private final Scheduler scheduler;
    private final ChuckNorrisJokesRepository jokesRepository;
    private final CountDownLatch latch;
    private final int numRetries;
    private final Map<String, List<String>> threads;

    public static class ChuckNorrisJokesServiceBuilder {
        public ChuckNorrisJokesService build() {
            if (scheduler == null) {
                scheduler = Schedulers.io();
            }

            if (jokesRepository == null) {
                jokesRepository = new ChuckNorrisJokesRepository();
            }

            if (threads == null) {
                threads = new ConcurrentHashMap<>();
            }

            requireNonNull(latch, "CountDownLatch must not be null.");

            return new ChuckNorrisJokesService(scheduler, jokesRepository, latch, numRetries, threads);
        }
    }

    public void setRandomJokes(int numJokes) {
        mergeThreadNames("getRandomJokes");

        Observable.fromCallable(() -> {
            log.debug("fromCallable - before call. Latch: {}.", latch.getCount());
            mergeThreadNames("fromCallable");
            latch.countDown();

            List<Joke> randomJokes = jokesRepository.getRandomJokes(numJokes);
            log.debug("fromCallable - after call. Latch: {}.", latch.getCount());

            return randomJokes;
        }).retryWhen(errors ->
                errors.zipWith(Observable.range(1, numRetries), (n, i) -> i).flatMap(retryCount -> {
                    log.debug("retryWhen. retryCount: {}.", retryCount);
                    mergeThreadNames("retryWhen");

                    return Observable.timer(retryCount, TimeUnit.SECONDS);
                }))
                .subscribeOn(scheduler)
                .subscribe(j -> {
                            log.debug("onNext. Latch: {}.", latch.getCount());
                            mergeThreadNames("onNext");

                            jokes.set(new Jokes("success", j));
                            latch.countDown();
                        },
                        ex -> {
                            log.error("onError. Latch: {}.", latch.getCount(), ex);
                            mergeThreadNames("onError");
                        },
                        () -> {
                            log.debug("onCompleted. Latch: {}.", latch.getCount());
                            mergeThreadNames("onCompleted");

                            latch.countDown();
                        }
                );
    }

    private void mergeThreadNames(String methodName) {
        threads.merge(methodName,
                new ArrayList<>(Arrays.asList(Thread.currentThread().getName())),
                (value, newValue) -> {
                    value.addAll(newValue);

                    return value;
                });
    }
}

Der Kürze halber zeige ich den Spock-Testfall nur für den ersten Anwendungsfall. Sieh mein GitHub für die anderen Testfälle.

def "succeeds on 1st attempt"() {
    setup:
    CountDownLatch latch = new CountDownLatch(2)
    Map<String, List<String>> threads = Mock(Map)
    ChuckNorrisJokesService service = ChuckNorrisJokesService.builder()
            .latch(latch)
            .threads(threads)
            .build()

    when:
    service.setRandomJokes(3)
    latch.await(2, TimeUnit.SECONDS)

    Jokes jokes = service.jokes.get()

    then:
    jokes.status == 'success'
    jokes.count() == 3

    1 * threads.merge('getRandomJokes', *_)
    1 * threads.merge('fromCallable', *_)
    0 * threads.merge('retryWhen', *_)
    1 * threads.merge('onNext', *_)
    0 * threads.merge('onError', *_)
    1 * threads.merge('onCompleted', *_)
}

Dies schlägt fehl mit:

Too few invocations for:

1 * threads.merge('fromCallable', *_)   (0 invocations)
1 * threads.merge('onNext', *_)   (0 invocations)

Was ich erwarte ist, dassfromCallable wird einmal aufgerufen, es gelingt,onNext wird einmal aufgerufen, gefolgt vononCompleted. Was vermisse ich

P.S .: Vollständige Offenlegung - Ich habe diese Frage auch auf @ geposteRxJava GitHub.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage