Composición de la función de fila de PySpark

Como ejemplo simplificado, tengo un marco de datos "df" con las columnas "col1, col2" y quiero calcular un máximo en filas después de aplicar una función a cada columna:

def f(x):
    return (x+1)

max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())

df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))

Entonces si df:

col1   col2
1      2
3      0

Entonces

df2:

col1   col2  result
1      2     3
3      0     4

Lo anterior no parece funcionar y produce "No se puede evaluar la expresión: PythonUDF # f ..."

Estoy absolutamente seguro de que "f_udf" funciona bien en mi mesa, y el problema principal es con max_udf.

Sin crear columnas adicionales o usar map / reduce básico, ¿hay alguna manera de hacer lo anterior usando marcos de datos y udfs? ¿Cómo debo modificar "max_udf"?

También he intentado:

max_udf=udf(max, IntegerType())

que produce el mismo error.

También he confirmado que lo siguiente funciona:

df2=(df.withColumn("temp1", f_udf(df.col1))
       .withColumn("temp2", f_udf(df.col2))

df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))

¿Por qué no puedo hacer esto de una vez?

Me gustaría ver una respuesta que generalice a cualquier función "f_udf" y "max_udf".

Respuestas a la pregunta(2)

Su respuesta a la pregunta