ошибка даже около 400 столбцов функций.

отаю со Spark 2.1.1 над набором данных с ~ 2000 функциями и пытаюсь создать базовый ML Pipeline, состоящий из нескольких трансформаторов и классификатора.

Давайте для простоты предположим, что конвейер, с которым я работаю, состоит из VectorAssembler, StringIndexer и Classifier, который был бы довольно распространенным вариантом использования.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

Если шаги конвейера разделены на конвейер преобразователя (VectorAssembler + StringIndexer) и второй конвейер классификатора, и если ненужные столбцы помещены между обоими конвейерами, обучение завершается успешно. Это означает, что для повторного использования моделей после обучения необходимо сохранить две модели PipelineMode и ввести промежуточный этап предварительной обработки.

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]

(Imho) гораздо более чистое решение состояло бы в том, чтобы объединить все стадии трубопровода в один трубопровод.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

Однако размещение всех PipelineStage в одном конвейере приводит к следующему исключению, вероятно, из-за проблемыэтот PR в итоге решит:

ОШИБКА CodeGenerator: не удалось скомпилировать: org.codehaus.janino.JaninoRuntimeException: постоянный пул для класса org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection превысил предел JVM, равный 0xFFFF

Причина этого заключается в том, что VectorAssembler эффективно удваивает (в этом примере) объем данных в DataFrame, поскольку нет преобразователя, который мог бы отбрасывать ненужные столбцы. (Увидетьсборщик вектора искрового трубопровода отбросить другие столбцы)

К примеру работает наГолубой набор данных и следующие шаги предварительной обработки необходимы:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

Поскольку я новичок в Spark, я не уверен, что будет лучшим способом решить эту проблему. Не могли бы вы предложить ...

создать новый трансформатор, который отбрасывает столбцы и который может быть включен в трубопровод?разделить оба трубопровода и ввести промежуточный шагчто-нибудь еще? :)

Или я упускаю что-то важное (шаги конвейера, PR и т. Д.), Которое решило бы эту проблему?

Редактировать:

Я реализовал новый ТрансформерDroppingVectorAssembler, который удаляет ненужные столбцы, однако, выдается то же исключение.

Помимо этого, настройкаspark.sql.codegen.wholeStage вfalse не решает проблему.

Ответы на вопрос(2)

Ваш ответ на вопрос