akka-stream - Wie man das letzte Element eines Streams in einem Flow / Graph anders behandelt
Ich versuche Akka Streams zu implementierenFlow
konvertiert einen Stream von JSON-Objekten in einen Stream eines einzelnen Arrays von JSON-Objekten. Ich kann nutzenConcat
, um ein "[" vor und "]" nach sowie @ hinzuzufügZip
, um Kommas zwischen die Elemente einzufügen, aber ich kann nicht herausfinden, wie ich das letzte Komma nicht einfügen kann.
Der Code, den ich bisher habe, ist:
trait JsonStreamSupport {
protected def toJsonArrayString[T : Writes] = Flow[T].map(Json.toJson(_)).map(_.toString()).via(jsonArrayWrapper)
private[this] val jsonArrayWrapper: Flow[String, String, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val start = Source.single("[")
val comma = Source.repeat(",")
val end = Source.single("]")
val concat = b.add(Concat[String](3))
val zip = b.add(Zip[String,String])
comma ~> zip.in1
start ~> concat.in(0)
zip.out.map({case (msg,delim) => msg + delim}) ~> concat.in(1)
end ~> concat.in(2)
FlowShape(zip.in0, concat.out)
})
}
erzeit ist die Ausgabe:[{"key":"value},{"key","value"},]
aber ich brauche es zu sein[{"key":"value},{"key","value"}]
(ohne abschließendes Komma),
Wo jedes Element des Arrays immer noch ein bestimmtes Element des Streams ist, kann es beispielsweise separat über Chunk-HTTP gesendet werden.