Еще раз работает как шарм. Я понимаю, почему должна вызываться функция complete (), но как получить эти знания другими средствами, кроме stackoverflow. Я имею в виду, есть ли какое-то общее правило? Или ошибка заключалась в том, что complete () должен вызываться всегда при работе на FluxSink после того, как все элементы были отправлены, точка.

приведен пример печати целых чисел от 1 до 10 и списка (7, 8, 9, 10)

public void streamCollect() {

    ConnectableFlux<Integer> connect = Flux.range(1, 10)
            .publish();

    connect.subscribe(v -> System.out.println("1: " + v));

    connect
            .filter(number -> number > 6)
            .collectList()
            .subscribe(v -> System.out.println("4: " + v));

    connect.connect();
}

Результат:

1: 1

1: 2

1: 3

1: 4

1: 5

1: 6

1: 7

1: 8

1: 9

1: 10

4: [7, 8, 9, 10]

Следующий пример должен привести к тому же результату, но вместо этого вывести только цифры от 1 до 10, но не список. Почему?

public void streamCollect() {

    ConnectableFlux<Integer> connect = Flux.<Integer>create(emitter -> {

        Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .forEach(t -> emitter.next(t));
    }).publish();

    connect.subscribe(v -> System.out.println("1: " + v));

    connect
            .filter(number -> number > 6)
            .collectList()
            .subscribe(v -> System.out.println("4: " + v));

    connect.connect();
}

Результат:

1: 1

1: 2

1: 3

1: 4

1: 5

1: 6

1: 7

1: 8

1: 9

1: 10

Ответы на вопрос(1)

Ваш ответ на вопрос