Ошибка StackOverflow при применении pyspark ALS «рекомендуемые продукты для пользователей» (хотя доступен кластер> 300 ГБ оперативной памяти)

Ищите экспертизу, чтобы вести меня в проблеме ниже.

Фон:

Я пытаюсь начать работу с базовым скриптом PySpark, вдохновленнымэтот примерВ качестве инфраструктуры развертывания я использую кластер Google Cloud Dataproc.Краеугольным камнем в моем коде является задокументированная функция «RecommendedProductsForUsers»Вот который возвращает мне лучшие продукты X для всех пользователей модели

Проблема, которую я беру на себя

Скрипт ALS.Train работает без сбоев и хорошо масштабируется на GCP (легко> 1 млн клиентов).

Однако применение прогнозов: т. Е. С помощью функций «PredictAll» или «рекомендуемые продукты для пользователей» не масштабируется вообще. Мой сценарий работает гладко для небольшого набора данных (<100 клиентов с <100 продуктами). Однако при доведении его до значимого для бизнеса размера мне не удается его масштабировать (например,> 50 000 клиентов и> 10 000 продуктов)

Ошибка, которую я тогда получаю ниже:

 16/08/16 14:38:56 WARN org.apache.spark.scheduler.TaskSetManager:
   Lost task 22.0 in stage 411.0 (TID 15139,
   productrecommendation-high-w-2.c.main-nova-558.internal):
   java.lang.StackOverflowError
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
        at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
        at scala.collection.immutable.$colon$colon.readObject(List.scala:362)

Я даже дошел до получения кластера 300 ГБ (1 основной узел 108 ГБ + 2 узла 108 ГБ ОЗУ) для его запуска; это работает для клиентов 50 000, но не для чего-то большего

У меня есть амбиции, чтобы я мог работать на> 800 000 клиентов

подробности

Строка кода, где она терпит неудачу

predictions = model.recommendProductsForUsers(10).flatMap(lambda p: p[1]).map(lambda p: (str(p[0]), str(p[1]), float(p[2])))
pprint.pprint(predictions.take(10))
schema = StructType([StructField("customer", StringType(), True), StructField("sku", StringType(), True), StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(predictions, schema).dropDuplicates()

Как вы предлагаете продолжить? Я чувствую, что часть слияния в конце моего скрипта (т.е. когда я пишу это в dfToSave) вызывает ошибку; Есть ли способ обойти это и сохранить по частям?

Ответы на вопрос(1)

Ваш ответ на вопрос