Spark Streaming: как не перезапускать приемник после сбоя приемника

Мы используем специальный искровой приемник, который считывает потоковые данные по предоставленной http-ссылке. Если предоставленная ссылка http неверна, получатель не работает. Проблема в том, что искра будет непрерывно перезапускать приемник, и приложение никогда не прекратит работу. Вопрос заключается в том, как сообщить Spark о прекращении работы приложения в случае сбоя приемника.

Вот выдержка из нашего пользовательского получателя:

 def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run() { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

У нас есть приложение для потокового воспроизведения, которое использует этот приемник:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

Все работает как положено, если у получателя нет ошибок. Если получатель выходит из строя (например, с неправильной http-ссылкой), spark непрерывно перезапускает его, и приложение никогда не завершает работу.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

Мы просто хотим завершить все приложение, если получатель не работает.

 Jake 09 мар. 2018 г., 04:55
к сожалению, сейчас устарела

Ответы на вопрос(2)

Существует способ управления жизненным циклом пользовательских приложений на основе искрового потока. Определите слушателя прогресса работы для своего приложения и следите за тем, что происходит.

class CustomReceiverListener extends StreamingJobProgressListener {
    private boolean receiverStopped = false;

    public CustomReceiverListener(StreamingContext ssc) { super(ssc);}

    public boolean isReceiverStopped() {
        return receiverStopped;
    }
    @Override
    public void onReceiverStopped(StreamingListenerReceiverStopped receiverStopped) {
        LOG.info("Update the flag field");
        this.receiverStopped = true;
    }
}

И в вашем драйвере инициализируйте поток для мониторинга состоянияreceiverStopped флаг. Драйвер остановит потоковое приложение, когда этот поток будет завершен. (Лучший подход - определить метод обратного вызова, определенный драйвером, который остановит потоковое приложение).

CustomReceiverListener listener = new CustomReceiverListener(ssc);
ssc.addStreamingListener(listener);
ssc.start();
Thread thread = new Thread(() -> {
    while (!listener.isReceiverStopped()) {
        LOG.info("Sleepy head...");
        try {
            Thread.sleep(2 * 1000); /*check after 2 seconds*/
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
thread.start();
thread.join();
LOG.info("Listener asked to die! Going to commit suicide :(");
ssc.stop(true, false);

Замечания: В случае нескольких экземпляров ваших приемников, измените реализациюCustomReceiverListener чтобы убедиться, что все экземпляры получателя остановлены.

 Jake 09 мар. 2018 г., 07:45
Я думаю, что это устарело

Похоже, что планирование в Spark Streaming работает таким образом, что ReceiverTracker будет перезапускать сбойный получатель до тех пор, пока ReceiverTracker сам не остановится.

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala#L618

Чтобы остановить ReceiverTracker, нам нужно остановить все приложение. Таким образом, кажется, нет способа контролировать этот процесс из самого получателя.

Ваш ответ на вопрос