ошибка даже около 400 столбцов функций.
отаю со Spark 2.1.1 над набором данных с ~ 2000 функциями и пытаюсь создать базовый ML Pipeline, состоящий из нескольких трансформаторов и классификатора.
Давайте для простоты предположим, что конвейер, с которым я работаю, состоит из VectorAssembler, StringIndexer и Classifier, который был бы довольно распространенным вариантом использования.
// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
.setInputCols(featureColumns)
.setOutputCol("featuresRaw")
val labelIndexer: StringIndexer = new StringIndexer()
.setInputCol("TARGET")
.setOutputCol("indexedLabel")
// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
.setLabelCol("indexedLabel")
.setFeaturesCol("featuresRaw")
.setMaxBins(30)
// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
.addGrid(rf.numTrees, Array(5))
.addGrid(rf.maxDepth, Array(5))
.build()
// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setLabelCol("indexedLabel")
Если шаги конвейера разделены на конвейер преобразователя (VectorAssembler + StringIndexer) и второй конвейер классификатора, и если ненужные столбцы помещены между обоими конвейерами, обучение завершается успешно. Это означает, что для повторного использования моделей после обучения необходимо сохранить две модели PipelineMode и ввести промежуточный этап предварительной обработки.
// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)
val mainPipeline = new Pipeline().setStages(Array(rf))
val cv = new CrossValidator()
.setEstimator(mainPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
(Imho) гораздо более чистое решение состояло бы в том, чтобы объединить все стадии трубопровода в один трубопровод.
val pipeline = new Pipeline()
.setStages(Array(labelIndexer, assmbleFeatures, rf))
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
// This will fail!
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]
Однако размещение всех PipelineStage в одном конвейере приводит к следующему исключению, вероятно, из-за проблемыэтот PR в итоге решит:
ОШИБКА CodeGenerator: не удалось скомпилировать: org.codehaus.janino.JaninoRuntimeException: постоянный пул для класса org.apache.spark.sql.catalyst.expressions.GeneratedClass $ SpecificUnsafeProjection превысил предел JVM, равный 0xFFFF
Причина этого заключается в том, что VectorAssembler эффективно удваивает (в этом примере) объем данных в DataFrame, поскольку нет преобразователя, который мог бы отбрасывать ненужные столбцы. (Увидетьсборщик вектора искрового трубопровода отбросить другие столбцы)
К примеру работает наГолубой набор данных и следующие шаги предварительной обработки необходимы:
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)
// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)
// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))
// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")
Поскольку я новичок в Spark, я не уверен, что будет лучшим способом решить эту проблему. Не могли бы вы предложить ...
создать новый трансформатор, который отбрасывает столбцы и который может быть включен в трубопровод?разделить оба трубопровода и ввести промежуточный шагчто-нибудь еще? :)Или я упускаю что-то важное (шаги конвейера, PR и т. Д.), Которое решило бы эту проблему?
Редактировать:Я реализовал новый ТрансформерDroppingVectorAssembler
, который удаляет ненужные столбцы, однако, выдается то же исключение.
Помимо этого, настройкаspark.sql.codegen.wholeStage
вfalse
не решает проблему.