Распараллелить / избежать цикла foreach в искре

Я написал класс, который получает DataFrame, выполняет некоторые вычисления и может экспортировать результаты. Кадры данных генерируются списком ключей. Я знаю, что сейчас делаю это очень неэффективно:

var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()                               // writes the Results into another Dataframe that is saved to HDFS
}

Я думаю, что foreach в списке Scala не параллелен, так как я могу избежать использования foreach здесь? Вычисление DataFrames может происходить параллельно, так как результаты вычислений НЕ вводятся для следующего DataFrame - как я могу это реализовать?

Огромное спасибо!!

__редактировать:

что я пытался сделать:

val l = List(34, 32, 132, 352)      // Scala List
var l_DF:List[DataFrame] = List()
l.foreach{ i =>
    DataContainer.getDataFrame(i)::l        //append DataFrame to List of Dataframes
}

val rdd:DataFrame = sc.parallelize(l)
rdd.foreach(data =>
    val x = new MyClass(data)
)

но дает

Invalid tree; null:
null

редактировать 2: Хорошо, я не понимаю, как все работает под капотом ....

1) Все работает нормально, когда я выполняю это в spark-shell

spark-shell –driver-memory 10g       
//...
var l = List(34, 32, 132, 352)      // Scala List

l.foreach{i => 
    val data:DataFrame = AllData.where($"a" === i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass     with new Object
    x.calcSomething()
}

2) Ошибка, когда я начинаю то же самое с

spark-shell --master yarn-client --num-executors 10 –driver-memory 10g  
// same code as above
java.util.concurrent.RejectedExecutionException: Task [email protected] rejected from [email protected][Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1263]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at scala.concurrent.impl.ExecutionContextImpl$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.Future$anonfun$recover$1.apply(Future.scala:324)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$anon$1.execute(ExecutionContextImpl.scala:133)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.Future$anonfun$map$1.apply(Future.scala:235)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)

3) когда я пытаюсь распараллелить это, я тоже получаю ошибку

spark-shell --master yarn-client --num-executors 10 –driver-memory 10g
//...
var l = List(34, 32, 132, 352).par
// same code as above, just parallelized before calling foreach
// i can see the parallel execution by the console messages (my class gives some and they are printed out parallel now instead of sequentielly

scala.collection.parallel.CompositeThrowable: Multiple exceptions thrown during a parallel computation: java.lang.IllegalStateException: SparkContext has been shutdown
org.apache.spark.SparkContext.runJob(SparkContext.scala:1816)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1850)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
org.apache.spark.sql.DataFrame$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.DataFrame$anonfun$collect$1.apply(DataFrame.scala:1385)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
.
.
.

java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext                  org.apache.spark.SparkContext.org$apache$spark$SparkContext$assertNotStopped(SparkContext.scala:104)
 org.apache.spark.SparkContext.broadcast(SparkContext.scala:1320)
   org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:104)
org.apache.spark.sql.catalyst.planning.QueryPlanner$anonfun$1.apply(QueryPlanner.scala:58)
org.apache.spark.sql.catalyst.planning.QueryPlanner$anonfun$1.apply(QueryPlanner.scala:58)
scala.collection.Iterator$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.makeBroadcastHashJoin(SparkStrategies.scala:92)
org.apache.spark.sql.execution.SparkStrategies$EquiJoinSelection$.apply(SparkStrategies.scala:104)

На самом деле исполнителей более 10, но 4 узла. Я никогда не настраиваю искро-контекст. Это уже дано при запуске.

 Sachin Tyagi28 июн. 2016 г., 09:58
Пожалуйста, предоставьте ошибку завершения стека. Также линияDataContainer.getDataFrame(i)::l не выглядит правильно.

Ответы на вопрос(2)

Вы можете использовать Scala Future и Spark Fair Scheduling, например,

import scala.concurrent._
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

object YourApp extends App { 
  val sc = ... // SparkContext, be sure to set spark.scheduler.mode=FAIR
  var pool = 0
  // this is to have different pools per job, you can wrap it to limit the no. of pools
  def poolId = {
    pool = pool + 1
    pool
  }
  def runner(i: Int) = Future {
    sc.setLocalProperty("spark.scheduler.pool", poolId)
    val data:DataFrame = DataContainer.getDataFrame(i) // get DataFrame
    val x = new MyClass(data)                     // initialize MyClass with new Object
    x.setSettings(...)
    x.calcSomething()
    x.saveResults()
  }

  val l = List(34, 32, 132, 352)      // Scala List
  val futures = l map(i => runner(i))

  // now you need to wait all your futures to be completed
  futures foreach(f => Await.ready(f, Duration.Inf))

}

Благодаря FairScheduler и различным пулам каждая параллельная работа будет иметь значительную долю ресурсов искрового кластера.

Некоторая ссылка относительно будущего СкалыВот, Возможно, вам придется добавить необходимые обратные вызовы при завершении, успехе и / или сбоях.

Вы можете использовать Скалапараллельные коллекции достигатьforeach параллелизм на стороне водителя.

val l = List(34, 32, 132, 352).par
l.foreach{i => // your code to be run in parallel for each i}

* Однако, предостережение: способен ли ваш кластер выполнять задания параллельно? Вы можете отправлять задания в ваш искровой кластер параллельно, но в итоге они могут попасть в очередь в кластере и выполняться последовательно.

 johntechendso28 июн. 2016 г., 09:41
я не в локальном режиме, я использую клиента пряжи и 4 узла исполнителя
 johntechendso28 июн. 2016 г., 09:27
Спасибо! кластер, который я использую, имеет несколько исполнителей. Это уже самый эффективный способ? что делает мое решение (см. редактирование)
 Sachin Tyagi28 июн. 2016 г., 09:34
Вы используете искровой автономный кластер или с YARN?
 Sachin Tyagi28 июн. 2016 г., 09:46
Вы можете настроить использование пряжиЧЕСТНОЕ планирование также. Я думаю, что он тоже использует FIFO по умолчанию.
 Sachin Tyagi28 июн. 2016 г., 09:33
Пожалуйста, посмотрите на это из документации Spark -spark.apache.org/docs/latest/...  Ниже приводится соответствующая цитата: «По умолчанию планировщик Spark запускает задания в режиме FIFO. [...] Начиная с Spark 0.8, также можно настроить справедливое распределение между заданиями. При справедливом совместном использовании Spark назначает задачи между заданиями в «Круговой» способ, так что все задания получают примерно равную долю ресурсов кластера. Чтобы включить честный планировщик, просто установите для свойства spark.scheduler.mode значение FAIR при настройке SparkContext. "

Ваш ответ на вопрос