Настройка параметров для неявной модели факторизации матрицы ALS pyspark.ml через pyspark.ml CrossValidator

Я пытаюсь настроить параметры модели факторизации матрицы ALS, которая использует неявные данные. Для этого я пытаюсь использовать pyspark.ml.tuning.CrossValidator, чтобы запустить сетку параметров и выбрать лучшую модель. Я считаю, что моя проблема в оценщике, но я не могу понять это.

Я могу заставить это работать для явной модели данных с оценщиком RMSE регрессии следующим образом:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.sql.functions import rand


conf = SparkConf() \
  .setAppName("MovieLensALS") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

dfRatings = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
                                 ["user", "item", "rating"])
dfRatingsTest = sqlContext.createDataFrame([(0, ,0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsExplicit = ALS()
defaultModel = alsExplicit.fit(dfRatings)

paramMapExplicit = ParamGridBuilder() \
                    .addGrid(alsExplicit.rank, [8, 12]) \
                    .addGrid(alsExplicit.maxIter, [10, 15]) \
                    .addGrid(alsExplicit.regParam, [1.0, 10.0]) \
                    .build()

evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cvExplicit = CrossValidator(estimator=alsExplicit, estimatorParamMaps=paramMapExplicit, evaluator=evaluatorR)
cvModelExplicit = cvExplicit.fit(dfRatings)

predsExplicit = cvModelExplicit.bestModel.transform(dfRatingsTest)
predsExplicit.show()

Когда я пытаюсь сделать это для неявных данных (скажем, количества просмотров, а не оценок), я получаю ошибку, которую не могу понять. Вот код (очень похожий на приведенный выше):

dfCounts = sqlContext.createDataFrame([(0,0,0), (0,1,12), (0,2,3), (1,0,5), (1,1,9), (1,2,0), (2,0,0), (2,1,11), (2,2,25)],
                                 ["user", "item", "rating"])
dfCountsTest = sqlContext.createDataFrame([(0, 0), (0, 1), (1, 1), (1, 2), (2, 1), (2, 2)], ["user", "item"])

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCounts)

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

evaluatorB = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="rating")
evaluatorR = RegressionEvaluator(metricName="rmse", labelCol="rating")

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR)
cvModel = cv.fit(dfCounts)

predsImplicit = cvModel.bestModel.transform(dfCountsTest)
predsImplicit.show()

Я попытался сделать это с помощью оценщика RMSE, и я получил ошибку. Как я понимаю, я также должен иметь возможность использовать метрику AUC для оценщика двоичной классификации, поскольку предсказания неявной факторизации матрицы являются доверительной матрицей c_ui для предсказаний двоичной матрицы p_uiпо этой статье, на которую ссылается документация для pyspark ALS.

Использование любого оценщика дает мне ошибку, и я не могу найти никаких плодотворных дискуссий о перекрестной проверке неявных моделей ALS онлайн. Я просматриваю исходный код CrossValidator, чтобы попытаться выяснить, в чем дело, но у меня возникли проблемы. Одна из моих мыслей заключается в том, что после того, как процесс преобразует неявную матрицу данных r_ui в двоичную матрицу p_ui и матрицу достоверности c_ui, я не уверен, с чем сравнивается предсказанная матрица c_ui на этапе оценки.

Вот ошибка:

Traceback (most recent call last):

  File "<ipython-input-16-6c43b997005e>", line 1, in <module>
    cvModel = cv.fit(dfCounts)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 69, in fit
    return self._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\tuning.py", line 239, in _fit
    model = est.fit(train, epm[j])

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\pipeline.py", line 67, in fit
    return self.copy(params)._fit(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 133, in _fit
    java_model = self._fit_java(dataset)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\ml\wrapper.py", line 130, in _fit_java
    return self._java_obj.fit(dataset._jdf)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)

  File "C:/spark-1.6.1-bin-hadoop2.6/python\pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)

  File "C:\spark-1.6.1-bin-hadoop2.6\python\lib\py4j-0.9-src.zip\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

etc.......

ОБНОВИТЬ

Я попытался масштабировать входные данные, чтобы они находились в диапазоне от 0 до 1, и использовал оценщик RMSE. Кажется, он работает хорошо, пока я не попытаюсь вставить его в CrossValidator.

Следующий код работает. Я получаю прогнозы и получаю RMSE-значение от моего оценщика.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator


conf = SparkConf() \
  .setAppName("ALSPractice") \
  .set("spark.executor.memory", "2g")
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

# Users 0, 1, 2, 3 - Items 0, 1, 2, 3, 4, 5 - Ratings 0.0-5.0
dfCounts2 = sqlContext.createDataFrame([(0,0,5.0), (0,1,5.0),            (0,3,0.0), (0,4,0.0), 
                                        (1,0,5.0),            (1,2,4.0), (1,3,0.0), (1,4,0.0),
                                        (2,0,0.0),            (2,2,0.0), (2,3,5.0), (2,4,5.0),
                                        (3,0,0.0), (3,1,0.0),            (3,3,4.0)            ],
                                       ["user", "item", "rating"])

dfCountsTest2 = sqlContext.createDataFrame([(0,0), (0,1), (0,2), (0,3), (0,4),
                                            (1,0), (1,1), (1,2), (1,3), (1,4),
                                            (2,0), (2,1), (2,2), (2,3), (2,4),
                                            (3,0), (3,1), (3,2), (3,3), (3,4)], ["user", "item"])

# Normalize rating data to [0,1] range based on max rating
colmax = dfCounts2.select(F.max('rating')).collect()[0].asDict().values()[0]
normalize = udf(lambda x: x/colmax, FloatType())
dfCountsNorm = dfCounts2.withColumn('ratingNorm', normalize(col('rating')))

alsImplicit = ALS(implicitPrefs=True)
defaultModelImplicit = alsImplicit.fit(dfCountsNorm)
preds = defaultModelImplicit.transform(dfCountsTest2)

evaluatorR2 = RegressionEvaluator(metricName="rmse", labelCol="ratingNorm")
evaluatorR2.evaluate(defaultModelImplicit.transform(dfCountsNorm))

preds = defaultModelImplicit.transform(dfCountsTest2)

Я не понимаю, почему следующее не работает. Я использую тот же оценщик, тот же оценщик и подгоняю те же данные. Почему они работают выше, но не в CrossValidator:

paramMapImplicit = ParamGridBuilder() \
                    .addGrid(alsImplicit.rank, [8, 12]) \
                    .addGrid(alsImplicit.maxIter, [10, 15]) \
                    .addGrid(alsImplicit.regParam, [1.0, 10.0]) \
                    .addGrid(alsImplicit.alpha, [2.0,3.0]) \
                    .build()

cv = CrossValidator(estimator=alsImplicit, estimatorParamMaps=paramMapImplicit, evaluator=evaluatorR2)
cvModel = cv.fit(dfCountsNorm)

Ответы на вопрос(2)

Ваш ответ на вопрос