Lidando com mensagens ruins usando a API do Kafka Streams

Eu tenho um fluxo básico de processamento de stream que parece

master topic -> my processing in a mapper/filter -> output topics

e eu estou pensando sobre a melhor maneira de lidar com "mensagens ruins". Isso pode ser algo como mensagens que não consigo desserializar corretamente, ou talvez a lógica de processamento / filtragem falhe de alguma maneira inesperada (não tenho dependências externas, portanto não deve haver erros transitórios desse tipo).

Eu estava pensando em agrupar todo o meu código de processamento / filtragem em uma captura de tentativa e, se uma exceção foi levantada, o roteamento para um "tópico de erro" Depois, posso estudar a mensagem e modificá-la ou corrigir meu código, conforme apropriado, e depois reproduzi-la no mestre. Se eu permitir que algumas exceções se propaguem, o fluxo parece ficar congestionado e nenhuma outra mensagem é captada.

Essa abordagem é considerada uma boa prática?Existe uma maneira conveniente de fluxos Kafka para lidar com isso? Não acho que exista um conceito de DLQ ...Quais são as formas alternativas de impedir Kafka de tocar em uma "mensagem ruim"?Que abordagens alternativas de tratamento de erros existem?

Para completar, aqui está o meu código (pseudo-ish):

class Document {
    // Fields
}

class AnalysedDocument {

    Document document;
    String rawValue;
    Exception exception;
    Analysis analysis;

    // All being well
    AnalysedDocument(Document document, Analysis analysis) {...}

    // Analysis failed
    AnalysedDocument(Document document, Exception exception) {...}

    // Deserialisation failed
    AnalysedDocument(String rawValue, Exception exception) {...}
}

KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
    .stream(Serdes.String(), Serdes.String(), "master")
    .mapValues(new ValueMapper<String, AnalysedDocument>() {
         @Override
         public AnalysedDocument apply(String rawValue) {
             Document document;
             try {
                 // Deserialise
                 document = ...
             } catch (Exception e) {
                 return new AnalysedDocument(rawValue, exception);
             }
             try {
                 // Perform analysis
                 Analysis analysis = ...
                 return new AnalysedDocument(document, analysis);
             } catch (Exception e) {
                 return new AnalysedDocument(document, exception);
             }
         }
    });

// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

Qualquer ajuda muito apreciada.

questionAnswers(2)

yourAnswerToTheQuestion