luxos @Kafka: leia TODAS as partições em todas as instâncias de um aplicativo
Ao usar o KTable, os fluxos Kafka não permitem que as instâncias leiam de várias partições de um tópico específico quando o número de instâncias / consumidores é igual ao número de partições. Tentei fazer isso usando o GlobalKTable, o problema é que os dados serão substituídos e a agregação não pode ser aplicada a el
Vamos supor que eu tenha um tópico chamado "data_in" com 3 partições (P1, P2, P3). Quando executo 3 instâncias (I1, I2, I3) de um aplicativo de streaming Kafka, desejo que cada instância leia dados de todas as partições de "data_in". Quero dizer que I1 pode ler de P1, P2 e P3, I2 pode ler de P1, P2 e P3, I2 e assim por diant
EDIT: Lembre-se de que o produtor pode publicar dois IDs semelhantes em duas partições diferentes em "data_in". Portanto, ao executar duas instâncias diferentes, o GlobalKtable será substituído.
Por favor, como conseguir isso? Esta é uma parte do meu código
private KTable<String, theDataList> globalStream() {
// KStream of records from data-in topic using String and theDataSerde deserializers
KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));
// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
KGroupedStream<String, Data> KGS = trashStream.groupByKey();
Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);
// Return a KTable
return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
if (!value.getValideData())
aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
else
aggregate.getList().add(value);
return aggregate;
}, materialized);
}