Один восходящий поток, подающий несколько нисходящих потоков
У меня есть общая проблема с Streams API, которую я хотел бы решить «эффективно». Предположим, у меня есть (возможно, очень большой, возможно, бесконечный) поток. Я хочу каким-то образом предварительно обработать его, например, отфильтровывая некоторые элементы и изменяя некоторые. Давайте предположим, что эта предварительная обработка сложна, требует много времени и вычислений, поэтому я не хочу делать это дважды.
Далее я хочу сделать два отдельных набора операций с последовательностью элементов и обработать дальний конец каждой отдельной последовательности с помощью другой конструкции типа потока. Для бесконечного потока это будет forEach, для конечного - коллектор или что-то еще.
Ясно, что я мог бы собрать промежуточные результаты в список, а затем перетащить два отдельных потока из этого списка, обрабатывая каждый поток отдельно. Это сработало бы для конечного потока, хотя а) это кажется "уродливым" и б) это потенциально нецелесообразно для очень большого потока, а утончение не будет работать для бесконечного потока.
Я думаю, я мог бы использовать Peek как своего рода "тройник". Затем я мог бы выполнить одну цепочку обработки результатов вниз по течению и каким-то образом заставить Потребителя в поиске выполнить «другую» работу, но теперь этот второй путь больше не является потоком.
Я обнаружил, что могу создать BlockingQueue, использовать peek, чтобы поместить вещи в эту очередь, а затем получить поток из очереди. Это, кажется, хорошая идея и на самом деле работает довольно хорошо, хотя я не понимаю, как поток закрывается (на самом деле, но я не вижу, как). Вот пример кода, иллюстрирующего это:
List<Student> ls = Arrays.asList(
new Student("Fred", 2.3F)
// more students (and Student definition) elided ...
);
BlockingQueue<Student> pipe = new LinkedBlockingQueue<>();
ls.stream()
.peek(s -> {
try {
pipe.put(s);
} catch (InterruptedException ioe) {
ioe.printStackTrace();
}
})
.forEach(System.out::println);
new Thread(
new Runnable() {
public void run() {
Map<String, Double> map =
pipe.stream()
.collect(Collectors.groupingBy(s->s.getName(),
Collectors.averagingDouble(s->s.getGpa())));
map.forEach(
(k,v)->
System.out.println(
"Students called " + k
+ " average " + v));
}
}).start();
Итак, первый вопрос: есть ли «лучший» способ сделать это?
Второй вопрос, как, черт возьми, этот поток в BlockingQueue закрывается?
Ура, Тоби