Iterar un RDD y actualizar una colección mutable devuelve una colección vacía
Soy nuevo en Scala y Spark y me gustaría recibir ayuda para comprender por qué el siguiente código no produce el resultado deseado.
Estoy comparando dos tablas
Mi esquema de salida deseado es:
case class DiscrepancyData(fieldKey:String, fieldName:String, val1:String, val2:String, valExpected:String)
Cuando ejecuto el siguiente código paso a paso manualmente, en realidad termino con el resultado deseado. El cual es unList[DiscrepancyData]
completamente poblado con mi salida deseada. Sin embargo, me falta algo en el código a continuación porque devuelve una lista vacía (antes de que se llame a este código, hay otros códigos que intervienen en la lectura de tablas de HIVE, mapeo, agrupación, filtrado, etc., etc.):
val compareCols = Set(year, nominal, adjusted_for_inflation, average_private_nonsupervisory_wage)
val key = "year"
def compare(table:RDD[(String, Iterable[Row])]): List[DiscrepancyData] = {
var discs: ListBuffer[DiscrepancyData] = ListBuffer()
def compareFields(fieldOne:String, fieldTwo:String, colName:String, row1:Row, row2:Row): DiscrepancyData = {
if (fieldOne != fieldTwo){
DiscrepancyData(
row1.getAs(key).toString, //fieldKey
colName, //fieldName
row1.getAs(colName).toString, //table1Value
row2.getAs(colName).toString, //table2Value
row2.getAs(colName).toString) //expectedValue
}
else null
}
def comparison() {
for(row <- table){
var elem1 = row._2.head //gets the first element in the iterable
var elem2 = row._2.tail.head //gets the second element in the iterable
for(col <- compareCols){
var value1 = elem1.getAs(col).toString
var value2 = elem2.getAs(col).toString
var disc = compareFields(value1, value2, col, elem1, elem2)
if (disc != null) discs += disc
}
}
}
comparison()
discs.toList
}
Estoy llamando a la función anterior como tal:
var outcome = compare(groupedFiltered)
Aquí están los datos en groupedFiltered:
(1991,CompactBuffer([1991,7.14,5.72,39%], [1991,4.14,5.72,39%]))
(1997,CompactBuffer([1997,4.88,5.86,39%], [1997,3.88,5.86,39%]))
(1999,CompactBuffer([1999,5.15,5.96,39%], [1999,5.15,5.97,38%]))
(1947,CompactBuffer([1947,0.9,2.94,35%], [1947,0.4,2.94,35%]))
(1980,CompactBuffer([1980,3.1,6.88,45%], [1980,3.1,6.88,48%]))
(1981,CompactBuffer([1981,3.15,6.8,45%], [1981,3.35,6.8,45%]))
El esquema de tabla para groupedFiltered:
(year String,
nominal Double,
adjusted_for_inflation Double,
average_provate_nonsupervisory_wage String)