Kafka - реализация отложенной очереди с использованием потребителя высокого уровня

Хотите реализовать отложенного потребителя, используя пользовательский API высокого уровня

смысл:

генерировать сообщения по ключу (каждое сообщение содержит метку времени создания), это гарантирует, что каждый раздел имеет упорядоченные сообщения по произведенному времени.auto.commit.enable = false (явно фиксируется после каждого процесса обработки сообщений)потреблять сообщениепроверьте метку времени сообщения и проверьте, прошло ли достаточно времениобработать сообщение (эта операция никогда не завершится ошибкой)

совершить 1 смещение

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

некоторые опасения по поводу этой реализации:

фиксация каждого смещения может замедлить ZKМожет ли customer.commitOffsets выдать исключение? если да, я буду использовать одно и то же сообщение дважды (могу решить с помощью идемпотентных сообщений)проблема ожидания долгое время без фиксации смещения, например, период задержки составляет 24 часа, будет следующий от итератора, сон в течение 24 часов, обработка и принятие (время ожидания сеанса ZK?)Как сохранить сеанс ZK без фиксации новых смещений? (установка улья zookeeper.session.timeout.ms может разрешить мертвого потребителя без его распознавания)какие-либо другие проблемы, которых я пропускаю?

Спасибо!

Ответы на вопрос(4)

Ваш ответ на вопрос