Spring Integration Java DSL - настройка агрегатора

У меня очень простой процесс интеграции, когда запрос RESTful пересылается двум провайдерам, используя канал публикации-подписки. Результат обеих служб RESTful затем объединяется в один массив. Эскиз интеграционного потока показан ниже:

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                ).subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class)
                        )
                )
            )
            .aggregate()
            .get();
}

Однако при запуске моего кода результирующий массив содержит элементы, возвращаемые только одной из служб RESTful. Есть ли какой-то шаг настройки, который мне не хватает?

ОБНОВИТЬ

Следующая версия соответствует полному решению с учетом комментариев Артема.

@Bean
IntegrationFlow flow() throws Exception {
    return IntegrationFlows.from("inputChannel-scatter")
            .publishSubscribeChannel(s -> s.applySequence(true)
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider1.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather"))
                .subscribe(f -> f
                        .handle(Http.outboundGateway("http://provider2.com/...")
                                .httpMethod(HttpMethod.GET)
                                .expectedResponseType(ItemDTO[].class))
                        .channel("inputChannel-gather")))
            .get();
}

@Bean
IntegrationFlow gatherFlow() {
    return IntegrationFlows.from("inputChannel-gather")
            .aggregate(a -> a.outputProcessor(g ->  new GenericMessage<ItemDTO[]>(
                        g.getMessages().stream()
                                .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
                                .collect(Collectors.toList()).toArray(new ItemDTO[0]))))
            .get();
}

Ответы на вопрос(1)

Ваш ответ на вопрос