Composición de la función de fila de PySpark
Como ejemplo simplificado, tengo un marco de datos "df" con las columnas "col1, col2" y quiero calcular un máximo en filas después de aplicar una función a cada columna:
def f(x):
return (x+1)
max_udf=udf(lambda x,y: max(x,y), IntegerType())
f_udf=udf(f, IntegerType())
df2=df.withColumn("result", max_udf(f_udf(df.col1),f_udf(df.col2)))
Entonces si df:
col1 col2
1 2
3 0
Entonces
df2:
col1 col2 result
1 2 3
3 0 4
Lo anterior no parece funcionar y produce "No se puede evaluar la expresión: PythonUDF # f ..."
Estoy absolutamente seguro de que "f_udf" funciona bien en mi mesa, y el problema principal es con max_udf.
Sin crear columnas adicionales o usar map / reduce básico, ¿hay alguna manera de hacer lo anterior usando marcos de datos y udfs? ¿Cómo debo modificar "max_udf"?
También he intentado:
max_udf=udf(max, IntegerType())
que produce el mismo error.
También he confirmado que lo siguiente funciona:
df2=(df.withColumn("temp1", f_udf(df.col1))
.withColumn("temp2", f_udf(df.col2))
df2=df2.withColumn("result", max_udf(df2.temp1,df2.temp2))
¿Por qué no puedo hacer esto de una vez?
Me gustaría ver una respuesta que generalice a cualquier función "f_udf" y "max_udf".