Spark Streaming mapWithState scheint in regelmäßigen Abständen den vollständigen Zustand wiederherzustellen

Ich arbeite an einem Scala (2.11) / Spark (1.6.1) -Streaming-Projekt und benutzemapWithState(), um die angezeigten Daten früherer Stapel zu verfolgen.

Der Status ist in 20 Partitionen auf mehrere Knoten verteilt, die mit @ erstellt wurdeStateSpec.function(trackStateFunc _).numPartitions(20). In diesem Zustand sind nur einige Tasten (~ 100) @ zugeordneSets mit bis zu 160.000 Einträgen, die in der gesamten Anwendung wachsen. Der gesamte Staat ist bis zu3GB, das von jedem Knoten im Cluster verarbeitet werden kann. In jedem Stapel werden einige Daten zu einem Status hinzugefügt, aber erst ganz am Ende des Prozesses gelöscht, d. H. ~ 15 Minute

Während der Benutzeroberfläche der Anwendung folgt, ist die Verarbeitungszeit jeder zehnten Charge im Vergleich zu den anderen Chargen sehr hoch. Siehe Bilder:

Die gelben Felder stehen für die hohe Verarbeitungszeit.

Eine detailliertere Jobansicht zeigt, dass in diesen Stapeln zu einem bestimmten Zeitpunkt genau dann vorkommt, wenn alle 20 Partitionen "übersprungen" werden. Oder das sagt die Benutzeroberfläche.

Mein Verständnis vonskipped ist, dass jede Statuspartition eine mögliche Aufgabe ist, die nicht ausgeführt wird, da sie nicht neu berechnet werden muss. Allerdings verstehe ich nicht, warum die Menge vonskips variiert in jedem Job und warum der letzte Job so viel Verarbeitung erfordert. Die höhere Verarbeitungszeit ist unabhängig von der Größe des Status und wirkt sich nur auf die Dauer aus.

Ist dies ein Fehler in dermapWithState() Funktionalität oder ist dies beabsichtigtes Verhalten? Erfordert die zugrunde liegende Datenstruktur ein UmgruppiereSet im Stand müssen Daten kopiert werden? Oder ist es wahrscheinlicher, dass meine Bewerbung fehlerhaft ist?

Antworten auf die Frage(4)

Ihre Antwort auf die Frage