Kafka - Implementación de cola retrasada usando consumidores de alto nivel

Desea implementar un consumidor retrasado utilizando la API de consumidor de alto nivel

idea principal:

producir mensajes por clave (cada mensaje contiene la marca de tiempo de creación) esto asegura que cada partición haya ordenado mensajes por tiempo producido.auto.commit.enable = false (se confirmará explícitamente después de cada proceso de mensaje)consumir un mensajeverifique la marca de tiempo del mensaje y verifique si ha pasado suficiente tiempomensaje de proceso (esta operación nunca fallará)

cometer 1 desplazamiento

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

Algunas preocupaciones sobre esta implementación:

cometer cada desplazamiento podría ralentizar ZK¿Puede consumer.commitOffsets lanzar una excepción? en caso afirmativo, consumiré el mismo mensaje dos veces (se puede resolver con mensajes idempotentes)problema para esperar mucho tiempo sin comprometer el desplazamiento, por ejemplo, el período de retraso es de 24 horas, será el siguiente desde el iterador, dormirá durante 24 horas, procesará y confirmará (¿Tiempo de espera de sesión ZK?)¿Cómo puede la sesión ZK mantenerse viva sin comprometer nuevas compensaciones? (configurar una colmena zookeeper.session.timeout.ms puede resolverse en un consumidor muerto sin reconocerlo)¿Me falta algún otro problema?

¡Gracias!

Respuestas a la pregunta(4)

Su respuesta a la pregunta