Kafka - Implementação de fila atrasada usando consumidor de alto nível

Deseja implementar um consumidor atrasado usando a API de alto nível do consumidor

Ideia principal:

produzir mensagens por chave (cada msg contém o carimbo de data / hora da criação), isso garante que cada partição tenha ordenado mensagens por tempo produzido.auto.commit.enable = false (será confirmado explicitamente após cada processo de mensagem)consumir uma mensagemverifique o registro de data e hora da mensagem e verifique se já passou tempo suficientemensagem de processo (esta operação nunca falhará)

confirmar 1 deslocamento

while (it.hasNext()) {
  val msg = it.next().message()
  //checks timestamp in msg to see delay period exceeded
  while (!delayedPeriodPassed(msg)) { 
     waitSomeTime() //Thread.sleep or something....
  }
  //certain that the msg was delayed and can now be handled
  Try { process(msg) } //the msg process will never fail the consumer
  consumer.commitOffsets //commit each msg
}

algumas preocupações sobre esta implementação:

confirmar cada deslocamento pode desacelerar o ZKconsumer.commitOffsets pode lançar uma exceção? se sim, consumirei a mesma mensagem duas vezes (pode resolver com mensagens idempotentes)problema que espera muito tempo sem confirmar o deslocamento, por exemplo, o período de atraso é de 24 horas, será o próximo do iterador, será suspenso por 24 horas, processado e confirmado (tempo limite da sessão ZK?)como a sessão do ZK pode se manter viva sem cometer novas compensações? (definir uma seção zookeeper.session.timeout.ms pode ser resolvido no consumidor morto sem reconhecê-lo)faltam outros problemas?

Obrigado!

questionAnswers(4)

yourAnswerToTheQuestion