Я приветствовал бы обновление подхода к динамическим именам таблиц для Python BigQuerySink. Это мешает мне создавать поэтапные шаблоны, так как имя таблицы не оценивается во время выполнения.

льку у меня работает работающий статический поток данных, я хотел бы создать шаблон из этого, чтобы позволить мне легко повторно использовать поток данных без какой-либо командной строки.

ПослеСоздание шаблонов Учебник от чиновника не дает образец для временного вывода.

Мой поток данных заканчивается приемником BigQuery, который принимает несколько аргументов, например, целевую таблицу для хранения. Именно этот параметр я хотел бы сделать доступным в своем шаблоне, позволяя мне выбрать целевое хранилище после запуска потока.

Но я не могу заставить это работать. Ниже я вставляю некоторые фрагменты кода, которые могут помочь объяснить точную проблему, с которой я столкнулся.

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://my-source-bucket/file.json')
        parser.add_value_provider_argument(
            '--table',
            default='my-project-id:some-dataset.some-table')

pipeline_options = PipelineOptions()

pipe = beam.Pipeline(options=pipeline_options)

custom_options = pipeline_options.view_as(CustomOptions)

(...)

# store
processed_pipe | beam.io.Write(BigQuerySink(
    table=custom_options.table.get(),
    schema='a_column:STRING,b_column:STRING,etc_column:STRING',
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND
))

При создании шаблона я не дал никаких параметров с ним. Через долю секунды я получаю следующее сообщение об ошибке:

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context

Когда я добавляю--table параметр при создании шаблона, шаблон создается, но--table значение параметра затем жестко кодируется в шаблоне и не переопределяется каким-либо заданным значением шаблона дляtable потом.

Я получаю ту же ошибку при заменеtable=custom_options.table.get(), с участиемtable=StaticValueProvider(str, custom_options.table.get()).

Есть ли кто-то, кто уже создал временный поток данных с настраиваемыми параметрами BigQuerySink? Я хотел бы получить некоторые намеки на это.

Ответы на вопрос(1)

Ваш ответ на вопрос