Um fluxo a montante que alimenta vários fluxos a jusante

Eu tenho um problema geral da API do Streams que gostaria de resolver "com eficiência". Suponha que eu tenha um fluxo (possivelmente muito grande, possivelmente infinito). Quero pré-processá-lo de alguma forma, por exemplo, filtrando alguns itens e alterando alguns. Vamos supor que esse pré-processamento seja complexo, demorado e computacional, por isso não quero fazer isso duas vezes.

Em seguida, quero fazer dois conjuntos distintos de operações na sequência de itens e processar o extremo de cada sequência distinta com uma construção diferente do tipo fluxo. Para um fluxo infinito, isso seria um forEach, para um finito, poderia ser um colecionador ou qualquer outra coisa.

Claramente, posso coletar os resultados intermediários em uma lista e arrastar dois fluxos separados para fora dessa lista, processando cada fluxo individualmente. Isso funcionaria para um fluxo finito, embora a) pareça "feio" eb) seja potencialmente impraticável para um fluxo muito grande, e o flat flat não funcionará para um fluxo infinito.

Eu acho que eu poderia usar espiar como uma espécie de "tee". Eu poderia então executar uma cadeia de processamento nos resultados a jusante da espiada e de alguma forma coagir o Consumidor a espiar para fazer o "outro" trabalho, mas agora esse segundo caminho não é mais um fluxo.

Descobri que posso criar um BlockingQueue, usar espiar para enviar coisas para essa fila e obter um fluxo da fila. Essa parece ser uma boa idéia e realmente funciona muito bem, embora eu não consiga entender como o fluxo é fechado (na verdade, mas não consigo ver como). Aqui está um código de exemplo que ilustra isso:

List<Student> ls = Arrays.asList(
  new Student("Fred", 2.3F)
  // more students (and Student definition) elided ...
);

BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();

ls.stream()
  .peek(s -> {
     try {
       pipe.put(s);
     } catch (InterruptedException ioe) {
       ioe.printStackTrace();
     }
   })
   .forEach(System.out::println);

   new Thread(
     new Runnable() {
       public void run() {
         Map<String, Double> map = 
           pipe.stream()
             .collect(Collectors.groupingBy(s->s.getName(),
                      Collectors.averagingDouble(s->s.getGpa())));
         map.forEach(
           (k,v)->
             System.out.println(
               "Students called " + k 
               + " average " + v));

       }
     }).start();

Então, a primeira pergunta é: existe uma maneira "melhor" de fazer isso?

Segunda pergunta, como diabos esse fluxo no BlockingQueue está sendo fechado?

Saúde, Toby

questionAnswers(1)

yourAnswerToTheQuestion