Manejo de mensajes incorrectos con la API de Streams de Kafka
Tengo un flujo de procesamiento de flujo básico que se parece a
master topic -> my processing in a mapper/filter -> output topics
y me pregunto cuál es la mejor manera de manejar "mensajes malos". Esto podría ser cosas como mensajes que no puedo deserializar correctamente, o tal vez la lógica de procesamiento / filtrado falla de alguna manera inesperada (no tengo dependencias externas, por lo que no debería haber errores transitorios de ese tipo).
Estaba considerando envolver todo mi código de procesamiento / filtrado en un intento de captura y, si se producía una excepción, enrutamiento a un "tema de error". Luego puedo estudiar el mensaje y modificarlo o corregir mi código según corresponda y luego reproducirlo en master. Si dejo que se propaguen las excepciones, la transmisión parece estar atascada y no se recogen más mensajes.
¿Este enfoque se considera la mejor práctica?¿Hay alguna manera conveniente de manejar Kafka para manejar esto? No creo que haya un concepto de DLQ ...¿Cuáles son las formas alternativas de evitar que Kafka se atasque en un "mal mensaje"?¿Qué enfoques alternativos de manejo de errores existen?Para completar aquí está mi código (pseudo-ish):
class Document {
// Fields
}
class AnalysedDocument {
Document document;
String rawValue;
Exception exception;
Analysis analysis;
// All being well
AnalysedDocument(Document document, Analysis analysis) {...}
// Analysis failed
AnalysedDocument(Document document, Exception exception) {...}
// Deserialisation failed
AnalysedDocument(String rawValue, Exception exception) {...}
}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, AnalysedPolecatDocument> analysedDocumentStream = builder
.stream(Serdes.String(), Serdes.String(), "master")
.mapValues(new ValueMapper<String, AnalysedDocument>() {
@Override
public AnalysedDocument apply(String rawValue) {
Document document;
try {
// Deserialise
document = ...
} catch (Exception e) {
return new AnalysedDocument(rawValue, exception);
}
try {
// Perform analysis
Analysis analysis = ...
return new AnalysedDocument(document, analysis);
} catch (Exception e) {
return new AnalysedDocument(document, exception);
}
}
});
// Branch based on whether analysis mapping failed to produce errorStream and successStream
errorStream.to(Serdes.String(), customPojoSerde(), "error");
successStream.to(Serdes.String(), customPojoSerde(), "analysed");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Cualquier ayuda muy apreciada.