Запуск 3000+ моделей произвольных лесов по группам с помощью Spark MLlib Scala API

Я пытаюсь построить случайные модели леса по группам (School_ID, более 3 тысяч) на основе входного csv-файла большой модели с использованием Spark Scala API. Каждая группа содержит около 3000-4000 записей. Ресурсы, которыми я располагаю, составляют 20-30 aws m3.2.

В R я могу создавать модели по группам и сохранять их в виде списка:

library(dplyr);library(randomForest);
    Rf_model <- train %>% group_by(School_ID) %>% 
                do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))

Список может храниться где-то, и я могу позвонить им, когда мне нужно использовать их, как показано ниже -

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)

Мне было интересно, как это сделать в Spark, нужно ли мне сначала разбить данные по группам и как это сделать эффективно, если это необходимо.

Мне удалось разделить файл по School_ID на основе приведенного ниже кода, но, похоже, он создает одно отдельное задание для поднабора для каждой итерации и занимает много времени для завершения заданий. Есть ли способ сделать это за один проход?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for( i <- 0 to programs.length - 1 ){
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_"+ schools(i))
}

Источник:Как я могу разбить фрейм данных на фреймы с одинаковыми значениями столбцов в SCALA и SPARK

Редактировать 24.08.2015 Я пытаюсь преобразовать мой фрейм данных в формат, который принимается моделью случайного леса. Я следую инструкции в этой темеКак создать правильный фрейм данных для классификации в Spark ML

По сути, я создаю новую переменную «label» и сохраняю свой класс в Double. Затем я комбинирую все свои функции, используя функцию VectorAssembler, и преобразую свои входные данные следующим образом:

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")

Частичное сообщение об ошибке (дайте мне знать, если вам нужно полное сообщение журнала) -

scala.MatchError: StringType (класса org.apache.spark.sql.types.StringType $) в org.apache.spark.ml.feature.VectorAssembler $ anonfun $ 2.apply (VectorAssembler.scala: 57)

Это решается после преобразования всех переменных в числовые типы.

Редактировать 25.08.2015 Модель ml не принимает метку, которую я кодировал вручную, поэтому мне нужно использовать StringIndexer, чтобы обойти проблему, как указаноВот, Согласноофициальная документация, самый частый ярлык получает 0. Это вызывает несовместимые ярлыки по School_ID. Мне было интересно, если есть способ создать метки без сброса порядка значений.

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")

Любые предложения или указания будут полезны и не стесняйтесь задавать любые вопросы. Спасибо!

Ответы на вопрос(1)

Ваш ответ на вопрос