Kafka streams: Leer desde TODAS las particiones en cada instancia de una aplicación

Cuando se usa KTable, las transmisiones de Kafka no permiten que las instancias lean desde múltiples particiones de un tema en particular cuando la cantidad de instancias / consumidores es igual a la cantidad de particiones. Intenté lograr esto usando GlobalKTable, el problema con esto es que los datos se sobrescribirán, tampoco se puede aplicar la agregación.

Supongamos que tengo un tema llamado "data_in" con 3 particiones (P1, P2, P3). Cuando ejecuto 3 instancias (I1, I2, I3) de una aplicación de transmisión Kafka, quiero que cada instancia lea datos de todas las particiones de "data_in". Quiero decir que I1 puede leer de P1, P2 y P3, I2 puede leer de P1, P2 y P3, I2 y así sucesivamente.

EDIT: tenga en cuenta que el productor puede publicar dos ID similares en dos particiones diferentes en "data_in". Entonces, cuando se ejecutan dos instancias diferentes, se sobrescribirá GlobalKtable.

Por favor, ¿cómo lograr esto? Esta es una parte de mi código

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

Respuestas a la pregunta(1)

Su respuesta a la pregunta