Spark Streaming: como não reiniciar o receptor após a falha do receptor
Estamos usando um receptor spark personalizado que lê dados transmitidos de um link http fornecido. Se o link http fornecido estiver incorreto, o receptor falhará. O problema é que o spark reiniciará continuamente o receptor e o aplicativo nunca será encerrado. A questão é como dizer ao Spark para encerrar o aplicativo se o receptor falhar.
Este é um extrato do nosso receptor personalizado:
def onStart() {
// Start the thread that receives data over a connection
new Thread("Receiver") {
override def run() { receive() }
}.start()
}
private def receive(): Unit = {
....
val response: CloseableHttpResponse = httpclient.execute(req)
try {
val sl = response.getStatusLine()
if (sl.getStatusCode != 200){
val errorMsg = "Error: " + sl.getStatusCode
val thrw = new RuntimeException(errorMsg)
stop(errorMsg, thrw)
} else {
...
store(doc)
}
Temos um aplicativo de streaming de faísca que usa este receptor:
val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()
Tudo funciona como esperado se o receptor não tiver erros. Se o receptor falhar (por exemplo, com um link http errado), o spark o reiniciará continuamente e o aplicativo nunca será encerrado.
16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.
Nós apenas queremos encerrar o aplicativo inteiro se um receptor falhar.