Obteniendo el último mensaje enviado a un tema kafka
Soy nuevo en Kafka y estoy trabajando en un prototipo para conectar un servicio de transmisión patentado a Kafka.
Estoy buscando obtener la clave del último mensaje enviado sobre un tema, ya que nuestro consumidor interno de transmisiones debe iniciar sesión con la ID del último mensaje que recibió al conectarse.
¿Es posible usar KafkaProducer o KafkaConsumer para hacer esto?
Intenté hacer lo siguiente usando un consumidor, pero cuando también ejecuto el consumidor de la consola, veo mensajes reproducidos.
// Poll so we know we're connected
consumer.poll(100);
// Get the assigned partitions
Set<TopicPartition> assignedPartitions = consumer.assignment();
// Seek to the end of those partitions
consumer.seekToEnd(assignedPartitions);
for(TopicPartition partition : assignedPartitions) {
final long offset = consumer.committed(partition).offset();
// Seek to the previous message
consumer.seek(partition,offset - 1);
}
// Now get the last message
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
lastKey = record.key();
}
consumer.close();
¿Es este comportamiento esperado o estoy en el camino equivocado?