Как мне лениво объединять потоки?
Я пытаюсь реализовать поток, который использует другой экземпляр себя в своей реализации. К потоку добавлено несколько постоянных элементов (с IntStream.concat), так что это должно работать, пока каскадный поток лениво создает непостоянную часть. Я думаю, используяПерегрузка StreamSupport.intStream принимает поставщика с IntStream.concat (который"создает ленивый каскадный поток") должно быть достаточно ленивым, чтобы создавать второй сплитератор только тогда, когда от него требуются элементы, но даже создание потока (не оценивая его) переполняет стек. Как я могу лениво объединять потоки?
Я пытаюсь перенести потоковое сито с простыми числами изэтот ответ в Java. Это сито использует другой экземпляр себя (ps = postponed_sieve()
в коде Python). Если я сломаю начальные четыре постоянных элемента (yield 2; yield 3; yield 5; yield 7;
) в свой поток, легко реализовать генератор в качестве сплитератора:
/**
* based on https://stackoverflow.com/a/10733621/3614835
*/
static class PrimeSpliterator extends Spliterators.AbstractIntSpliterator {
private static final int CHARACTERISTICS = Spliterator.DISTINCT | Spliterator.IMMUTABLE | Spliterator.NONNULL | Spliterator.ORDERED | Spliterator.SORTED;
private final Map<Integer, Supplier<IntStream>> sieve = new HashMap<>();
private final PrimitiveIterator.OfInt postponedSieve = primes().iterator();
private int p, q, c = 9;
private Supplier<IntStream> s;
PrimeSpliterator() {
super(105097564 /* according to Wolfram Alpha */ - 4 /* in prefix */,
CHARACTERISTICS);
//p = next(ps) and next(ps) (that's Pythonic?)
postponedSieve.nextInt();
this.p = postponedSieve.nextInt();
this.q = p*p;
}
@Override
public boolean tryAdvance(IntConsumer action) {
for (; c > 0 /* overflow */; c += 2) {
Supplier<IntStream> maybeS = sieve.remove(c);
if (maybeS != null)
s = maybeS;
else if (c < q) {
action.accept(c);
return true; //continue
} else {
s = () -> IntStream.iterate(q+2*p, x -> x + 2*p);
p = postponedSieve.nextInt();
q = p*p;
}
int m = s.get().filter(x -> !sieve.containsKey(x)).findFirst().getAsInt();
sieve.put(m, s);
}
return false;
}
}
Моя первая попытка использования метода primes () возвращает IntStream, объединяющий постоянный поток с новым PrimeSpliterator:
public static IntStream primes() {
return IntStream.concat(IntStream.of(2, 3, 5, 7),
StreamSupport.intStream(new PrimeSpliterator()));
}
Вызов primes () приводит к StackOverflowError, потому что primes () всегда создает экземпляр PrimeSpliterator, но инициализатор поля PrimeSpliterator всегда вызывает primes (). Однако существует перегрузка StreamSupport.intStream, которая принимает поставщика, что должно позволить лениво создать PrimeSpliterator:
public static IntStream primes() {
return IntStream.concat(IntStream.of(2, 3, 5, 7),
StreamSupport.intStream(PrimeSpliterator::new, PrimeSpliterator.CHARACTERISTICS, false));
}
Однако вместо этого я получаю StackOverflowError с другой обратной трассировкой (обрезается, как это повторяется). Обратите внимание, что рекурсия целиком заключается в вызове primes () - итератор терминальной операции никогда не вызывается в возвращаемом потоке.
Exception in thread "main" java.lang.StackOverflowError
at java.util.stream.StreamSpliterators$DelegatingSpliterator$OfInt.<init>(StreamSpliterators.java:582)
at java.util.stream.IntPipeline.lazySpliterator(IntPipeline.java:155)
at java.util.stream.IntPipeline$Head.lazySpliterator(IntPipeline.java:514)
at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:352)
at java.util.stream.IntPipeline.spliterator(IntPipeline.java:181)
at java.util.stream.IntStream.concat(IntStream.java:851)
at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
at com.jeffreybosboom.projecteuler.util.Primes$Lambda$1/834600351.get(Unknown Source)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
at java.util.stream.IntStream.concat(IntStream.java:851)
at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
at com.jeffreybosboom.projecteuler.util.Primes$PrimeSpliterator.<init>(Primes.java:32)
at com.jeffreybosboom.projecteuler.util.Primes$Lambda$1/834600351.get(Unknown Source)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.get(StreamSpliterators.java:513)
at java.util.stream.StreamSpliterators$DelegatingSpliterator.estimateSize(StreamSpliterators.java:536)
at java.util.stream.Streams$ConcatSpliterator.<init>(Streams.java:713)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:789)
at java.util.stream.Streams$ConcatSpliterator$OfPrimitive.<init>(Streams.java:785)
at java.util.stream.Streams$ConcatSpliterator$OfInt.<init>(Streams.java:819)
at java.util.stream.IntStream.concat(IntStream.java:851)
at com.jeffreybosboom.projecteuler.util.Primes.primes(Primes.java:22)
Как я могу достаточно лениво объединять потоки, чтобы позволить потоку использовать другую его копию в своей реализации?