Один восходящий поток, подающий несколько нисходящих потоков

У меня есть общая проблема с Streams API, которую я хотел бы решить «эффективно». Предположим, у меня есть (возможно, очень большой, возможно, бесконечный) поток. Я хочу каким-то образом предварительно обработать его, например, отфильтровывая некоторые элементы и изменяя некоторые. Давайте предположим, что эта предварительная обработка сложна, требует много времени и вычислений, поэтому я не хочу делать это дважды.

Далее я хочу сделать два отдельных набора операций с последовательностью элементов и обработать дальний конец каждой отдельной последовательности с помощью другой конструкции типа потока. Для бесконечного потока это будет forEach, для конечного - коллектор или что-то еще.

Ясно, что я мог бы собрать промежуточные результаты в список, а затем перетащить два отдельных потока из этого списка, обрабатывая каждый поток отдельно. Это сработало бы для конечного потока, хотя а) это кажется "уродливым" и б) это потенциально нецелесообразно для очень большого потока, а утончение не будет работать для бесконечного потока.

Я думаю, я мог бы использовать Peek как своего рода "тройник". Затем я мог бы выполнить одну цепочку обработки результатов вниз по течению и каким-то образом заставить Потребителя в поиске выполнить «другую» работу, но теперь этот второй путь больше не является потоком.

Я обнаружил, что могу создать BlockingQueue, использовать peek, чтобы поместить вещи в эту очередь, а затем получить поток из очереди. Это, кажется, хорошая идея и на самом деле работает довольно хорошо, хотя я не понимаю, как поток закрывается (на самом деле, но я не вижу, как). Вот пример кода, иллюстрирующего это:

List<Student> ls = Arrays.asList(
  new Student("Fred", 2.3F)
  // more students (and Student definition) elided ...
);

BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();

ls.stream()
  .peek(s -> {
     try {
       pipe.put(s);
     } catch (InterruptedException ioe) {
       ioe.printStackTrace();
     }
   })
   .forEach(System.out::println);

   new Thread(
     new Runnable() {
       public void run() {
         Map<String, Double> map = 
           pipe.stream()
             .collect(Collectors.groupingBy(s->s.getName(),
                      Collectors.averagingDouble(s->s.getGpa())));
         map.forEach(
           (k,v)->
             System.out.println(
               "Students called " + k 
               + " average " + v));

       }
     }).start();

Итак, первый вопрос: есть ли «лучший» способ сделать это?

Второй вопрос, как, черт возьми, этот поток в BlockingQueue закрывается?

Ура, Тоби

Ответы на вопрос(1)

Ваш ответ на вопрос