, Вы уже столкнетесь с проблемой при локальном построении графика выполнения конвейера (> 1 часа, задание еще не отправлено) и может превысить ограничение в 10 МБ для API потока данных. Я бы использовал этот пример для небольшого количества больших файлов и альтернатив, которые я привел в своем ответе для многих маленьких файлов. Если в вашем случае использования много больших файлов, я бы попытался преобразовать его в одну из других управляемых ситуаций.
ел, что на этот вопрос ответили ранее о переполнении стека (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), но не с тех пор, как Apache Beam добавил в Python функциональность splitable dofn. Как получить доступ к имени файла текущего файла, обрабатываемого при передаче шаблона файла в корзину gcs?
Я хочу передать имя файла в мою функцию преобразования:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText('gs://url to file')
data = (
lines
| 'Jsonify' >> beam.Map(jsonify)
| 'Unnest' >> beam.FlatMap(unnest)
| 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
'project_id:dataset_id.table_name', schema=schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
В конечном итоге, я хочу передать имя файла в мою функцию преобразования, когда я преобразую каждую строку json (см.это а затем используйте имя файла для поиска в другой таблице BQ, чтобы получить значение). Я думаю, что, как только мне удастся узнать, как получить имя файла, я смогу определить боковую часть ввода, чтобы выполнить поиск в таблице bq и получить уникальное значение.