CompletableFuture # whenComplete не вызывается, если thenApply используется
У меня есть следующий код (в результатемой предыдущий вопрос), который планирует задачу на удаленном сервере, а затем опрашивает для завершения, используяScheduledExecutorService#scheduleAtFixedRate
, Как только задача завершена, она загружает результат. Я хочу вернутьFuture
абоненту, чтобы он мог решить, когда и как долго блокировать, и дать им возможность отменить задачу.
Моя проблема в том, что если клиент отменяетFuture
вернулсяdownload
метод,whenComplete
блок не выполняется. Если я удалюthenApply
оно делает. Очевидно, я что-то неправильно понимаюFuture
состав ... Что я должен изменить?
public Future<Object> download(Something something) {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
return job.thenApply(this::downloadResult);
}
private CompletableFuture<String> pollForCompletion(String jobId) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
CompletableFuture<String> completionFuture = new CompletableFuture<>();
ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {
if (pollRemoteServer(jobId).equals("COMPLETE")) {
completionFuture.complete(jobId);
}
}, 0, 10, TimeUnit.SECONDS);
completionFuture
.whenComplete((result, thrown) -> {
System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed
checkFuture.cancel(true);
executor.shutdown();
});
return completionFuture;
}
На той же ноте, если я сделаю:
return completionFuture.whenComplete(...)
вместо
completionFuture.whenComplete(...);
return completionFuture;
whenComplete
также никогда не выполняет. Это кажется мне очень нелогичным. Не должно логическиFuture
вернулсяwhenComplete
быть тем, кого я должен держаться?
РЕДАКТИРОВАТЬ:
Я изменил свой код для явного обратного распространения отмены. Это отвратительно и нечитабельно, но это работает, и я не мог найти лучшего способа:
public Future<Object> download(Something something) throws ChartDataGenException, Exception {
String jobId = schedule(something);
CompletableFuture<String> job = pollForCompletion(jobId);
CompletableFuture<Object> resulting = job.thenApply(this::download);
resulting.whenComplete((result, thrown) -> {
if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better
job.cancel(true);
}
});
return resulting;
}
РЕДАКТИРОВАТЬ 2:
Я обнаружилtascalate-параллельныйзамечательная библиотека, обеспечивающая разумную реализациюCompletionStage
с поддержкой зависимых обещаний (черезDependentPromise
класс), который может прозрачно обратно распространять отмены. Кажется, идеально подходит для этого варианта использования.
Этого должно быть достаточно:
DependentPromise
.from(pollForCompletion(jobId))
.thenApply(this::download, true); //true means the cancellation should back-propagate
Заметьте, не проверял этот подход.