¿Se puede procesar la entrada del almacén de datos en la tubería de flujo de datos de Google en un lote de N entradas a la vez?

Estoy tratando de ejecutar un trabajo de canalización de flujo de datos que ejecutaría una función enN entradas a la vez del almacén de datos. En mi caso, esta función está enviando un lote de 100 entradas a algún servicio REST como carga útil. Esto significa que quiero revisar todas las entradas de una entidad de almacén de datos y enviar100 entradas por lotes a la vez a algunos fuera del servicio REST.

Mi solución actual

Leer la entrada del almacén de datosCree tantas claves como haya trabajadores especificados en las opciones de canalización (1 trabajador = 1 clave).Agrupe por clave, de modo que obtengamos el iterador como salida (entrada del iterador en el paso 4)Programe por lotes a los usuarios en la lista temporal y envíelos como un lote al punto final REST.

Escenario descrito anteriormente en pseudocódigo (ignorando detalles):

final int BATCH_SIZE = 100;

// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))

    // 2. create keys to be used in group by so we get iterator in next task
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            String key = generateKey(c);
            EntryPOJO entry = processEntity(c);
            c.output(KV.of(key, entry));
        }
    }))

    // 3. Group by key
    .apply(GroupByKey.create())

    // 4. Programatically batch users
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
        @Override
        public void processElement(ProcessContext c) throws Exception {
            List<EntryPOJO> batchedEntries = new ArrayList<>();
            for (EntryPOJO entry : c.element().getValue()) {
                if (batchedEntries.size() >= BATCH_SIZE) {
                    sendToRESTEndpoint(batchedEntries);
                    batchedEntries = new ArrayList<>();
                }
                batchedEntries.add(entry);
            }
            sendToRESTEndpoint(batchedEntries);
        }
    }));

Problema principal con mi solución actual

GroupByKey bloquea la ejecución del último ParDo (bloquea el paso 4) hasta que todas las entradas se asignen a una clave.

La solución generalmente funciona, pero me gustaría hacer todo en paralelo (envíe un lote de 100 entradas al punto final REST inmediatamente después de que se carguen desde el almacén de datos), lo que no es posible con mi solución actual, ya que GroupByKey no genera ningún dato hasta que cada entrada de la base de datos se obtiene e inserta en un par clave-valor. Entonces, la ejecución es en realidad en 2 pasos: 1. Obtener todos los datos del almacén de datos y asignarle una clave, 2. Procesar las entradas como lotes

Pregunta

Entonces, lo que me gustaría saber es si hay alguna característica existente para poder hacer esto. O al menos para obtener Iterable sin el paso GroupByKey para que la tarea de la función de procesamiento por lotes no necesite esperar a que se descarguen los datos.

Respuestas a la pregunta(1)

Su respuesta a la pregunta