Большое спасибо, пока!

ожусь в ситуации, когда я хочу, чтобы остановить / отменить работу Flink из кода. Это мой интеграционный тест, в котором я отправляю задание на мою работу Flink и проверяю результат. Поскольку задание выполняется асинхронно, оно не останавливается, даже если тест не пройден / пройден. Я хочу сделать остановку после окончания теста.

Я попробовал несколько вещей, которые я перечисляю ниже:

Получить работу менеджераПолучить рабочие местаДля каждого запущенного задания отправьте запрос на отмену менеджеру по работе

Это, конечно, не работает, но я не уверен, является ли Jobmanager actorref неправильным или что-то еще отсутствует.

Я получаю сообщение об ошибке: [flink-akka.actor.default-dispatcher-5] [akka: // flink / user / jobmanager_1] Сообщение [org.apache.flink.runtime.messages.JobManagerMessages $ RequestRunningJobsStatus $] от Actor [ akka: // flink / temp / $ a] для Actor [akka: // flink / user / jobmanager_1] не был доставлен. [1] встреченные мертвые буквы. Это ведение журнала можно отключить или настроить с помощью параметров конфигурации «akka.log-dead-letters» и «akka.log-dead-letters-во время-выключения»

Это означает, что либо ссылка на актера менеджера заданий неверна, либо сообщение, отправленное ему, неверно.

Код выглядит следующим образом:

val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
 val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
    try {
      val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
      if(result.isInstanceOf[RunningJobsStatus]){
        val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
        val itr = runningJobs.iterator()
        while(itr.hasNext){
          val jobId = itr.next().getJobId
          val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
          try {
            Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
          }
          catch {
            case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
          }

        }
      }
    }
    catch{
      case e : Exception => "Could not retrieve running jobs from the JobManager." + e
    }

  }

Может кто-нибудь проверить, правильный ли это подход?

РЕДАКТИРОВАТЬ: Чтобы полностью остановить задание, необходимо остановить TaskManager вместе с JobManager в следующем порядке TaskManager, t, а затем JobManager.

Ответы на вопрос(1)

Ваш ответ на вопрос