Maneira ideal de criar um pipeline de ml no Apache Spark para conjunto de dados com alto número de colunas

Estou trabalhando com o Spark 2.1.1 em um conjunto de dados com recursos ~ 2000 e tentando criar um pipeline básico de ML, composto por alguns transformadores e um classificador.

Vamos supor, por uma questão de simplicidade, que o Pipeline com o qual estou trabalhando consiste em um VectorAssembler, StringIndexer e um Classificador, que seria uma base de dados bastante comum.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

Se as etapas do pipeline forem separadas em um pipeline de transformador (VectorAssembler + StringIndexer) e um segundo pipeline de classificação, e se as colunas desnecessárias forem descartadas entre os dois pipelines, o treinamento será bem-sucedido. Isso significa que, para reutilizar os modelos, dois PipelineModels precisam ser salvos após o treinamento e uma etapa intermediária de pré-processamento deve ser introduzida.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

A solução (imho) muito mais limpa seria mesclar todos os estágios do pipeline em um único pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

No entanto, colocar todos os PipelineStages em um pipeline leva à seguinte exceção, provavelmente devido ao problemaesta O PR acabará por resolver:

ERRO CodeGenerator: falha ao compilar: org.codehaus.janino.JaninoRuntimeException: pool constante da classe org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection ultrapassou o limite de 0xFFFF da JVM

A razão para isso é que o VectorAssembler dobra efetivamente (neste exemplo) a quantidade de dados no DataFrame, pois não há transformador que possa eliminar as colunas desnecessárias. (Vejomontador de vetor de pipeline de faísca soltar outras colunas)

Para o exemplo funciona noconjunto de dados golub e as seguintes etapas de pré-processamento são necessárias:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Como sou novo no Spark, não tenho certeza de qual seria a melhor maneira de resolver esse problema. Você sugeriria ...

criar um novo transformador, que descarta colunas e que pode ser incorporado ao pipeline?divida os dois pipelines e introduza a etapa intermediáriaalgo mais? :)

Ou estou perdendo alguma coisa importante (etapas do pipeline, relações públicas, etc.) que resolveria esse problema?

Editar:

Eu implementei um novo TransformerDroppingVectorAssembler, que descarta colunas desnecessárias; no entanto, a mesma exceção é lançada.

Além disso, definirspark.sql.codegen.wholeStage parafalse não resolve o problema.