Nome da tabela dinâmica ao gravar no BQ a partir de pipelines de fluxo de dados
Como uma pergunta de acompanhamento para a seguinte pergunta e resposta:
https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey
Gostaria de confirmar com a equipe de engenharia do Google Dataflow (@jkff) se a terceira opção proposta por Eugene for possível com o fluxo de dados do Google:
"tem um ParDo que pega essas chaves e cria as tabelas do BigQuery e outro ParDo que leva os dados e os fluxos de gravação para as tabelas"
Meu entendimento é que o ParDo / DoFn processará cada elemento. Como podemos especificar um nome de tabela (função das chaves passadas pelas entradas laterais) ao escrever a partir do processElement de um ParDo / DoFn?
Obrigado.
Atualizada com um DoFn, que não está funcionando obviamente, pois o valor c.element (). não é uma coleção de pc.
PCollection<KV<String, Iterable<String>>> output = ...;
public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {
private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
this.keysAsSideinputs = keysAsSideinputs;
}
@Override
public void processElement(ProcessContext c) {
List<String> keys = c.sideInput(keysAsSideinputs);
String key = c.element().getKey();
//the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
c.element().getValue().apply(Pardo.of(new FormatLineFn()))
.apply(TextIO.Write.to(key));
c.output(1);
}
}