Динамическое имя таблицы при записи в BQ из конвейеров потока данных

В качестве дополнительного вопроса на следующий вопрос и ответ:

https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey

Я хотел бы подтвердить с командой инженеров потока данных Google (@jkff) если 3-й вариант, предложенный Евгением, вообще возможен с потоком данных Google:

«есть ParDo, который берет эти ключи и создает таблицы BigQuery, и другой ParDo, который принимает данные и записывает потоки в таблицы»

Насколько я понимаю, ParDo / DoFn будет обрабатывать каждый элемент, как мы можем указать имя таблицы (функция ключей, передаваемых с боковых входов) при записи из processElement ParDo / DoFn?

Благодарю.

обновленный с DoFn, который явно не работает, поскольку c.element (). value не является коллекцией.

PCollection<KV<String, Iterable<String>>> output = ...;

public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {

private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
        this.keysAsSideinputs = keysAsSideinputs;
    }

@Override
    public void processElement(ProcessContext c) {
        List<String> keys = c.sideInput(keysAsSideinputs);
        String key = c.element().getKey();

        //the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
        c.element().getValue().apply(Pardo.of(new FormatLineFn()))
                .apply(TextIO.Write.to(key));

        c.output(1);
    }    
}    

Ответы на вопрос(1)

Ваш ответ на вопрос