A modelagem de fluxo de dados suporta a entrada de modelo para opções de coletor do BigQuery?
Como tenho um Dataflow estático em execução, gostaria de criar um modelo a partir deste, para que eu possa reutilizar facilmente o Dataflow sem nenhuma digitação na linha de comando.
Seguindo oCriando modelos O tutorial do funcionário não fornece uma amostra para saída modelável.
Meu fluxo de dados termina com um coletor do BigQuery, que usa alguns argumentos como a tabela de destino para armazenamento. Esse parâmetro exato é o que eu gostaria de disponibilizar no meu modelo, permitindo-me escolher o armazenamento de destino depois de executar o fluxo.
Mas não consigo fazer isso funcionar. Abaixo colo alguns trechos de código que podem ajudar a explicar o problema exato que tenho.
class CustomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my-source-bucket/file.json')
parser.add_value_provider_argument(
'--table',
default='my-project-id:some-dataset.some-table')
pipeline_options = PipelineOptions()
pipe = beam.Pipeline(options=pipeline_options)
custom_options = pipeline_options.view_as(CustomOptions)
(...)
# store
processed_pipe | beam.io.Write(BigQuerySink(
table=custom_options.table.get(),
schema='a_column:STRING,b_column:STRING,etc_column:STRING',
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND
))
Ao criar o modelo, não forneci nenhum parâmetro. Em uma fração de segundo, recebo a seguinte mensagem de erro:
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context
Quando adiciono um--table
parâmetro na criação do modelo, o modelo está sendo criado, mas o--table
O valor do parâmetro é codificado no modelo e não é substituído por nenhum valor de modelo especificado paratable
mais tarde.
Eu recebo o mesmo erro quando substituí otable=custom_options.table.get(),
comtable=StaticValueProvider(str, custom_options.table.get())
.
Existe alguém que já criou um Dataflow modelável com parâmetros personalizáveis do BigQuerySink? Eu adoraria ter algumas dicas sobre isso.