luxos @Kafka: leia TODAS as partições em todas as instâncias de um aplicativo

Ao usar o KTable, os fluxos Kafka não permitem que as instâncias leiam de várias partições de um tópico específico quando o número de instâncias / consumidores é igual ao número de partições. Tentei fazer isso usando o GlobalKTable, o problema é que os dados serão substituídos e a agregação não pode ser aplicada a el

Vamos supor que eu tenha um tópico chamado "data_in" com 3 partições (P1, P2, P3). Quando executo 3 instâncias (I1, I2, I3) de um aplicativo de streaming Kafka, desejo que cada instância leia dados de todas as partições de "data_in". Quero dizer que I1 pode ler de P1, P2 e P3, I2 pode ler de P1, P2 e P3, I2 e assim por diant

EDIT: Lembre-se de que o produtor pode publicar dois IDs semelhantes em duas partições diferentes em "data_in". Portanto, ao executar duas instâncias diferentes, o GlobalKtable será substituído.

Por favor, como conseguir isso? Esta é uma parte do meu código

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

questionAnswers(1)

yourAnswerToTheQuestion