Я приветствовал бы обновление подхода к динамическим именам таблиц для Python BigQuerySink. Это мешает мне создавать поэтапные шаблоны, так как имя таблицы не оценивается во время выполнения.

льку у меня работает работающий статический поток данных, я хотел бы создать шаблон из этого, чтобы позволить мне легко повторно использовать поток данных без какой-либо командной строки.

ПослеСоздание шаблонов Учебник от чиновника не дает образец для временного вывода.

Мой поток данных заканчивается приемником BigQuery, который принимает несколько аргументов, например, целевую таблицу для хранения. Именно этот параметр я хотел бы сделать доступным в своем шаблоне, позволяя мне выбрать целевое хранилище после запуска потока.

Но я не могу заставить это работать. Ниже я вставляю некоторые фрагменты кода, которые могут помочь объяснить точную проблему, с которой я столкнулся.

class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://my-source-bucket/file.json')
        parser.add_value_provider_argument(
            '--table',
            default='my-project-id:some-dataset.some-table')

pipeline_options = PipelineOptions()

pipe = beam.Pipeline(options=pipeline_options)

custom_options = pipeline_options.view_as(CustomOptions)

(...)

# store
processed_pipe | beam.io.Write(BigQuerySink(
    table=custom_options.table.get(),
    schema='a_column:STRING,b_column:STRING,etc_column:STRING',
    create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=BigQueryDisposition.WRITE_APPEND
))

При создании шаблона я не дал никаких параметров с ним. Через долю секунды я получаю следующее сообщение об ошибке:

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: table, type: str, default_value: 'my-project-id:some-dataset.some-table').get() not called from a runtime context

Когда я добавляю--table параметр при создании шаблона, шаблон создается, но--table значение параметра затем жестко кодируется в шаблоне и не переопределяется каким-либо заданным значением шаблона дляtable потом.

Я получаю ту же ошибку при заменеtable=custom_options.table.get(), с участиемtable=StaticValueProvider(str, custom_options.table.get()).

Есть ли кто-то, кто уже создал временный поток данных с настраиваемыми параметрами BigQuerySink? Я хотел бы получить некоторые намеки на это.

 Pablo06 нояб. 2017 г., 22:59
Я дам вам ответ через некоторое время.

Ответы на вопрос(1)

Решение Вопроса

ля операций ввода-вывода FileBasedSource. Вы можете увидеть это, нажав на вкладку Python по ссылке, которую вы упомянули:https://cloud.google.com/dataflow/docs/templates/creating-templates

в разделе «Трубопроводный ввод-вывод и параметры времени выполнения».

В отличие от того, что происходит в Java, BigQuery в Python не использует пользовательский источник. Другими словами, он не полностью реализован в SDK, но также содержит части в бэкэнде (и поэтому является «собственным источником»). Только пользовательские источники могут использовать шаблоны. Есть планы добавить BigQuery в качестве пользовательского источника: questions.apache.org/jira/browse/BEAM-1440

 Ian Lewis13 мар. 2019 г., 15:35
Я приветствовал бы обновление подхода к динамическим именам таблиц для Python BigQuerySink. Это мешает мне создавать поэтапные шаблоны, так как имя таблицы не оценивается во время выполнения.
 jmoore25510 сент. 2018 г., 22:45
Любое обновление по этому поводу? Мне нужна эта функция для проекта, над которым я работаю ...

Ваш ответ на вопрос