Nombre de tabla dinámica al escribir en BQ desde tuberías de flujo de datos
Como una pregunta de seguimiento a la siguiente pregunta y respuesta:
https://stackoverflow.com/questions/31156774/about-key-grouping-with-groupbykey
Me gustaría confirmar con el equipo de ingeniería de flujo de datos de Google (@jkff) si la tercera opción propuesta por Eugene es posible con el flujo de datos de Google:
"tenga un ParDo que tome estas claves y cree las tablas BigQuery, y otro ParDo que tome los datos y las secuencias escritas en las tablas"
Entiendo que ParDo / DoFn procesará cada elemento, ¿cómo podríamos especificar un nombre de tabla (función de las teclas que se pasan desde las entradas laterales) al escribir desde processElement de un ParDo / DoFn?
Gracias.
Actualizado con un DoFn, que obviamente no funciona ya que c.element (). value no es una colección.
PCollection<KV<String, Iterable<String>>> output = ...;
public class DynamicOutput2Fn extends DoFn<KV<String, Iterable<String>>, Integer> {
private final PCollectionView<List<String>> keysAsSideinputs;
public DynamicOutput2Fn(PCollectionView<List<String>> keysAsSideinputs) {
this.keysAsSideinputs = keysAsSideinputs;
}
@Override
public void processElement(ProcessContext c) {
List<String> keys = c.sideInput(keysAsSideinputs);
String key = c.element().getKey();
//the below is not working!!! How could we write the value out to a sink, be it gcs file or bq table???
c.element().getValue().apply(Pardo.of(new FormatLineFn()))
.apply(TextIO.Write.to(key));
c.output(1);
}
}