¿Cómo agregar un StateStore personalizado al procesador DSL de Kafka Streams?

Para una de mis aplicaciones de transmisión de Kafka, necesito usar las funciones de DSL y la API del procesador. El flujo de mi aplicación de transmisión es

source -> selectKey -> filter -> aggregate (on a window) -> sink

Después de la agregación, necesito enviar un SOLO mensaje agregado al sumidero. Así que defino mi topología como a continuación

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(source_stream);
source.selectKey(new MyKeyValueMapper())
      .filterNot((k,v) -> k.equals("UnknownGroup"))
      .process(() -> new MyProcessor());

Defino una costumbreStateStore y registrarlo con mi procesador de la siguiente manera

public class MyProcessor implements Processor<String, String> {

    private ProcessorContext context = null;
    Serde<HashMapStore> invSerde = Serdes.serdeFrom(invJsonSerializer, invJsonDeserializer);


    KeyValueStore<String, HashMapStore> invStore = (KeyValueStore) Stores.create("invStore")
        .withKeys(Serdes.String())
        .withValues(invSerde)
        .persistent()
        .build()
        .get();

    public MyProcessor() {
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.context.register(invStore, false, null); // register the store
        this.context.schedule(10 * 60 * 1000L);
    }

    @Override
    public void process(String partitionKey, String message) {
        try {
            MessageModel smb = new MessageModel(message);
            HashMapStore oldStore = invStore.get(partitionKey);
            if (oldStore == null) {
                oldStore = new HashMapStore();
            }
            oldStore.addSmb(smb);
            invStore.put(partitionKey, oldStore);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }

    @Override
    public void punctuate(long timestamp) {
       // processes all the messages in the state store and sends single aggregate message
    }


    @Override
    public void close() {
        invStore.close();
    }
}

Cuando ejecuto la aplicación, obtengojava.lang.NullPointerException

Excepción en el hilo "StreamThread-18" java.lang.NullPointerException en org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush (MeteredKeyValueStore.java:167) en org.apache.kafka.streams.processor.internals.ProcessorStateManager .flush (ProcessorStateManager.java:332) en org.apache.kafka.streams.processor.internals.StreamTask.commit (StreamTask.java:252) en org.apache.kafka.streams.processor.internals.StreamThread.commitOne (StreamThread .java: 446) en org.apache.kafka.streams.processor.internals.StreamThread.commitAll (StreamThread.java:434) en org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit (StreamThread.java:422 ) en org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:340) en org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:218)

¿Alguna idea de lo que está pasando aquí?

Respuestas a la pregunta(1)

Su respuesta a la pregunta