Zurückgeben mehrerer Arrays aus einer benutzerdefinierten Aggregatfunktion (UDAF) in Apache Spark SQL

Ich versuche, eine benutzerdefinierte Aggregatfunktion (UDAF) in Java mit Apache Spark SQL zu erstellen, die nach Abschluss mehrere Arrays zurückgibt. Ich habe online gesucht und kann keine Beispiele oder Vorschläge dazu finden.

Ich kann ein einzelnes Array zurückgeben, kann jedoch nicht herausfinden, wie die Daten mit der Methode valu () für die Rückgabe mehrerer Arrays im richtigen Format abgerufen werden.

Die UDAF funktioniert, da ich die Arrays mit der Methode valu () ausdrucken kann. Ich kann nur nicht herausfinden, wie diese Arrays an den aufrufenden Code zurückgegeben werden (der unten als Referenz gezeigt wird).

UserDefinedAggregateFunction customUDAF = new CustomUDAF();
DataFrame resultingDataFrame = dataFrame.groupBy().agg(customUDAF.apply(dataFrame.col("long_col"), dataFrame.col("double_col"))).as("processed_data");

Ich habe die gesamte benutzerdefinierte UDAF-Klasse unten eingeschlossen, aber die wichtigsten Methoden sind dataType () und evaluation methods (), die zuerst angezeigt werden.

Jede Hilfe oder Beratung wäre sehr dankbar. Vielen Dank

public class CustomUDAF extends UserDefinedAggregateFunction {

    @Override
    public DataType dataType() {
        // TODO: Is this the correct way to return 2 arrays?
        return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
            .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
    }

    @Override
    public Object evaluate(Row buffer) {
        // Data conversion
        List<Long> longList = new ArrayList<Long>(buffer.getList(0));
        List<Double> dataList = new ArrayList<Double>(buffer.getList(1));

        // Processing of data (omitted)

        // TODO: How to get data into format needed to return 2 arrays?
        return dataList;
    }

    @Override
    public StructType inputSchema() {
        return new StructType().add("long", DataTypes.LongType).add("data", DataTypes.DoubleType);
    }

    @Override
    public StructType bufferSchema() {
        return new StructType().add("longArray", DataTypes.createArrayType(DataTypes.LongType, false))
            .add("dataArray", DataTypes.createArrayType(DataTypes.DoubleType, false));
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, new ArrayList<Long>());
        buffer.update(1, new ArrayList<Double>());
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row row) {
        ArrayList<Long> longList = new ArrayList<Long>(buffer.getList(0));
        longList.add(row.getLong(0));

        ArrayList<Double> dataList = new ArrayList<Double>(buffer.getList(1));
        dataList.add(row.getDouble(1));

        buffer.update(0, longList);
        buffer.update(1, dataList);
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        ArrayList<Long> longList = new ArrayList<Long>(buffer1.getList(0));
        longList.addAll(buffer2.getList(0));

        ArrayList<Double> dataList = new ArrayList<Double>(buffer1.getList(1));
        dataList.addAll(buffer2.getList(1));

        buffer1.update(0, longList);
        buffer1.update(1, dataList);
    }

    @Override
    public boolean deterministic() {
        return true;
    }
}

Aktualisiere: Basierend auf der Antwort von zero323 konnte ich zwei Arrays zurückgeben mit:

return new Tuple2<>(longArray, dataArray);

Das Herausholen der Daten war etwas mühsam, erforderte jedoch das Dekonstruieren des DataFrame in Java-Listen und das anschließende Zurücksetzen in einen DataFrame.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage