Kafka - Probleme mit TimestampExtractor
Ich benutzeorg.apache.kafka:kafka-streams:0.10.0.1
Ich versuche, mit einem zeitreihenbasierten Stream zu arbeiten, der anscheinend kein @ auslösKStream.Process()
auslösen ("punctuate"). (sehenHie als Referenz
In einemKafkaStreams
config Ich übergebe in diesem Parameter (unter anderem):
config.put(
StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
EventTimeExtractor.class.getName());
Hier,EventTimeExtractor
ist ein benutzerdefinierter Zeitstempel-Extraktor (der @ implementieorg.apache.kafka.streams.processor.TimestampExtractor
), um die Zeitstempelinformationen aus JSON-Daten zu extrahieren.
Ich würde erwarten, dass dies mein Objekt aufruft (abgeleitet vonTimestampExtractor
) wenn jeder neue Datensatz eingezogen wird. Der betreffende Stream ist 2 * 10 ^ 6 Datensätze / Minute. Ich habepunctuate()
auf 60 Sekunden eingestellt und es wird nie ausgelöst. Ich weiß, dass die Daten diese Zeitspanne sehr häufig durchlaufen, da sie alte Werte abrufen, um aufzuholen.
In der Tat wird es nie aufgerufen.
Ist dies der falsche Weg, um Zeitstempel in KStream-Datensätzen zu setzen?Ist dies der falsche Weg, um diese Konfiguration zu deklarieren?