Akka Stream + Akka Http - Obter solicitação por erro

Eu tenho o seguinte fluxo que funciona muito bem:

source
  .map(x => HttpRequest(uri = x.rawRequest))
  .via(Http().outgoingConnection(host, port))
  .to(Sink.actorRef(myActor, IsDone))
  .run()

e um agente simples para lidar com o status da resposta e a mensagem final quando o fluxo for concluído:

/**
  * A simple actor to count how many rows have been processed
  * in the complete process given a http status
  *
  * It also finish the main thread upon a message of type [[IsDone]] is received
  */
class MyActor extends Actor with ActorLogging {

  var totalProcessed = 0

  def receive = LoggingReceive {

    case response: HttpResponse =>

      if(response.status.isSuccess()) {
        totalProcessed = totalProcessed + 1
      } else if(response.status.isFailure()) {
        log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
      } else {
        log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
      }

    case IsDone =>
      println(s"total processed: $totalProcessed")
      sys.exit()
  }
}

case object IsDone

Não sei se essa é a melhor abordagem para contar as coisas e também para lidar com o status da resposta, mas está funcionando até agora.

A questão é como passar a solicitação original ao ator de uma maneira que eu pudesse saber qual solicitação causou um erro específico.

Meu ator poderia esperar o seguinte:

case (request: String, response: HttpResponse) =>

Mas como passar essas informações que eu tenho no início do meu pipeline?

Eu estava pensando emmap como isso:

source
  .map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))

Mas não tenho idéia de como disparar o fluxo Http.

Alguma sugestão?

questionAnswers(1)

yourAnswerToTheQuestion