Manejo de mensajes incorrectos con la API de Streams de Kafka

Tengo un flujo de procesamiento de flujo básico que se parece a

master topic -> my processing in a mapper/filter -> output topics

y me pregunto cuál es la mejor manera de manejar "mensajes malos". Esto podría ser cosas como mensajes que no puedo deserializar correctamente, o tal vez la lógica de procesamiento / filtrado falla de alguna manera inesperada (no tengo dependencias externas, por lo que no debería haber errores transitorios de ese tipo).

Estaba considerando envolver todo mi código de procesamiento / filtrado en un intento de captura y, si se producía una excepción, enrutamiento a un "tema de error". Luego puedo estudiar el mensaje y modificarlo o corregir mi código según corresponda y luego reproducirlo en master. Si dejo que se propaguen las excepciones, la transmisión parece estar atascada y no se recogen más mensajes.

¿Este enfoque se considera la mejor práctica?¿Hay alguna manera conveniente de manejar Kafka para manejar esto? No creo que haya un concepto de DLQ ...¿Cuáles son las formas alternativas de evitar que Kafka se atasque en un "mal mensaje"?¿Qué enfoques alternativos de manejo de errores existen?

Para completar aquí está mi código (pseudo-ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
         @Override
         public AnalysedDocument apply(String rawValue) {
             Document document;
             try {
                 // Deserialise
                 document = ...
             } catch (Exception e) {
                 return new AnalysedDocument(rawValue, exception);
             }
             try {
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
             } catch (Exception e) {
                 return new AnalysedDocument(document, exception);
             }
         }
    });

// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

Cualquier ayuda muy apreciada.

Respuestas a la pregunta(2)

Su respuesta a la pregunta