Akka Stream + Akka Http - Obter solicitação por erro
Eu tenho o seguinte fluxo que funciona muito bem:
source
.map(x => HttpRequest(uri = x.rawRequest))
.via(Http().outgoingConnection(host, port))
.to(Sink.actorRef(myActor, IsDone))
.run()
e um agente simples para lidar com o status da resposta e a mensagem final quando o fluxo for concluído:
/**
* A simple actor to count how many rows have been processed
* in the complete process given a http status
*
* It also finish the main thread upon a message of type [[IsDone]] is received
*/
class MyActor extends Actor with ActorLogging {
var totalProcessed = 0
def receive = LoggingReceive {
case response: HttpResponse =>
if(response.status.isSuccess()) {
totalProcessed = totalProcessed + 1
} else if(response.status.isFailure()) {
log.error(s"Http response error: ${response.status.intValue()} - ${response.status.reason()}")
} else {
log.error(s"Error: ${response.status.intValue()} - ${response.status.reason()}")
}
case IsDone =>
println(s"total processed: $totalProcessed")
sys.exit()
}
}
case object IsDone
Não sei se essa é a melhor abordagem para contar as coisas e também para lidar com o status da resposta, mas está funcionando até agora.
A questão é como passar a solicitação original ao ator de uma maneira que eu pudesse saber qual solicitação causou um erro específico.
Meu ator poderia esperar o seguinte:
case (request: String, response: HttpResponse) =>
Mas como passar essas informações que eu tenho no início do meu pipeline?
Eu estava pensando emmap
como isso:
source
.map(x => (HttpRequest(uri = x.rawRequest), x.rawRequest))
Mas não tenho idéia de como disparar o fluxo Http.
Alguma sugestão?