Akka Stream + Akka Http - Anfrage bei Fehler abrufen

Ich habe den folgenden Stream, der ziemlich gut funktioniert:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

und ein einfacher Akteur, der den Antwortstatus und die endgültige Nachricht behandelt, wenn der Stream abgeschlossen ist:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

Ich weiß nicht, ob dies der beste Ansatz ist, um Dinge zu zählen und auch den Antwortstatus zu behandeln, aber er funktioniert bisher.

Die Frage ist, wie ich die ursprüngliche Anfrage an den Schauspieler weitergeben kann, damit ich weiß, welche Anfrage einen bestimmten Fehler verursacht hat.

Mein Schauspieler könnte stattdessen Folgendes erwarten:

case (request: String, response: HttpResponse) =>

Aber wie übergebe ich die Informationen, die ich am Anfang meiner Pipeline habe?

Ich dachte anmap so was

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

Aber ich habe keine Ahnung, wie der HTTP-Fluss ausgelöst werden soll.

Irgendein Vorschlag

Antworten auf die Frage(2)

Ihre Antwort auf die Frage