Spark UDAF con ArrayType como problemas de rendimiento de bufferSchema

Estoy trabajando en un UDAF que devuelve una variedad de elementos.

La entrada para cada actualización es una tupla de índice y valor.

Lo que hace el UDAF es sumar todos los valores bajo el mismo índice.

Ejemplo:

Para entrada (índice, valor): (2,1), (3,1), (2,3)

debería volver (0,0,4,1, ..., 0)

La lógica funciona bien, pero tengo un problema con elmétodo de actualización, solo mi implementaciónactualiza 1 celda para cada fila, pero la última asignación en ese método en realidadcopia toda la matriz - que es redundante y consume mucho tiempo.

Esta tarea sola es responsable de98% del tiempo de ejecución de mi consulta.

Mi pregunta es, ¿cómo puedo reducir ese tiempo? ¿Es posible asignar 1 valor en la matriz del búfer sin tener que reemplazar todo el búfer?

P.S .: Estoy trabajando con Spark 1.6, y no puedo actualizarlo en el corto plazo, por lo tanto, quédese con la solución que funcionaría con esta versión.

class SumArrayAtIndexUDAF() extends UserDefinedAggregateFunction{

  val bucketSize = 1000

  def inputSchema: StructType =  StructType(StructField("index",LongType) :: StructField("value",LongType) :: Nil)

  def dataType: DataType = ArrayType(LongType)

  def deterministic: Boolean = true

  def bufferSchema: StructType = {
    StructType(
      StructField("buckets", ArrayType(LongType)) :: Nil  
    )
  }

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = new Array[Long](bucketSize)
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getLong(0)
    val value = input.getLong(1)

    val arr = buffer.getAs[mutable.WrappedArray[Long]](0)

    buffer(0) = arr   // TODO THIS TAKES WAYYYYY TOO LONG - it actually copies the entire array for every call to this method (which essentially updates only 1 cell)
  }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val arr1 = buffer1.getAs[mutable.WrappedArray[Long]](0)
    val arr2 = buffer2.getAs[mutable.WrappedArray[Long]](0)

    for(i <- arr1.indices){
      arr1.update(i, arr1(i) + arr2(i))
    }

    buffer1(0) = arr1
  }

  override def evaluate(buffer: Row): Any = {
    buffer.getAs[mutable.WrappedArray[Long]](0)
  }
}

Respuestas a la pregunta(1)

Su respuesta a la pregunta