Можно ли обрабатывать входные данные хранилища данных в конвейере потока данных Google в пакете из N записей одновременно?
Я пытаюсь выполнить работу конвейера потока данных, которая будет выполнять одну функцию наN записей одновременно из хранилища данных. В моем случае эта функция отправляет пакет из 100 записей в какой-либо сервис REST в качестве полезной нагрузки. Это означает, что я хочу просмотреть все записи из одного объекта хранилища данных и отправить100 пакетных записей одновременно к какой-то внешней службе REST.
Мое текущее решение
Читать входные данные из хранилища данныхСоздайте столько ключей, сколько рабочих указано в параметрах конвейера (1 рабочий = 1 ключ).Группировка по ключу, так что мы получаем итератор в качестве вывода (ввод итератора в шаге № 4)Программно пакетируйте пользователей во временный список и отправляйте их как пакет на конечную точку REST.Вышеописанный сценарий в псевдокоде (игнорируя детали):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// 4. Programatically batch users
.apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
List<EntryPOJO> batchedEntries = new ArrayList<>();
for (EntryPOJO entry : c.element().getValue()) {
if (batchedEntries.size() >= BATCH_SIZE) {
sendToRESTEndpoint(batchedEntries);
batchedEntries = new ArrayList<>();
}
batchedEntries.add(entry);
}
sendToRESTEndpoint(batchedEntries);
}
}));
Основная проблема с моим текущим решением
GroupByKey блокирует выполнение последнего ParDo (блокирует шаг № 4) до тех пор, пока все записи не будут назначены ключу.
Решение вообще работает, но хотелось бы все делать параллельно (отправить пакет из 100 записей в конечную точку REST сразу после их загрузки из хранилища данных), что невозможно в моем текущем решении, поскольку GroupByKey не выводит никаких данных, пока каждая запись из базы данных не будет извлечена и вставлена в пару ключ-значение. Таким образом, выполнение на самом деле состоит из 2 шагов: 1. Извлеките все данные из хранилища данных и назначьте ему ключ, 2. Обработайте записи как пакетные
Вопрос
Так что я хотел бы знать, есть ли какая-то существующая функция, чтобы быть в состоянии сделать это. Или, по крайней мере, получить Iterable без шага GroupByKey, чтобы задача пакетной функции не ожидала сброса данных.