Zurückgeben mehrerer Arrays aus einer benutzerdefinierten Aggregatfunktion (UDAF) in Apache Spark SQL
Ich versuche, eine benutzerdefinierte Aggregatfunktion (UDAF) in Java mit Apache Spark SQL zu erstellen, die nach Abschluss mehrere Arrays zurückgibt. Ich habe online gesucht und kann keine Beispiele oder Vorschläge dazu finden.
Ich kann ein einzelnes Array zurückgeben, kann jedoch nicht herausfinden, wie die Daten mit der Methode valu () für die Rückgabe mehrerer Arrays im richtigen Format abgerufen werden.
Die UDAF funktioniert, da ich die Arrays mit der Methode valu () ausdrucken kann. Ich kann nur nicht herausfinden, wie diese Arrays an den aufrufenden Code zurückgegeben werden (der unten als Referenz gezeigt wird).
UserDefinedAggregateFunction customUDAF = new CustomUDAF();
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data");
Ich habe die gesamte benutzerdefinierte UDAF-Klasse unten eingeschlossen, aber die wichtigsten Methoden sind dataType () und evaluation methods (), die zuerst angezeigt werden.
Jede Hilfe oder Beratung wäre sehr dankbar. Vielen Dank
public class CustomUDAF extends UserDefinedAggregateFunction {
@Override
public DataType dataType() {
// TODO: Is this the correct way to return 2 arrays?
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public Object evaluate(Row buffer) {
// Data conversion
List<Long> longList = new ArrayList<Long>(buffer.getList(0));
List<Double> dataList = new ArrayList<Double>(buffer.getList(1));
// Processing of data (omitted)
// TODO: How to get data into format needed to return 2 arrays?
return dataList;
}
@Override
public StructType inputSchema() {
return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType);
}
@Override
public StructType bufferSchema() {
return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
.add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
}
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, new ArrayList<Long>());
buffer.update(1, new ArrayList<Double>());
}
@Override
public void update(MutableAggregationBuffer buffer, Row row) {
ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0));
longList.add(row.getLong(0));
ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1));
dataList.add(row.getDouble(1));
buffer.update(0, longList);
buffer.update(1, dataList);
}
@Override
public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0));
longList.addAll(buffer2.getList(0));
ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1));
dataList.addAll(buffer2.getList(1));
buffer1.update(0, longList);
buffer1.update(1, dataList);
}
@Override
public boolean deterministic() {
return true;
}
}
Aktualisiere: Basierend auf der Antwort von zero323 konnte ich zwei Arrays zurückgeben mit:
return new Tuple2<>(longArray, dataArray);
Das Herausholen der Daten war etwas mühsam, erforderte jedoch das Dekonstruieren des DataFrame in Java-Listen und das anschließende Zurücksetzen in einen DataFrame.