@etiennepeiniau Я полагаю, что в простом Flux это не "параллельно == больше потоков". Я отредактирую свой ответ и добавлю пример.

аюсь интегрировать блокирующий потребитель какFlux Абонент в Реакторе Алюминий-SR1. Я хотел бы использовать параллельпланировщик, чтобы выполнить операции блокировки одновременно.

Я реализовал основной класс, чтобы описать свое намерение:

package etienne.peiniau;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
                .elapsed()
                .publishOn(Schedulers.parallel())
                .subscribe(new Subscriber<Tuple2<Long, Integer>>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        System.out.println("[" + Thread.currentThread().getName() + "] Subscription");
                        subscription.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Tuple2<Long, Integer> t2) {
                        System.out.println("[" + Thread.currentThread().getName() + "] " + t2);
                        try {
                            Thread.sleep(1000); // long operation
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("[" + Thread.currentThread().getName() + "] Error: " + throwable.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("[" + Thread.currentThread().getName() + "] Complete");
                    }
                });
        // Waiting for the program to complete
        System.out.println("[" + Thread.currentThread().getName() + "] Main");
        Thread.sleep(100000);
    }

}

Вывод этого кода следующий:

[main] Subscription
[main] Main
[parallel-1] [3,1]
[parallel-1] [1000,2]
[parallel-1] [1001,3]
[parallel-1] [1000,4]
[parallel-1] [1000,5]
[parallel-1] [1000,6]
[parallel-1] [1001,7]
[parallel-1] [1000,8]
[parallel-1] [1000,9]
[parallel-1] [1000,10]
[parallel-1] [1000,11]
[parallel-1] [1001,12]
[parallel-1] [1000,13]
[parallel-1] [1000,14]
[parallel-1] [1000,15]
[parallel-1] [1000,16]
[parallel-1] [1001,17]
[parallel-1] [1000,18]
[parallel-1] [1000,19]
[parallel-1] [1000,20]
[parallel-1] Complete

Моя проблема в том, что длинная операция всегда выполняется в потоке параллельно-1 и каждую 1 секунду.

Я пытался увеличить параллельность вручную или использовать эластикпланировщик, но результат тот же.

Я думал, что метод publishOn был специально разработан для этого случая использования. Можете ли вы сказать мне, если я что-то неправильно понял?

Ответы на вопрос(1)

Ваш ответ на вопрос