Спасибо! Прошу об обновлениях в GlobalKTable от agg_data_in? Согласно моим знаниям, обновления в GlobalKTable перезаписываются (если новые данные приходят и находят там свой ключ, они перезаписывают старые данные / значение).

спользовании KTable потоки Kafka не позволяют экземплярам читать из нескольких разделов определенной темы, когда количество экземпляров / потребителей равно числу разделов. Я пытался добиться этого с помощью GlobalKTable, проблема в том, что данные будут перезаписаны, а также агрегирование не может быть применено к нему.

Предположим, у меня есть тема с именем «data_in» с 3 разделами (P1, P2, P3). Когда я запускаю 3 экземпляра (I1, I2, I3) потокового приложения Kafka, я хочу, чтобы каждый экземпляр считывал данные со всех разделов «data_in». Я имею в виду, что I1 может читать с P1, P2 и P3, I2 может читать с P1, P2 и P3, I2 и так далее.

РЕДАКТИРОВАТЬ: Имейте в виду, что производитель может опубликовать два одинаковых идентификатора в двух разных разделах в «data_in». Поэтому при запуске двух разных экземпляров GlobalKtable будет перезаписан.

Пожалуйста, как этого добиться? Это часть моего кода

private KTable<String, theDataList> globalStream() {

    // KStream of records from data-in topic using String and theDataSerde deserializers
    KStream<String, Data> trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

    // Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)
    KGroupedStream<String, Data> KGS = trashStream.groupByKey();

    Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
    materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

    // Return a KTable
    return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
        if (!value.getValideData())
            aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
        else
            aggregate.getList().add(value);
        return aggregate;
    }, materialized);
}

Ответы на вопрос(1)

Ваш ответ на вопрос