Atributos de referencia de Spark SQL de UDT

Estoy tratando de implementar un UDT personalizado y poder hacer referencia a él desde Spark SQL (como se explica en el documento técnico de Spark SQL, sección 4.4.2).

El ejemplo real es tener un UDT personalizado respaldado por una estructura de datos fuera del montón utilizando Cap'n Proto, o similar.

Para esta publicación, he inventado un ejemplo artificial. Sé que podría usar las clases de caso Scala y no tener que hacer ningún trabajo, pero ese no es mi objetivo.

Por ejemplo, tengo unPerson que contiene varios atributos y quiero poderSELECT person.first_name FROM person. Me encuentro con el errorCan't extract value from person#1 Y no estoy seguro de por qué.

Aquí está la fuente completa (también disponible enhttps://github.com/andygrove/spark-sql-udt)

package com.theotherandygrove

import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object Example {

  def main(arg: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val schema = StructType(List(
      StructField("person_id", DataTypes.IntegerType, true),
      StructField("person", new MockPersonUDT, true)))

    // load initial RDD
    val rdd = sc.parallelize(List(
      MockPersonImpl(1),
      MockPersonImpl(2)
    ))

    // convert to RDD[Row]
    val rowRdd = rdd.map(person => Row(person.getAge, person))

    // convert to DataFrame (RDD + Schema)
    val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

    // register as a table
    dataFrame.registerTempTable("person")

    // selecting the whole object works fine
    val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")

    val people = results.collect

    people.map(row => {
      println(row)
    })

  }

}
trait MockPerson {
  def getFirstName: String
  def getLastName: String
  def getAge: Integer
  def getState: String
}

class MockPersonUDT extends UserDefinedType[MockPerson] {

  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable=false),
    StructField("lastName", StringType, nullable=false),
    StructField("age", IntegerType, nullable=false),
    StructField("state", StringType, nullable=false)
  ))

  override def userClass: Class[MockPerson] = classOf[MockPerson]

  override def serialize(obj: Any): Any = obj.asInstanceOf[MockPersonImpl].getAge

  override def deserialize(datum: Any): MockPerson = MockPersonImpl(datum.asInstanceOf[Integer])
}

@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {
  def getFirstName = "First" + n
  def getLastName = "Last" + n
  def getAge = n
  def getState = "AK"
}

Si yo simplementeSELECT person FROM person entonces la consulta funciona. Simplemente no puedo hacer referencia a los atributos en SQL, aunque estén definidos en el esquema.

Respuestas a la pregunta(1)

Su respuesta a la pregunta