PySpark zeilenweise Funktionszusammensetzung

Als vereinfachtes Beispiel habe ich einen Datenrahmen "df" mit den Spalten "col1, col2" und möchte nach dem Anwenden einer Funktion auf jede Spalte ein zeilenweises Maximum berechnen:

def f(x):
    return (x+1)

max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))

Also wenn df:

col1   col2
1      2
3      0

Dan

df2:

col1   col2  result
1      2     3
3      0     4

Obige scheint nicht zu funktionieren und erzeugt "Ausdruck kann nicht ausgewertet werden: PythonUDF # f ..."

Ich bin absolut sicher, dass "f_udf" auf meinem Tisch gut funktioniert, und das Hauptproblem ist das mit dem max_udf.

Wenn Sie keine zusätzlichen Spalten erstellen oder eine einfache Karte / Verkleinerung verwenden, gibt es eine Möglichkeit, dies vollständig mit DataFrames und UdFS zu tun? Wie soll ich "max_udf" ändern?

Ich habe es auch versucht:

max_udf=udf(max, IntegerType())

Das erzeugt den gleichen Fehler.

Ich habe auch bestätigt, dass das folgende funktioniert:

df2=(df.withColumn("temp1", f_udf(df.col1))
       .withColumn("temp2", f_udf(df.col2))

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))

Warum kann ich das nicht auf einmal machen?

Ich würde gerne eine Antwort sehen, die auf die Funktionen "f_udf" und "max_udf" verallgemeinert wird.

Antworten auf die Frage(4)

Ihre Antwort auf die Frage