A modelagem de fluxo de dados suporta a entrada de modelo para opções de coletor do BigQuery?

Como tenho um Dataflow estático em execução, gostaria de criar um modelo a partir deste, para que eu possa reutilizar facilmente o Dataflow sem nenhuma digitação na linha de comando.

Seguindo oCriando modelos O tutorial do funcionário não fornece uma amostra para saída modelável.

Meu fluxo de dados termina com um coletor do BigQuery, que usa alguns argumentos como a tabela de destino para armazenamento. Esse parâmetro exato é o que eu gostaria de disponibilizar no meu modelo, permitindo-me escolher o armazenamento de destino depois de executar o fluxo.

Mas não consigo fazer isso funcionar. Abaixo colo alguns trechos de código que podem ajudar a explicar o problema exato que tenho.

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://my-source-bucket/file.json')
        parser.add_value_provider_argument(
            '--table',
            default='my-project-id:some-dataset.some-table')

pipeline_options = PipelineOptions()

pipe = beam.Pipeline(options=pipeline_options)

custom_options = pipeline_options.view_as(CustomOptions)

(...)

# store
processed_pipe | beam.io.Write(BigQuerySink(
    table=custom_options.table.get(),
    schema='a_column:STRING,b_column:STRING,etc_column:STRING',
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND
))

Ao criar o modelo, não forneci nenhum parâmetro. Em uma fração de segundo, recebo a seguinte mensagem de erro:

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context

Quando adiciono um--table parâmetro na criação do modelo, o modelo está sendo criado, mas o--table O valor do parâmetro é codificado no modelo e não é substituído por nenhum valor de modelo especificado paratable mais tarde.

Eu recebo o mesmo erro quando substituí otable=custom_options.table.get(), comtable=StaticValueProvider(str, custom_options.table.get()).

Existe alguém que já criou um Dataflow modelável com parâmetros personalizáveis do BigQuerySink? Eu adoraria ter algumas dicas sobre isso.

questionAnswers(1)

yourAnswerToTheQuestion