¿Cómo ejecutar una función en todos los trabajadores de Spark antes de procesar datos en PySpark?

Estoy ejecutando una tarea de Spark Streaming en un clúster usando YARN. Cada nodo en el clúster ejecuta múltiples trabajadores de chispa. Antes de que comience la transmisión, quiero ejecutar una función de "configuración" en todos los trabajadores en todos los nodos del clúster.

La tarea de transmisión clasifica los mensajes entrantes como spam o no spam, pero antes de que pueda hacerlo necesita descargar los últimos modelos previamente entrenados de HDFS al disco local, como este ejemplo de pseudocódigo:

def fetch_models():
    if hadoop.version > local.version:
        hadoop.download()

He visto los siguientes ejemplos aquí en SO:

sc.parallelize().map(fetch_models)

Pero en Spark 1.6parallelize() requiere que se utilicen algunos datos, como esta solución de mierda que estoy haciendo ahora:

sc.parallelize(range(1, 1000)).map(fetch_models)

Solo para estar bastante seguro de que la función se ejecuta en TODOS los trabajadores, configuré el rango en 1000. Tampoco sé exactamente cuántos trabajadores hay en el clúster cuando se ejecutan.

He leído la documentación de programación y busqué en Google sin descanso, pero parece que no puedo encontrar ninguna manera de distribuir nada a todos los trabajadores sin ningún dato.

Después de que se realiza esta fase de inicialización, la tarea de transmisión es como de costumbre, operando con los datos entrantes de Kafka.

La forma en que uso los modelos es ejecutando una función similar a esta:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))

Teóricamente podría comprobar si los modelos están actualizados o no en elon_partition función, aunque sería realmente un desperdicio hacer esto en cada lote. Me gustaría hacerlo antes de que Spark comience a recuperar lotes de Kafka, ya que la descarga desde HDFS puede tomar un par de minutos ...

ACTUALIZAR:

Para ser claros: no se trata de cómo distribuir los archivos o cómo cargarlos, se trata de cómo ejecutar un método arbitrario en todos los trabajadores sin operar con ningún dato.

Para aclarar qué significa actualmente cargar modelos:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

Si bien MyClassifier es algo como esto:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

Métodos estáticos ya que PySpark no parece ser capaz de serializar clases con métodos no estáticos (el estado de la clase es irrelevante en relación con otro trabajador). Aquí solo tenemos que llamar a load_models () una vez, y en todos los lotes futuros se establecerá MyClassifier.clf. Esto es algo que realmente no debe hacerse para cada lote, es una cosa de una sola vez. Lo mismo con la descarga de archivos desde HDFS usando fetch_models ().

Respuestas a la pregunta(2)

Su respuesta a la pregunta