TCP-Stream verbrauchen und auf einen anderen Sink umleiten (mit Akka Streams)

Ich versuche mit Akka 2.4.3 einen TCP-Stream auf einen anderen Sink umzuleiten / weiterzuleiten. Das Programm sollte einen Server-Socket öffnen, auf eingehende Verbindungen warten und dann den TCP-Stream verbrauchen. Unser Absender erwartet / akzeptiert keine Antworten von uns, daher senden wir niemals etwas zurück - wir verbrauchen nur den Stream. Nachdem wir den TCP-Stream gerahmt haben, müssen wir die Bytes in etwas Nützlicheres umwandeln und an die Senke senden.

Ich habe bisher Folgendes versucht, aber ich habe vor allem mit dem Teil zu kämpfen, wie ich TCP-Pakete nicht an den Absender zurücksende und die Senke richtig verbinde.

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

Update: Fügen Sie dies zu Ihrer build.sbt-Datei hinzu, um die erforderlichen Deps zu erhalten.

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"

Antworten auf die Frage(2)

Ihre Antwort auf die Frage