Kafka-Consumer-Konfiguration / Leistungsprobleme

Ich probiere kafka als Alternative zu AWS SQS aus. Die Motivation ist in erster Linie, die Leistung dort zu verbessern, wo kafka die Einschränkung eliminieren würde, 10 Nachrichten gleichzeitig mit einer Obergrenze von 256 KB zu empfangen. Hier ist ein allgemeines Szenario meines Anwendungsfalls. Ich habe eine Reihe von Crawlern, die Dokumente zur Indizierung senden. Die Größe der Nutzlast beträgt im Durchschnitt etwa 1 MB. Die Crawler rufen einen SOAP-Endpunkt auf, der wiederum einen Produzentencode ausführt, um die Nachrichten an eine kafka-Warteschlange zu senden. Die Consumer-App nimmt die Nachrichten auf und verarbeitet sie. Für meine Testbox habe ich das Thema mit 30 Partitionen mit 2 Replikationen konfiguriert. Die beiden kafka-Instanzen werden mit einer zookeeper-Instanz ausgeführt. Die kafka-Version ist 0.10.0.

Zum Testen habe ich 7 Millionen Nachrichten in der Warteschlange veröffentlicht. Ich habe eine Consumer-Gruppe mit 30 Consumer-Threads erstellt, eine pro Partition. Ich hatte anfangs den Eindruck, dass dies die Verarbeitungsleistung im Vergleich zu dem, was ich über SQS erhalte, erheblich beschleunigen würde. Dies sollte leider nicht der Fall sein. In meinem Fall ist die Datenverarbeitung komplex und dauert im Durchschnitt 1 bis 2 Minuten. Dies führte zu einem schnellen Ausgleich der Partitionen, da die Threads nicht pünktlich reagieren konnten. Ich konnte eine Reihe von Nachrichten im Protokoll sehen, die auf @ verweise

Auto Offset Commit fehlgeschlagen für Gruppe full_group: Commit kann nicht abgeschlossen werden, da die Gruppe bereits neu ausgeglichen und die Partitionen einem anderen Mitglied zugewiesen hat. Dies bedeutet, dass die Zeit zwischen aufeinanderfolgenden Aufrufen von poll () länger war als die konfigurierte session.timeout.ms, was normalerweise impliziert, dass die Polling-Schleife zu viel Zeit für die Verarbeitung von Nachrichten benötigt. Sie können dies entweder durch Erhöhen des Sitzungszeitlimits oder durch Verringern der maximalen Größe der in poll () mit max.poll.records zurückgegebenen Stapel beheben.

Dies führt dazu, dass dieselbe Nachricht mehrmals verarbeitet wird. Ich habe versucht, mit Sitzungs-Timeout, max.poll.records und Abfragezeit herumzuspielen, um dies zu vermeiden, aber das hat die gesamte Verarbeitungszeit verlangsamt. Hier sind einige Konfigurationsparameter.

metadata.max.age.ms = 300000
max.partition.fetch.bytes = 1048576
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092]
enable.auto.commit = true
max.poll.records = 10000
request.timeout.ms = 310000
heartbeat.interval.ms = 100000
auto.commit.interval.ms = 1000
receive.buffer.bytes = 65536
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer
group.id = full_group
retry.backoff.ms = 100
fetch.max.wait.ms = 500
connections.max.idle.ms = 540000
session.timeout.ms = 300000
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
metrics.sample.window.ms = 30000
auto.offset.reset = latest
Ich habe die Verbraucher-Abfragezeit auf 100 ms reduziert. Dadurch wurden die Probleme bei der Neuverteilung verringert, doppelte Verarbeitungen beseitigt und der Gesamtprozess erheblich verlangsamt. Die Verarbeitung aller 6 Millionen Nachrichten dauerte 35 Stunden, verglichen mit 25 Stunden bei Verwendung der SQS-basierten Lösung. Jeder Consumer-Thread hat im Durchschnitt 50-60 Nachrichten pro Umfrage abgerufen, obwohl einige von ihnen zeitweise 0 Datensätze abgefragt haben. Ich bin mir über dieses Verhalten nicht sicher, wenn in der Partition "," eine große Menge an Nachrichten verfügbar sind. Derselbe Thread konnte während der nachfolgenden Iteration Nachrichten abrufen. Könnte dies an einem Ausgleich liegen?

Hier ist mein Kundencode

while (true) {
    try{
        ConsumerRecords records = consumer.poll(100);
        for (ConsumerRecord record : records) {
            if(record.value()!=null){
                TextAnalysisRequest textAnalysisObj = record.value();
                if(textAnalysisObj!=null){
                    // Process record
                    PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
                }
            }
        }
    }catch(Exception ex){
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
Ich verstehe, dass der Datensatzverarbeitungsteil in meinem Fall ein Engpass ist. Aber ich bin mir sicher, dass einige Leute hier einen ähnlichen Anwendungsfall für den Umgang mit großer Verarbeitungszeit haben. Ich dachte an eine asynchrone Verarbeitung, indem ich jeden Prozessor in seinem dedizierten Thread drehte oder einen Thread-Pool mit großer Kapazität verwendete, aber nicht sicher, ob dies eine große Auslastung des Systems verursachen würde. Zur gleichen Zeit habe ich einige Fälle erlebt, in denen Benutzer die API zum Anhalten und Fortsetzen verwendet haben, um die Verarbeitung durchzuführen, um ein erneutes Ausgleichen des Problems zu vermeiden.

Ich bin wirklich auf der Suche nach Ratschlägen / Best Practices unter diesen Umständen. Insbesondere die empfohlenen Konfigurationseinstellungen für Hearbeat, Anforderungszeitlimit, maximale Abfragedatensätze, Intervall für automatisches Festschreiben, Abfrageintervall usw. Wenn kafka nicht das richtige Tool für meinen Anwendungsfall ist, lassen Sie es mich bitte ebenfalls wissen.

Antworten auf die Frage(2)

Ihre Antwort auf die Frage