Извините - не уверен, куда это поставить - я не парень из потоков; Я просто взломал ваши материалы, чтобы исключить встроенного брокера (так как это в вашем названии вопроса). Рад попробовать для вас, если вы можете быть более откровенным с тем, что вы хотите, чтобы я сделал. Но это будет завтра; поздно.

хожу в поисках знания тайного.

Во-первых, у меня есть две пары тем, по одной теме в каждой паре, которая входит в другую тему. Два KTables формируются последними темами, которые используются в KTable + KTable leftJoin. Проблема в том, что leftJoin создает ТРИ записи, когда я записываю одну запись в KTable. Я ожидал бы две записи в форме (A-ноль, A-B), но вместо этого я получаю (A-ноль, A-B, A-ноль). Я подтвердил, что KTables получают по одной записи каждый.

Я возился с CACHE_MAX_BYTES_BUFFERING_CONFIG, чтобы включить / отключить кэширование в хранилище состояний. Поведение выше с CACHE_MAX_BYTES_BUFFERING_CONFIG, установленным в 0. Когда я использую значение по умолчанию для CACHE_MAX_BYTES_BUFFERING_CONFIG, я вижу следующие записи, выводимые из объединения: (A-B, A-B, A-null)

Вот конфигурации для потоков, потребителей, производителей:

properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapUrls);
properties.put(StreamsConfig.STATE_DIR_CONFIG, String.format("/tmp/kafka-streams/%s/%s",
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); // fiddled with
properties.put(StreamsConfig.CLIENT_ID_CONFIG, appName);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
properties.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, appName);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.cla
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

Код API процессора (санированный), который испытывает такое поведение, приведен ниже, обратите внимание на сопряженные темы [A1, A2] и [B1, B2]:

    KTable<Long, Value> kTableA =
        kstreamBuilder.table(longSerde, valueSerde, topicA2);

    kstreamBuilder.stream(keySerde, envelopeSerde, topicA1)
        .to(longSerde, valueSerde, topicA2);

    kstreamBuilder.stream(keySerde, envelopeSerde, topicB1)
        .to(longSerde, valueSerde, topicB2.topicName);

    KTable<Long, Value> kTableB =
        kstreamBuilder.table(longSerde, valueSerde, topicB2.topicName);

    KTable<Long, Result> joinTable = kTableA.leftJoin(kTableB, (a,b) -> {
        // value joiner called three times with only a single record input
        // into topicA1 and topicB1
    });

    joinTable.groupBy(...)
    .aggregate(...)
    .to(longSerde, aggregateSerde, outputTopic);

Заранее спасибо за любую помощь, о доброжелательные.

Обновить: Я работал с одним сервером kafka и 1 разделом на тему и испытал такое поведение. Когда я увеличил количество серверов до 2 и количество разделов до 3, мой вывод становится (A-null).

Мне кажется, мне нужно потратить еще немного времени с руководством Кафки ...

Ответы на вопрос(0)

Ваш ответ на вопрос