Spark Streaming: So starten Sie den Empfänger nach einem Empfängerausfall nicht neu

Wir verwenden einen benutzerdefinierten Funkenempfänger, der gestreamte Daten von einem bereitgestellten http-Link liest. Wenn der angegebene http-Link falsch ist, fällt der Empfänger aus. Das Problem ist, dass der Funke den Empfänger ständig neu startet und die Anwendung niemals beendet wird. Die Frage ist, wie Spark angewiesen wird, die Anwendung zu beenden, wenn der Empfänger ausfällt.

Dies ist ein Auszug aus unserem benutzerdefinierten Empfänger:

 def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Receiver") {
      override def run() { receive() }
    }.start()
  }

  private def receive(): Unit = {
    ....
    val response: CloseableHttpResponse = httpclient.execute(req)
    try {
      val sl = response.getStatusLine()
      if (sl.getStatusCode != 200){
        val errorMsg = "Error: " + sl.getStatusCode 
        val thrw = new RuntimeException(errorMsg)
        stop(errorMsg, thrw)
      } else {
      ...
        store(doc)
      }

Wir haben eine Spark-Streaming-Anwendung, die diesen Empfänger verwendet:

val ssc = new StreamingContext(sparkConf, duration)
val changes = ssc.receiverStream(new CustomReceiver(...
...
ssc.start()
ssc.awaitTermination()

Alles funktioniert wie erwartet, wenn der Empfänger keine Fehler aufweist. Wenn der Empfänger ausfällt (z. B. mit einem falschen http-Link), startet der Funke ihn kontinuierlich neu und die Anwendung wird niemals beendet.

16/05/31 17:03:38 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
16/05/31 17:03:38 ERROR ReceiverTracker: Receiver has been stopped. Try to restart it.

Wir möchten nur die gesamte Anwendung beenden, wenn ein Empfänger ausfällt.

Antworten auf die Frage(4)

Ihre Antwort auf die Frage