Issue with UDF auf einer Vektorspalte in PySpark DataFrame
ch habe Probleme mit der Verwendung einer UDF für eine Vektorspalte in PySpark, die hier dargestellt werden kan
from pyspark import SparkContext
from pyspark.sql import Row
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf
from pyspark.mllib.linalg import Vectors
FeatureRow = Row('id', 'features')
data = sc.parallelize([(0, Vectors.dense([9.7, 1.0, -3.2])),
(1, Vectors.dense([2.25, -11.1, 123.2])),
(2, Vectors.dense([-7.2, 1.0, -3.2]))])
df = data.map(lambda r: FeatureRow(*r)).toDF()
vector_udf = udf(lambda vector: sum(vector), DoubleType())
df.withColumn('feature_sums', vector_udf(df.features)).first()
Dies schlägt mit dem folgenden Stack-Trace fehl:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 31.0 failed 1 times, most recent failure: Lost task 5.0 in stage 31.0 (TID 95, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
x1 File "/Users/colin/src/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/Users/colin/src/spark/python/pyspark/sql/functions.py", line 469, in <lambda>
func = lambda _, it: map(lambda x: f(*x), it)
File "/Users/colin/pokitdok/spark_mapper/spark_mapper/filters.py", line 143, in <lambda>
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
Beobachtet man, was an die UDF weitergereicht wird, scheint es etwas Seltsames zu geben. Das übergebene Argument sollte ein Vector sein, stattdessen wird ein Python-Tupel wie das folgende übergeben:
(1, None, None, [9.7, 1.0, -3.2])
Ist es nicht möglich, UDFs für DataFrame-Spalten von Vektoren zu verwenden?
BEARBEITE
So wurde auf der Mailingliste darauf hingewiesen, dass dies ein @ ibekanntes Proble. Ich werde die Antwort von @hyim akzeptieren, da dies eine vorübergehende Problemumgehung für dichte Vektoren darstellt.