ForeachRDD выполняется на драйвере?

Я пытаюсь обработать некоторые данные XML, полученные в очереди JMS (QPID), используя потоковую передачу Spark. После получения XML в качестве DStream я преобразую их в Dataframes, чтобы я мог соединить их с некоторыми из моих статических данных в виде уже загруженных Dataframes. Но согласно документации API для метода foreachRdd в DStream: он исполняется в драйвере, поэтому это означает, что вся логика обработки будет работать только в драйвере и не будет распространяться среди работников / исполнителей.

Документация по API

foreachRDD(func)

Наиболее общий оператор вывода, который применяет функцию func к каждому RDD, сгенерированному из потока. Эта функция должна передавать данные в каждом СДР во внешнюю систему, например сохранять СДР в файлы или записывать их по сети в базу данных. Обратите внимание, что функция func выполняется в процессе драйвера, выполняющем потоковое приложение, и обычно содержит действия RDD, которые вызывают вычисление потоковых RDD.

Ответы на вопрос(2)

что вся логика обработки будет работать только на драйвере и не будет распространяться среди работников / исполнителей.

Нет,сама функция работает на драйвере, но не забывайте, что он работает наRDD, Внутренние функции, которые вы будете использовать наRDD, такие какforeachPartition, map, filter и т.д. будетпо-прежнему работает на рабочих узлах, этоне будет заставить все данные отправляться обратно по сети драйверу, если только вы не вызываете такие методы, какcollect, которые делают.

 avocado04 апр. 2017 г., 12:55
Ну, это так сбивает с толку: «Нет, сама функция работает на драйвере, но не забывайте, что она работает на СДР», предположим,rdd.foreachRDD(func) называется, и этоfunc Функция записывает данные в Redis через глобальную переменнуюredis_client, это означаетfunc относится кredis_client, поэтому Q: будут ли возникать какие-либо исключения вforeachRDD звоните, так какredis_client не сериализуем.
 CᴴᴀZ17 июл. 2017 г., 16:01
@YuvalItzchakov Цитирую ваш комментарий:rdd.foreachRDD выполняет func для исполнителей.   Отдокументы, foreachRDD исполняетfunc наВодитель. foreach, foreachPartition работает наИсполнители хоть.
 Yuval Itzchakov17 июл. 2017 г., 16:08
@ CᴴᴀZ Операция на весьRDD должно произойти на стороне водителя, но любая операция, которую вы делаете наRDD сам, т.е.foreach или жеforeachPartition работает на исполнителей. Это было мое намерение.
 Yuval Itzchakov04 апр. 2017 г., 12:58
@loganecolss Согласен, семантика исполнения сложна.rdd.foreachRDD исполняетfunc на исполнителей. Еслиfunc захваты redis_client через закрытие, вы получитеTaskNotSerializable исключение. Если экземплярredis_client выделяется внутриfunc, вам будет хорошо.

если вы запустите следующее, вы увидите «обезьяну» на стандартный вывод драйвера:

myDStream.foreachRDD { rdd =>
  println("monkey")
}

Если вы запустите следующее, вы увидите «обезьяну» на стандартный вывод драйвера и фильтрРабота будет сделано на любых исполнителейrdd распределяется по:

myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
}

Давайте добавим упрощение, котороеmyDStream только когда-либо получает один RDD, и что этот RDD распределен по набору разделов, которые мы будем называтьPartitionSetA которые существуют наMachineSetB гдеExecutorSetC бегут. Если вы запустите следующее, вы увидите «обезьяну» на стандартном выводе водителя, вы увидите «черепаху» на стандартном выводе всех исполнителей вExecutorSetC («черепаха» будет отображаться один раз для каждого раздела - на компьютере, на котором работает исполнитель, может быть много разделов), и работа с фильтрами и операциями сложения будет выполняться вExecutorSetC:

myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
  }
}

Еще одна вещь, на которую следует обратить внимание: в следующем кодеy в конечном итоге будет отправлен через сеть от водителя ко всемExecutorSetC для каждогоrdd:

val y = 2
myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
    val z = x + y
  }
}

Чтобы избежать этих издержек, вы можете использовать широковещательные переменные, которые отправляют значение из драйвера исполнителям только один раз. Например:

val y = 2
val broadcastY = sc.broadcast(y)
myDStream.foreachRDD { rdd =>
  println("monkey")
  rdd.filter(element => element == "Save me!")
  rdd.foreachPartition { partition =>
    println("turtle")
    val x = 1 + 1
    val z = x + broadcastY.value
  }
}

Для отправки более сложных вещей в виде переменных рассылки, таких как объекты, которые не легко сериализуются после создания экземпляра, вы можете увидеть следующее сообщение в блоге:https://allegro.tech/2015/08/spark-kafka-integration.html

Ваш ответ на вопрос