Kann eine Datenspeichereingabe in der Google-Datenfluss-Pipeline in einem Stapel von N Einträgen gleichzeitig verarbeitet werden?

Ich versuche, einen Datenfluss-Pipeline-Job auszuführen, der eine Funktion auf @ ausführen würdN Einträge gleichzeitig vom Datenspeicher. In meinem Fall sendet diese Funktion einen Stapel von 100 Einträgen als Nutzlast an einen REST-Service. Dies bedeutet, dass ich alle Einträge einer Datenspeicherentität durchgehen und @ senden möcht 100 gestapelte Einträge auf einmal zu einem externen REST-Service.

Meine aktuelle Lösung

Eingabe vom Datenspeicher lesenErstellen Sie so viele Schlüssel, wie Worker in den Pipeline-Optionen angegeben sind (1 Worker = 1 Schlüssel).Group by key, damit wir den Iterator als Ausgabe erhalten (Iterator-Eingabe in Schritt Nr. 4)Stapeln Sie Benutzer programmgesteuert in eine temporäre Liste und senden Sie sie als Stapel an den REST-Endpunkt.

Über beschriebenes Szenario im Pseudocode (Details ignorieren):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // 4. Programatically batch users
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            List<EntryPOJO> batchedEntries = new ArrayList<>();
            for (EntryPOJO entry : c.element().getValue()) {
                if (batchedEntries.size() >= BATCH_SIZE) {
                    sendToRESTEndpoint(batchedEntries);
                    batchedEntries = new ArrayList<>();
                }
                batchedEntries.add(entry);
            }
            sendToRESTEndpoint(batchedEntries);
        }
    }));

Hauptproblem mit meiner aktuellen Lösung

GroupByKey blockiert die Ausführung des letzten ParDo (blockiert Schritt Nr. 4), bis alle Einträge einem Schlüssel zugewiesen sind.

Lösung funktioniert in der Regel, aber ich möchte alles parallel machen senden Sie einen Stapel von 100 Einträgen unmittelbar nach dem Laden aus dem Datenspeicher @ an den REST-Endpunk), was mit meiner aktuellen Lösung nicht möglich ist, da GroupByKey keine Daten ausgibt, bis jeder Eintrag aus der Datenbank abgerufen und in das Schlüssel-Wert-Paar eingefügt wird. Die Ausführung erfolgt also in zwei Schritten: 1. Holen Sie alle Daten aus dem Datenspeicher und weisen Sie ihm einen Schlüssel zu. 2. Verarbeiten Sie die Einträge als Batch

Frag

Also, ich würde gerne wissen, ob es ein vorhandenes Feature gibt, um dies zu tun. Oder zumindest, um ohne den GroupByKey-Schritt iterierbar zu werden, damit die Stapelverarbeitungsfunktion nicht darauf warten muss, dass Daten gesichert werden.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage