Spark Streaming: cómo no reiniciar el receptor después de la falla del receptor

Estamos utilizando un receptor de chispa personalizado que lee los datos transmitidos desde un enlace http proporcionado. Si el enlace http proporcionado es incorrecto, el receptor falla. El problema es que la chispa reiniciará continuamente el receptor y la aplicación nunca terminará. La pregunta es cómo decirle a Spark que finalice la aplicación si falla el receptor.

Este es un extracto de nuestro receptor personalizado:

 def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run() { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

Tenemos una aplicación de transmisión por chispa que utiliza este receptor:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

Todo funciona como se espera si el receptor no tiene errores. Si el receptor falla (por ejemplo, con un enlace http incorrecto), la chispa lo reiniciará continuamente y la aplicación nunca terminará.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

Solo queremos finalizar la aplicación completa si falla un receptor.

Respuestas a la pregunta(2)

Su respuesta a la pregunta