Вариант 3: обработка в пакетном режиме, запускаемая из внешнего источника. Этот подход вводит задержку по сравнению с вариантами 1 и 2, поскольку конвейер должен запуститься до начала обработки. Здесь вы можете запустить событие из вашей исходной файловой системы, чтобы запланировать или немедленно запустить процесс потока данных. Этот вариант лучше всего подходит для низкочастотных обновлений файлов большого размера.

я есть каталог в GCS или другой поддерживаемой файловой системе, в которую новые файлы записываются внешним процессом.

Я хотел бы написать потоковый конвейер Apache Beam, который постоянно следит за этим каталогом на предмет новых файлов, читает и обрабатывает каждый новый файл по мере его поступления. Это возможно?

Ответы на вопрос(2)

начиная с Apache Beam 2.2.0. Несколько API поддерживают этот вариант использования:

Если вы используетеTextIO или жеAvroIO, они поддерживают это явно черезTextIO.read().watchForNewFiles() и то же самое наreadAll(), например:

PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://path/to/files/*")
    .watchForNewFiles(
        // Check for new files every 30 seconds
        Duration.standardSeconds(30),
        // Never stop checking for new files
        Watch.Growth.<String>never()));

Если вы используете другой формат файла, вы можете использоватьFileIO.match().continuously() а такжеFileIO.matchAll().continuously() которые поддерживают один и тот же API, в сочетании сFileIO.readMatches().

API-интерфейсы поддерживают указание того, как часто проверять наличие новых файлов и когда прекращать проверку (например, поддерживаются следующие условия: «если в течение определенного времени не появляется никаких новых выходных данных», «после наблюдения N выходных данных», «по истечении заданного времени с момента начала проверки»). и их комбинации).

Обратите внимание, что в настоящее время эта функция в настоящее время работает только в Direct Runner и Dataflow Runner, и только в Java SDK. В общем, это будет работать в любом бегунке, который поддерживаетСплитбольный DoFn (видетьматрица возможностей).

 jkff04 янв. 2018 г., 03:40
Вы можете попытаться устранить дублирование между несвязанными конвейерами, отфильтровав возвращенные файлы с помощью ParDo. Однако если вы хотите что-то вроде «исключить файлы, которые уже были полностью обработаны предыдущим экземпляром конвейера», вам необходимо определить, что означает «полностью обработанный», и использовать какое-то внешнее хранилище, например, Облако Bigtable, и пусть ваш конвейер явно пишет в него, когда что-то «полностью обрабатывается» и читает из него, прежде чем принять решение об их обработке.
 jkff04 янв. 2018 г., 03:36
Пока ваш конвейер работает, этот метод будет возвращать каждый файл (существующие и входящие новые) ровно один раз - в этом смысле он устраняет уведомления GCS и pubsub. Если вы отмените свой конвейер и запустите новый экземпляр того же конвейера, он снова увидит старые файлы - невозможно перенести состояние между двумя несвязанными конвейерами. Однако я считаю, что если вы используете функцию обновления конвейера, она должна работать правильно и не иметь дублирования.
 user17915604 янв. 2018 г., 03:28
Также мне любопытно использовать эту функцию, как мы можем обработать состояние, т.е. если мой потоковый конвейер не работает, или я развернул или переключился на новый, как я могу сказать, какие файлы были обработаны. У меня есть корзина gcs, в которой уже обработаны данные за последние несколько месяцев, но при развертывании нового конвейера после удаления старого, как эта функция может помочь нам узнать, что было обработано, и избежать чтения всех прошлых файлов, соответствующих шаблону?
 user17915604 янв. 2018 г., 02:38
Насколько надежен этот метод часов? Можно ли настроить его так, чтобы файлы обрабатывались ровно один раз? Нужны ли нам gcs-уведомления или gcs pubsub-уведомления (как минимум один раз и нет SLA о том, насколько задерживается этот сигнал)? Устраняет ли эта функция необходимость в pubsub и системе уведомлений?
 user17915604 янв. 2018 г., 04:04
Интересно, не могли бы вы пролить некоторый свет на то, как работает функция обновления, я думаю, что она внутренне пытается сохранить состояние, а затем возобновить, мы бы хотели создать что-то похожее на проблемы поддержки, например, если существующий потоковый конвейер не работает из-за какой-либо проблемы (квота, исключение usercodeexception). , сбои gcs). Беда в том, что у нас есть миллионы файлов, поэтому чтение их из BT для фильтрации также займет довольно много времени, когда readall, вероятно, перечислит миллионы файлов.

а также к опциям watchfornewfiles, есть пара других вариантов;

Существует несколько вариантов решения этого требования в зависимости от ваших требований к задержке. Начиная с SDK 2.9.0:

Вариант 1: режим непрерывного чтения:

Java: FileIO, TextIO и несколько других источников ввода-вывода поддерживают непрерывное чтение источника новых файлов.

Класс FileIO поддерживает возможность непрерывного просмотра шаблона одного файла. Этот пример сопоставляет один шаблон файла каждые 30 секунд, непрерывно возвращает новые совпадающие файлы как неограниченную коллекцию PC и останавливается, если новые файлы не появляются в течение 1 часа.

 PCollection<Metadata> matches = p.apply(FileIO.match()
     .filepattern("...")
     .continuously(
       Duration.standardSeconds(30), afterTimeSinceNewOutput(Duration.standardHours(1))));

Класс TextIO поддерживает потоковое сопоставление нового файла с помощью свойства watchForNewFiles.

PCollection<String> lines = p.apply(TextIO.read()
     .from("/local/path/to/files/*")
     .watchForNewFiles(
       // Check for new files every minute
       Duration.standardMinutes(1),
       // Stop watching the filepattern if no new files appear within an hour
       afterTimeSinceNewOutput(Duration.standardHours(1))));

Важно отметить, что список файлов не сохраняется при перезапуске конвейера. Чтобы справиться с этим сценарием, вы можете перемещать файлы либо через процесс ниже по потоку, либо как часть самого конвейера. Другой вариант - сохранить обработанные имена файлов во внешнем файле и сбросить списки при следующем преобразовании.

Python: опция непрерывно недоступна с SDK 2.9.0 для python.

Вариант 2. Потоковая обработка, инициированная из внешнего источника. У вас может быть конвейер Beam, работающий в потоковом режиме, который имеет неограниченный источник, например PubSub. Когда поступают новые файлы, вы можете использовать внешний процесс Beam, чтобы определить прибытие файла, а затем отправить сообщение PubSub с URI в качестве полезной нагрузки в файл. В DoFn, которому предшествует источник PubSub, вы можете использовать этот URI для обработки файла.

Java: используйте неограниченный источник ввода-вывода (PubSubIO, KafakIO и т. Д.)

Python: используйте неограниченный источник ввода-вывода (PubSubIO и т. Д.)

Вариант 3: обработка в пакетном режиме, запускаемая из внешнего источника. Этот подход вводит задержку по сравнению с вариантами 1 и 2, поскольку конвейер должен запуститься до начала обработки. Здесь вы можете запустить событие из вашей исходной файловой системы, чтобы запланировать или немедленно запустить процесс потока данных. Этот вариант лучше всего подходит для низкочастотных обновлений файлов большого размера.

Ваш ответ на вопрос