Как использовать Akka Streams SourceQueue с PlayFramework
Я хотел бы использовать SourceQueue для динамического добавления элементов в источник Akka Stream. Контроллеру воспроизведения необходим источник, чтобы иметь возможность передавать результат с помощьюchuncked
метод.
Поскольку Play использует свой собственный Akka Stream Sink изнутри, я не могу самостоятельно материализовать исходную очередь, используя Sink, потому что источник будет использован до того, как он будет использованchunked
метод (кроме случаев, когда я использую следующий хак).
Я могу заставить это работать, если я предварительно материализую исходную очередь, используя издателя реактивных потоков, но это своего рода «грязный хак»:
def sourceQueueAction = Action{
val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail).toMat(Sink.asPublisher(false))(Keep.both).run()
//stupid example to push elements dynamically
val tick = Source.tick(0 second, 1 second, "tick")
tick.runForeach(t => queue.offer(t))
Ok.chunked(Source.fromPublisher(pub))
}
Есть ли более простой способ использовать SourceQueue Akka Streams с PlayFramework?
Спасибо