¿Los mensajes de piedra sepulcral no eliminan el registro de la tienda de estado de KTable?

Estoy creando datos de procesamiento de KTable desde KStream. Pero cuando disparo mensajes de lápida con clave y carga útil nula, no se elimina el mensaje de KTable.

sample -

public KStream<String, GenericRecord> processRecord(@Input(Channel.TEST) KStream<GenericRecord, GenericRecord> testStream,
KTable<String, GenericRecord> table = testStream
                .map((genericRecord, genericRecord2) -> KeyValue.pair(genericRecord.get("field1") + "", genericRecord2))
                .groupByKey()
                reduce((genericRecord, v1) -> v1, Materialized.as("test-store"));


GenericRecord genericRecord = new GenericData.Record(getAvroSchema(keySchema));
genericRecord.put("field1", Long.parseLong(test.getField1()));
ProducerRecord record = new ProducerRecord(Channel.TEST, genericRecord, null);
kafkaTemplate.send(record);

Al activar un mensaje con valor nulo, puedo depurar en la función de mapa testStream con carga útil nula, pero no elimina el registro en el registro de cambios de KTable "test-store". Parece que ni siquiera alcanza el método de reducción, no estoy seguro de lo que me estoy perdiendo aquí.

¡Agradezco cualquier ayuda en esto!

Gracias

Respuestas a la pregunta(1)

Su respuesta a la pregunta