Problemas de configuración / rendimiento del consumidor de Kafka

Estoy probando kafka como alternativa a AWS SQS. La motivación es principalmente mejorar el rendimiento donde kafka eliminaría la restricción de extraer 10 mensajes a la vez con un límite de 256kb. Aquí hay un escenario de alto nivel de mi caso de uso. Tengo un montón de rastreadores que envían documentos para indexar. El tamaño de la carga útil es de alrededor de 1 mb en promedio. Los rastreadores llaman a un punto final SOAP que a su vez ejecuta un código de productor para enviar los mensajes a una cola kafka. La aplicación del consumidor recoge los mensajes y los procesa. Para mi cuadro de prueba, configuré el tema con 30 particiones con 2 réplicas. Las dos instancias de kafka se ejecutan con 1 instancia de zookeeper. La versión kafka es 0.10.0.

Para mi prueba, publiqué 7 millones de mensajes en la cola. Creé un grupo de consumidores con 30 hilos de consumo, uno por partición. Inicialmente tenía la impresión de que esto aceleraría sustancialmente la potencia de procesamiento en comparación con lo que recibía a través de SQS. Desafortunadamente, ese no fue el caso. En mi caso, el procesamiento de datos es complejo y demora de 1 a 2 minutos en promedio en completarse, lo que conduce a una ráfaga de reequilibrio de la partición ya que los hilos no pudieron latir a tiempo. Pude ver un montón de mensajes en el registro citando

La confirmación de compensación automática falló para el grupo full_group: la confirmación no puede completarse ya que el grupo ya ha reequilibrado y asignado las particiones a otro miembro. Esto significa que el tiempo entre las llamadas posteriores a sondeo () fue más largo que el session.timeout.ms configurado, lo que generalmente implica que el ciclo de sondeo está pasando demasiado tiempo procesando los mensajes. Puede abordar esto aumentando el tiempo de espera de la sesión o reduciendo el tamaño máximo de lotes devueltos en la encuesta () con max.poll.records.

Esto lleva a que el mismo mensaje se procese varias veces. Intenté jugar con el tiempo de espera de la sesión, max.poll.records y tiempo de sondeo para evitar esto, pero eso desaceleró el procesamiento general a lo grande. Aquí hay algunos de los parámetros de configuración.

metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest
Reduje el tiempo de encuesta al consumidor a 100 ms. Redujo los problemas de reequilibrio, eliminó el procesamiento duplicado pero ralentizó significativamente el proceso general. Terminó tardando 35 horas en completar el procesamiento de los 6 millones de mensajes en comparación con 25 horas utilizando la solución basada en SQS. Cada subproceso de consumidor en promedio recuperó 50-60 mensajes por encuesta, aunque algunos de ellos encuestaron 0 registros a veces. No estoy seguro de este comportamiento cuando hay una gran cantidad de mensajes disponibles en la partición. El mismo hilo pudo recoger mensajes durante la iteración posterior. ¿Podría esto ser debido al reequilibrio?

Aquí está mi código de consumidor

while (true) {
    try{
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            if(record.value()!=null){
                TextAnalysisRequest textAnalysisObj = record.value();
                if(textAnalysisObj!=null){
                    // Process record
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                }
            }
        }
    }catch(Exception ex){
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
Entiendo que la parte de procesamiento de registros es un cuello de botella en mi caso. Pero estoy seguro de que algunas personas aquí tienen un caso de uso similar al tratar con un gran tiempo de procesamiento. Pensé en hacer un procesamiento asíncrono girando cada procesador en su hilo dedicado o usar un grupo de hilos con gran capacidad, pero no estoy seguro de si crearía una gran carga en el sistema. Al mismo tiempo, he visto un par de instancias en las que las personas han utilizado la pausa y reanudar la API para realizar el procesamiento a fin de evitar problemas de reequilibrio.

Realmente estoy buscando algunos consejos / mejores prácticas en esta circunstancia. Particularmente, la configuración de configuración recomendada alrededor de escuchar, tiempo de espera de solicitud, registros de sondeo máximos, intervalo de confirmación automática, intervalo de sondeo, etc. si kafka no es la herramienta adecuada para mi caso de uso, hágamelo saber también.

Respuestas a la pregunta(1)

Su respuesta a la pregunta