Forma óptima de crear una tubería ml en Apache Spark para el conjunto de datos con un alto número de columnas

Estoy trabajando con Spark 2.1.1 en un conjunto de datos con ~ 2000 características y estoy tratando de crear una Tubería ML básica, que consta de algunos Transformadores y un Clasificador.

Supongamos, por simplicidad, que el Pipeline con el que estoy trabajando consiste en un VectorAssembler, StringIndexer y un Clasificador, que sería un caso de uso bastante común.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

Si los pasos de la tubería se separan en una tubería de transformador (VectorAssembler + StringIndexer) y una segunda tubería de clasificador, y si las columnas innecesarias se sueltan entre ambas tuberías, la capacitación tendrá éxito. Esto significa que para reutilizar los modelos, se deben guardar dos PipelineModels después del entrenamiento y se debe introducir un paso de preprocesamiento intermedio.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

La solución (en mi humilde opinión) mucho más limpia sería fusionar todas las etapas de la tubería en una sola.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

Sin embargo, poner todos los PipelineStage en un Pipeline conduce a la siguiente excepción, probablemente debido al problemaesta PR eventualmente resolverá:

ERROR CodeGenerator: error al compilar: org.codehaus.janino.JaninoRuntimeException: grupo constante para la clase org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection ha crecido más allá del límite JVM de 0xFFFF

La razón de esto es que VectorAssembler efectivamente duplica (en este ejemplo) la cantidad de datos en el DataFrame, ya que no hay transformador que pueda eliminar las columnas innecesarias. (Verensamblador de vector de tubería de chispa soltar otras columnas)

Para el ejemplo funciona en elconjunto de datos de golub y los siguientes pasos de preprocesamiento son necesarios:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Como soy nuevo en Spark, no estoy seguro de cuál sería la mejor manera de resolver este problema. ¿Sugerirías ...

para crear un nuevo transformador, que suelta columnas y que se puede incorporar a la tubería?dividir ambas tuberías e introducir el paso intermedio¿Algo más? :)

¿O me estoy perdiendo algo importante (pasos de la tubería, relaciones públicas, etc.) que resolvería este problema?

Editar:

Implementé un nuevo transformadorDroppingVectorAssembler, que elimina columnas innecesarias, sin embargo, se produce la misma excepción.

Además de eso, establecerspark.sql.codegen.wholeStage afalse No resuelve el problema.

Respuestas a la pregunta(2)

Su respuesta a la pregunta