Ejecute más de 3000 modelos de bosque aleatorio por grupo utilizando Spark MLlib Scala API

Estoy tratando de construir modelos de bosque aleatorios por grupo (School_ID, más de 3 mil) en un archivo csv de entrada de modelo grande usando Spark Scala API. Cada uno de los grupos contiene alrededor de 3000-4000 registros. Los recursos que tengo a disposición son 20-30 aws m3.2xlarge instancias.

En R, puedo construir modelos por grupo y guardarlos en una lista como esta:

library(dplyr);library(randomForest);
    Rf_model <- train %>% group_by(School_ID) %>% 
                do(school= randomForest(formula=Rf_formula, data=., importance = TRUE))

La lista se puede almacenar en algún lugar y puedo llamarlos cuando necesito usarlos como a continuación:

save(Rf_model.school,file=paste0(Modelpath,"Rf_model.dat"))
load(file=paste0(Modelpath,"Rf_model.dat"))
pred <-  predict(Rf_model.school$school[school_index][[1]], newdata=test)

Me preguntaba cómo hacer eso en Spark, si necesito o no dividir los datos por grupo primero y cómo hacerlo de manera eficiente si es necesario.

Pude dividir el archivo por School_ID según el código siguiente, pero parece que crea un trabajo individual para subconjugar para cada iteración y toma mucho tiempo terminar los trabajos. ¿Hay alguna manera de hacerlo en una sola pasada?

model_input.cache()

val schools = model_input.select("School_ID").distinct.collect.flatMap(_.toSeq)
val bySchoolArray = schools.map(School_ID => model_input.where($"School_ID" <=> School_ID))

for( i <- 0 to programs.length - 1 ){
  bySchoolArray(i).
    write.format("com.databricks.spark.csv").
    option("header", "true").
    save("model_input_bySchool/model_input_"+ schools(i))
}

Fuente:¿Cómo puedo dividir un marco de datos en marcos de datos con los mismos valores de columna en SCALA y SPARK?

Editar 24/08/2015 Estoy tratando de convertir mi marco de datos en un formato aceptado por el modelo de bosque aleatorio. Estoy siguiendo las instrucciones en este hiloCómo crear el marco de datos correcto para la clasificación en Spark ML

Básicamente, creo una nueva "etiqueta" variable y almaceno mi clase en Double. Luego combino todas mis características usando la función VectorAssembler y transformo mis datos de entrada de la siguiente manera:

val assembler = new VectorAssembler().
  setInputCols(Array("COL1", "COL2", "COL3")).
  setOutputCol("features")

val model_input = assembler.transform(model_input_raw).
  select("SCHOOL_ID", "label", "features")

Mensaje de error parcial (avíseme si necesita el mensaje de registro completo) -

scala.MatchError: StringType (de la clase org.apache.spark.sql.types.StringType $) en org.apache.spark.ml.feature.VectorAssembler $ anonfun $ 2.apply (VectorAssembler.scala: 57)

Esto se resuelve después de convertir todas las variables a tipos numéricos.

Editar 25/08/2015 El modelo ml no acepta la etiqueta que codifiqué manualmente, así que necesito usar StringIndexer para solucionar el problema como se indicaaquí. De acuerdo con ladocumentación oficial, la etiqueta más frecuente obtiene 0. Causa etiquetas inconsistentes en School_ID. Me preguntaba si hay una manera de crear las etiquetas sin restablecer el orden de los valores.

val indexer = new StringIndexer().
  setInputCol("label_orig").
  setOutputCol("label")

Cualquier sugerencia o dirección sería útil y no dude en plantear cualquier pregunta. ¡Gracias!

Respuestas a la pregunta(1)

Su respuesta a la pregunta