Iterar un RDD y actualizar una colección mutable devuelve una colección vacía

Soy nuevo en Scala y Spark y me gustaría recibir ayuda para comprender por qué el siguiente código no produce el resultado deseado.

Estoy comparando dos tablas

Mi esquema de salida deseado es:

case class DiscrepancyData(fieldKey:String, fieldName:String, val1:String, val2:String, valExpected:String)

Cuando ejecuto el siguiente código paso a paso manualmente, en realidad termino con el resultado deseado. El cual es unList[DiscrepancyData] completamente poblado con mi salida deseada. Sin embargo, me falta algo en el código a continuación porque devuelve una lista vacía (antes de que se llame a este código, hay otros códigos que intervienen en la lectura de tablas de HIVE, mapeo, agrupación, filtrado, etc., etc.):

val compareCols  = Set(year, nominal, adjusted_for_inflation, average_private_nonsupervisory_wage)

val key = "year"

def compare(table:RDD[(String, Iterable[Row])]): List[DiscrepancyData] = {
    var discs: ListBuffer[DiscrepancyData] = ListBuffer()
    def compareFields(fieldOne:String, fieldTwo:String, colName:String, row1:Row, row2:Row): DiscrepancyData = {
        if (fieldOne != fieldTwo){
            DiscrepancyData(
                row1.getAs(key).toString, //fieldKey
                colName, //fieldName
                row1.getAs(colName).toString, //table1Value
                row2.getAs(colName).toString, //table2Value
                row2.getAs(colName).toString) //expectedValue
        }
        else null
    }
    def comparison() {
        for(row <- table){
            var elem1 = row._2.head //gets the first element in the iterable
            var elem2 = row._2.tail.head //gets the second element in the iterable

            for(col <- compareCols){
                var value1 = elem1.getAs(col).toString
                var value2 = elem2.getAs(col).toString

                var disc = compareFields(value1, value2, col, elem1, elem2)

                if (disc != null) discs += disc
            }
        }
    }

    comparison()

    discs.toList
}

Estoy llamando a la función anterior como tal:

var outcome = compare(groupedFiltered)

Aquí están los datos en groupedFiltered:

(1991,CompactBuffer([1991,7.14,5.72,39%], [1991,4.14,5.72,39%]))
(1997,CompactBuffer([1997,4.88,5.86,39%], [1997,3.88,5.86,39%]))
(1999,CompactBuffer([1999,5.15,5.96,39%], [1999,5.15,5.97,38%]))
(1947,CompactBuffer([1947,0.9,2.94,35%], [1947,0.4,2.94,35%]))
(1980,CompactBuffer([1980,3.1,6.88,45%], [1980,3.1,6.88,48%]))
(1981,CompactBuffer([1981,3.15,6.8,45%], [1981,3.35,6.8,45%]))

El esquema de tabla para groupedFiltered:

(year String, 
nominal Double,
adjusted_for_inflation Double, 
average_provate_nonsupervisory_wage String)

Respuestas a la pregunta(2)

Su respuesta a la pregunta