Dynamischer Tabellenname beim Schreiben von Datenfluss-Pipelines in BQ

Als Folgefrage zu folgender Frage und Antwort:

https: //stackoverflow.com/questions/31156774/about-key-grouping-with-groupbyke

Ich möchte mit dem google dataflow engineering team bestätigen @ jkff) Wenn die dritte von Eugene vorgeschlagene Option mit Google Dataflow überhaupt möglich ist:

"Haben Sie ein ParDo, das diese Schlüssel annimmt und die BigQuery-Tabellen erstellt, und ein anderes ParDo, das die Daten und Streams annimmt, schreibt in die Tabellen"

Mein Verständnis ist, dass ParDo / DoFn jedes Element verarbeitet. Wie können wir einen Tabellennamen (Funktion der von Seiteneingaben übergebenen Schlüssel) angeben, wenn wir aus processElement eines ParDo / DoFn ausschreiben?

Vielen Dank

Aktualisier mit einer DoFn, die offensichtlich nicht funktioniert, da c.element (). value keine pcollection ist.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

@Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        //the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
        c.element().getValue().apply(Pardo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }    
}    

Antworten auf die Frage(2)

Ihre Antwort auf die Frage