Mensagens de marca de exclusão não removendo registro do armazenamento de estado do KTable?

Estou criando os dados de processamento do KTable a partir do KStream. Mas quando eu aciono uma lápide com chave e carga nula, ele não está removendo a mensagem do KTable.

amostra -

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);

Ao acionar uma mensagem com valor nulo, posso depurar na função de mapa testStream com carga nula, mas ela não remove o registro no log de alterações "armazenamento de teste" do KTable. Parece que nem sequer chega ao método de redução, não tenho certeza do que estou perdendo aqui.

Agradeço qualquer ajuda nisto!

Obrigado.

questionAnswers(1)

yourAnswerToTheQuestion