Kafka - Implementação de fila atrasada usando consumidor de alto nível
Deseja implementar um consumidor atrasado usando a API de alto nível do consumidor
Ideia principal:
produzir mensagens por chave (cada msg contém o carimbo de data / hora da criação), isso garante que cada partição tenha ordenado mensagens por tempo produzido.auto.commit.enable = false (será confirmado explicitamente após cada processo de mensagem)consumir uma mensagemverifique o registro de data e hora da mensagem e verifique se já passou tempo suficientemensagem de processo (esta operação nunca falhará)confirmar 1 deslocamento
while (it.hasNext()) {
val msg = it.next().message()
//checks timestamp in msg to see delay period exceeded
while (!delayedPeriodPassed(msg)) {
waitSomeTime() //Thread.sleep or something....
}
//certain that the msg was delayed and can now be handled
Try { process(msg) } //the msg process will never fail the consumer
consumer.commitOffsets //commit each msg
}
algumas preocupações sobre esta implementação:
confirmar cada deslocamento pode desacelerar o ZKconsumer.commitOffsets pode lançar uma exceção? se sim, consumirei a mesma mensagem duas vezes (pode resolver com mensagens idempotentes)problema que espera muito tempo sem confirmar o deslocamento, por exemplo, o período de atraso é de 24 horas, será o próximo do iterador, será suspenso por 24 horas, processado e confirmado (tempo limite da sessão ZK?)como a sessão do ZK pode se manter viva sem cometer novas compensações? (definir uma seção zookeeper.session.timeout.ms pode ser resolvido no consumidor morto sem reconhecê-lo)faltam outros problemas?Obrigado!