Wie führe ich eine Funktion für alle Spark-Worker aus, bevor ich Daten in PySpark verarbeite?

Ich führe eine Spark-Streaming-Aufgabe in einem Cluster mit YARN aus. Auf jedem Knoten im Cluster werden mehrere Spark Worker ausgeführt. Bevor das Streaming startet, möchte ich eine "Setup" -Funktion für alle Worker auf allen Knoten im Cluster ausführen.

Die Streaming-Aufgabe klassifiziert eingehende Nachrichten als Spam oder nicht als Spam. Bevor dies jedoch möglich ist, müssen die neuesten vorab trainierten Modelle von HDFS auf die lokale Festplatte heruntergeladen werden, wie im folgenden Pseudocodebeispiel:

def fetch_models():
    if hadoop.version > local.version:
        hadoop.download()

Ich habe die folgenden Beispiele hier auf SO gesehen:

sc.parallelize().map(fetch_models)

Aber in Spark 1.6parallelize() erfordert, dass einige Daten verwendet werden, wie diese beschissene Abhilfe, die ich gerade mache:

sc.parallelize(range(1, 1000)).map(fetch_models)

Um ziemlich sicher zu sein, dass die Funktion auf ALLEN Workern ausgeführt wird, habe ich den Bereich auf 1000 gesetzt. Ich weiß auch nicht genau, wie viele Worker im Cluster ausgeführt werden.

Ich habe die Programmierungsdokumentation gelesen und unerbittlich gegoogelt, aber ich kann scheinbar keine Möglichkeit finden, etwas ohne Daten an alle Mitarbeiter zu verteilen.

Nach Abschluss dieser Initialisierungsphase wird der Streaming-Task wie gewohnt ausgeführt und verarbeitet eingehende Daten von Kafka.

Die Art und Weise, wie ich die Modelle verwende, besteht darin, eine Funktion auszuführen, die der folgenden ähnelt:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))

heoretisch könnte ich im @ prüfen, ob die Modelle auf dem neuesten Stand sinon_partition -Funktion, obwohl es wirklich verschwenderisch wäre, dies bei jeder Charge zu tun. Ich würde es gerne tun, bevor Spark anfängt, Stapel von Kafka abzurufen, da das Herunterladen von HDFS einige Minuten dauern kann ...

AKTUALISIEREN

Um klar zu sein: Es geht nicht darum, wie die Dateien verteilt oder geladen werden, sondern darum, wie eine beliebige Methode für alle Worker ausgeführt wird, ohne Daten zu verarbeiten.

Um zu klären, was das Laden von Modellen aktuell bedeutet:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

Während MyClassifier ungefähr so aussieht:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

Statische Methoden, da PySpark keine Klassen mit nicht statischen Methoden serialisieren kann (der Status der Klasse ist in Bezug auf einen anderen Worker irrelevant). Hier müssen wir load_models () nur einmal aufrufen und bei allen zukünftigen Batches wird MyClassifier.clf gesetzt. Dies sollte eigentlich nicht für jede Charge gemacht werden, es ist eine einmalige Sache. Dasselbe gilt für das Herunterladen der Dateien von HDFS mit fetch_models ().

Antworten auf die Frage(4)

Ihre Antwort auf die Frage