Akka Stream + Akka Http - Obtener solicitud por error

Tengo la siguiente secuencia que funciona bastante bien:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

y un actor simple para manejar el estado de respuesta y el mensaje final cuando se completa la secuencia:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

No sé si este es el mejor enfoque para contar cosas y también para manejar el estado de respuesta, pero está funcionando hasta ahora.

La pregunta es cómo pasar la solicitud original al actor de una manera que yo pueda saber qué solicitud causó un error específico.

Mi actor podría esperar lo siguiente:

case (request: String, response: HttpResponse) =>

¿Pero cómo pasar esa información que tengo al comienzo de mi cartera?

Estaba pensando enmap Me gusta esto:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

Pero no tengo idea de cómo disparar el flujo Http.

¿Cualquier sugerencia?

Respuestas a la pregunta(1)

Su respuesta a la pregunta