Spark Streaming: как не перезапускать приемник после сбоя приемника

Мы используем специальный искровой приемник, который считывает потоковые данные по предоставленной http-ссылке. Если предоставленная ссылка http неверна, получатель не работает. Проблема в том, что искра будет непрерывно перезапускать приемник, и приложение никогда не прекратит работу. Вопрос заключается в том, как сообщить Spark о прекращении работы приложения в случае сбоя приемника.

Вот выдержка из нашего пользовательского получателя:

 def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run() { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

У нас есть приложение для потокового воспроизведения, которое использует этот приемник:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

Все работает как положено, если у получателя нет ошибок. Если получатель выходит из строя (например, с неправильной http-ссылкой), spark непрерывно перезапускает его, и приложение никогда не завершает работу.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

Мы просто хотим завершить все приложение, если получатель не работает.

Ответы на вопрос(2)

Ваш ответ на вопрос