Kafka - Probleme mit TimestampExtractor

Ich benutzeorg.apache.kafka:kafka-streams:0.10.0.1

Ich versuche, mit einem zeitreihenbasierten Stream zu arbeiten, der anscheinend kein @ auslösKStream.Process() auslösen ("punctuate"). (sehenHie als Referenz

In einemKafkaStreamsconfig Ich übergebe in diesem Parameter (unter anderem):

config.put(
  StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
  EventTimeExtractor.class.getName());

Hier,EventTimeExtractor ist ein benutzerdefinierter Zeitstempel-Extraktor (der @ implementieorg.apache.kafka.streams.processor.TimestampExtractor), um die Zeitstempelinformationen aus JSON-Daten zu extrahieren.

Ich würde erwarten, dass dies mein Objekt aufruft (abgeleitet vonTimestampExtractor) wenn jeder neue Datensatz eingezogen wird. Der betreffende Stream ist 2 * 10 ^ 6 Datensätze / Minute. Ich habepunctuate() auf 60 Sekunden eingestellt und es wird nie ausgelöst. Ich weiß, dass die Daten diese Zeitspanne sehr häufig durchlaufen, da sie alte Werte abrufen, um aufzuholen.

In der Tat wird es nie aufgerufen.

Ist dies der falsche Weg, um Zeitstempel in KStream-Datensätzen zu setzen?Ist dies der falsche Weg, um diese Konfiguration zu deklarieren?

Antworten auf die Frage(6)

Ihre Antwort auf die Frage