Можно ли обрабатывать входные данные хранилища данных в конвейере потока данных Google в пакете из N записей одновременно?

Я пытаюсь выполнить работу конвейера потока данных, которая будет выполнять одну функцию наN записей одновременно из хранилища данных. В моем случае эта функция отправляет пакет из 100 записей в какой-либо сервис REST в качестве полезной нагрузки. Это означает, что я хочу просмотреть все записи из одного объекта хранилища данных и отправить100 пакетных записей одновременно к какой-то внешней службе REST.

Мое текущее решение

Читать входные данные из хранилища данныхСоздайте столько ключей, сколько рабочих указано в параметрах конвейера (1 рабочий = 1 ключ).Группировка по ключу, так что мы получаем итератор в качестве вывода (ввод итератора в шаге № 4)Программно пакетируйте пользователей во временный список и отправляйте их как пакет на конечную точку REST.

Вышеописанный сценарий в псевдокоде (игнорируя детали):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // 4. Programatically batch users
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            List<EntryPOJO> batchedEntries = new ArrayList<>();
            for (EntryPOJO entry : c.element().getValue()) {
                if (batchedEntries.size() >= BATCH_SIZE) {
                    sendToRESTEndpoint(batchedEntries);
                    batchedEntries = new ArrayList<>();
                }
                batchedEntries.add(entry);
            }
            sendToRESTEndpoint(batchedEntries);
        }
    }));

Основная проблема с моим текущим решением

GroupByKey блокирует выполнение последнего ParDo (блокирует шаг № 4) до тех пор, пока все записи не будут назначены ключу.

Решение вообще работает, но хотелось бы все делать параллельно (отправить пакет из 100 записей в конечную точку REST сразу после их загрузки из хранилища данных), что невозможно в моем текущем решении, поскольку GroupByKey не выводит никаких данных, пока каждая запись из базы данных не будет извлечена и вставлена ​​в пару ключ-значение. Таким образом, выполнение на самом деле состоит из 2 шагов: 1. Извлеките все данные из хранилища данных и назначьте ему ключ, 2. Обработайте записи как пакетные

Вопрос

Так что я хотел бы знать, есть ли какая-то существующая функция, чтобы быть в состоянии сделать это. Или, по крайней мере, получить Iterable без шага GroupByKey, чтобы задача пакетной функции не ожидала сброса данных.

Ответы на вопрос(1)

Ваш ответ на вопрос