Spark UDF para StructType / Row
Eu tenho uma coluna "StructType" no Spark Dataframe que possui uma matriz e uma string como subcampos. Gostaria de modificar a matriz e retornar a nova coluna do mesmo tipo. Posso processá-lo com UDF? Ou quais são as alternativas?
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val sub_schema = StructType(StructField("col1",ArrayType(IntegerType,false),true) :: StructField("col2",StringType,true)::Nil)
val schema = StructType(StructField("subtable", sub_schema,true) :: Nil)
val data = Seq(Row(Row(Array(1,2),"eb")), Row(Row(Array(3,2,1), "dsf")) )
val rd = sc.parallelize(data)
val df = spark.createDataFrame(rd, schema)
df.printSchema
root
|-- subtable: struct (nullable = true)
| |-- col1: array (nullable = true)
| | |-- element: integer (containsNull = false)
| |-- col2: string (nullable = true)
Parece que eu preciso de uma UDF do tipo Row, algo como
val u = udf((x:Row) => x)
>> Schema for type org.apache.spark.sql.Row is not supported
Isso faz sentido, pois o Spark não conhece o esquema para o tipo de retorno. Infelizmente, o udf.register também falha:
spark.udf.register("foo", (x:Row)=> Row, sub_schema)
<console>:30: error: overloaded method value register with alternatives: ...