Большое спасибо, пока!
ожусь в ситуации, когда я хочу, чтобы остановить / отменить работу Flink из кода. Это мой интеграционный тест, в котором я отправляю задание на мою работу Flink и проверяю результат. Поскольку задание выполняется асинхронно, оно не останавливается, даже если тест не пройден / пройден. Я хочу сделать остановку после окончания теста.
Я попробовал несколько вещей, которые я перечисляю ниже:
Получить работу менеджераПолучить рабочие местаДля каждого запущенного задания отправьте запрос на отмену менеджеру по работеЭто, конечно, не работает, но я не уверен, является ли Jobmanager actorref неправильным или что-то еще отсутствует.
Я получаю сообщение об ошибке: [flink-akka.actor.default-dispatcher-5] [akka: // flink / user / jobmanager_1] Сообщение [org.apache.flink.runtime.messages.JobManagerMessages $ RequestRunningJobsStatus $] от Actor [ akka: // flink / temp / $ a] для Actor [akka: // flink / user / jobmanager_1] не был доставлен. [1] встреченные мертвые буквы. Это ведение журнала можно отключить или настроить с помощью параметров конфигурации «akka.log-dead-letters» и «akka.log-dead-letters-во время-выключения»
Это означает, что либо ссылка на актера менеджера заданий неверна, либо сообщение, отправленное ему, неверно.
Код выглядит следующим образом:
val system = ActorSystem("flink", ConfigFactory.load.getConfig("akka")) //I debugged to get this path
val jobManager = system.actorSelection("/user/jobmanager_1") //also got this akka path by debugging and getting the jobmanager akka url
val responseRunningJobs = Patterns.ask(jobManager, JobManagerMessages.getRequestRunningJobsStatus, new FiniteDuration(10000, TimeUnit.MILLISECONDS))
try {
val result = Await.result(responseRunningJobs, new FiniteDuration(5000, TimeUnit.MILLISECONDS))
if(result.isInstanceOf[RunningJobsStatus]){
val runningJobs = result.asInstanceOf[RunningJobsStatus].getStatusMessages()
val itr = runningJobs.iterator()
while(itr.hasNext){
val jobId = itr.next().getJobId
val killResponse = Patterns.ask(jobManager, new CancelJob(jobId), new Timeout(new FiniteDuration(2000, TimeUnit.MILLISECONDS)));
try {
Await.result(killResponse, new FiniteDuration(2000, TimeUnit.MILLISECONDS))
}
catch {
case e : Exception =>"Canceling the job with ID " + jobId + " failed." + e
}
}
}
}
catch{
case e : Exception => "Could not retrieve running jobs from the JobManager." + e
}
}
Может кто-нибудь проверить, правильный ли это подход?
РЕДАКТИРОВАТЬ: Чтобы полностью остановить задание, необходимо остановить TaskManager вместе с JobManager в следующем порядке TaskManager, t, а затем JobManager.