Pyspark py4j PickleException: "erwartete keine Argumente für die Erstellung von ClassDict"

Diese Frage richtet sich an Personen, die mit py4j vertraut sind - und kann zur Behebung eines Beizfehlers beitragen. Ich versuche, dem Pyspark PythonMLLibAPI eine Methode hinzuzufügen, die eine RDD eines namedtuple akzeptiert, etwas Arbeit leistet und ein Ergebnis in Form einer RDD zurückgibt.

Diese Methode ist der Methode PYthonMLLibAPI.trainALSModel () nachempfunden, deren analogesbestehende relevante Teile sind:

  def trainALSModel(
    ratingsJRDD: JavaRDD[Rating],
    .. )

Dasbestehendeie zum Modellieren des neuen Codes verwendete @ python-Bewertungsklasse lautet:

class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    def __reduce__(self):
        return Rating, (int(self.user), int(self.product), float(self.rating))

Hier ist der Versuch Also hier sind die relevanten Klassen:

Ne Python-Klasse pyspark.mllib.clustering.MatrixEntry:

from collections import namedtuple
class MatrixEntry(namedtuple("MatrixEntry", ["x","y","weight"])):
    def __reduce__(self):
        return MatrixEntry, (long(self.x), long(self.y), float(self.weight))

Ne Methode foobarRDD In PythonMLLibAPI:

  def foobarRdd(
    data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
    val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value)}
    rdd
  }

Jetzt lass es uns ausprobieren:

from pyspark.mllib.clustering import MatrixEntry

def convert_to_MatrixEntry(tuple):
  return MatrixEntry(*tuple)

from pyspark.mllib.clustering import *
pic = PowerIterationClusteringModel(2)
tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
trdd = sc.parallelize(map(convert_to_MatrixEntry,tups))

# print out the RDD on python side just for validation
print "%s" %(repr(trdd.collect()))

from pyspark.mllib.common import callMLlibFunc
pic = callMLlibFunc("foobar", trdd)

Relevante Teile der Ergebnisse:

[(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]

was zeigt, dass der Eingang rdd 'ganz' ist. Allerdings war das Beizen unglücklich:

5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict
(for pyspark.mllib.clustering.MatrixEntry)
    at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
    at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
    at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
    at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
    at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
    at org.apache.spark.mllib.api.python.SerDe$anonfun$pythonToJava$1$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
    at org.apache.spark.mllib.api.python.SerDe$anonfun$pythonToJava$1$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
    at scala.collection.Iterator$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$anonfun$17.apply(RDD.scala:819)
    at org.apache.spark.rdd.RDD$anonfun$17.apply(RDD.scala:819)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1523)
    at org.apache.spark.SparkContext$anonfun$runJob$5.apply(SparkContext.scala:1523)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

Below ist eine visuelle Darstellung des Python-Aufruf-Stack-Trace:

Antworten auf die Frage(6)

Ihre Antwort auf die Frage