Maneira ideal de criar um pipeline de ml no Apache Spark para conjunto de dados com alto número de colunas
Estou trabalhando com o Spark 2.1.1 em um conjunto de dados com recursos ~ 2000 e tentando criar um pipeline básico de ML, composto por alguns transformadores e um classificador.
Vamos supor, por uma questão de simplicidade, que o Pipeline com o qual estou trabalhando consiste em um VectorAssembler, StringIndexer e um Classificador, que seria uma base de dados bastante comum.
// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("featuresRaw")
val labelIndexer: StringIndexer = new StringIndexer()
.setInputCol("TARGET")
.setOutputCol("indexedLabel")
// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featuresRaw")
.setMaxBins(30)
// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(5))
.addGrid(rf.maxDepth, Array(5))
.build()
// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setLabelCol("indexedLabel")
Se as etapas do pipeline forem separadas em um pipeline de transformador (VectorAssembler + StringIndexer) e um segundo pipeline de classificação, e se as colunas desnecessárias forem descartadas entre os dois pipelines, o treinamento será bem-sucedido. Isso significa que, para reutilizar os modelos, dois PipelineModels precisam ser salvos após o treinamento e uma etapa intermediária de pré-processamento deve ser introduzida.
// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)
val mainPipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator()
.setEstimator(mainPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
A solução (imho) muito mais limpa seria mesclar todos os estágios do pipeline em um único pipeline.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, assmbleFeatures, rf))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
// This will fail!
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]
No entanto, colocar todos os PipelineStages em um pipeline leva à seguinte exceção, provavelmente devido ao problemaesta O PR acabará por resolver:
ERRO CodeGenerator: falha ao compilar: org.codehaus.janino.JaninoRuntimeException: pool constante da classe org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection ultrapassou o limite de 0xFFFF da JVM
A razão para isso é que o VectorAssembler dobra efetivamente (neste exemplo) a quantidade de dados no DataFrame, pois não há transformador que possa eliminar as colunas desnecessárias. (Vejomontador de vetor de pipeline de faísca soltar outras colunas)
Para o exemplo funciona noconjunto de dados golub e as seguintes etapas de pré-processamento são necessárias:
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)
// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)
// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))
// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")
Como sou novo no Spark, não tenho certeza de qual seria a melhor maneira de resolver esse problema. Você sugeriria ...
criar um novo transformador, que descarta colunas e que pode ser incorporado ao pipeline?divida os dois pipelines e introduza a etapa intermediáriaalgo mais? :)Ou estou perdendo alguma coisa importante (etapas do pipeline, relações públicas, etc.) que resolveria esse problema?
Editar:Eu implementei um novo TransformerDroppingVectorAssembler
, que descarta colunas desnecessárias; no entanto, a mesma exceção é lançada.
Além disso, definirspark.sql.codegen.wholeStage
parafalse
não resolve o problema.