Как запустить функцию на всех рабочих Spark перед обработкой данных в PySpark?

Я запускаю задачу Spark Streaming в кластере, используя YARN. Каждый узел в кластере запускает несколько искровых рабочих. Перед началом потоковой передачи я хочу выполнить функцию «setup» для всех рабочих на всех узлах кластера.

Задача потоковой передачи классифицирует входящие сообщения как спам или не спам, но прежде чем она сможет это сделать, ей необходимо загрузить последние предварительно обученные модели из HDFS на локальный диск, как в следующем примере с псевдокодом:

def fetch_models():
    if hadoop.version > local.version:
        hadoop.download()

Я видел следующие примеры здесь на SO:

sc.parallelize().map(fetch_models)

Но в Спарк 1.6parallelize() требует использования некоторых данных, как этот дерьмовый обходной путь, который я сейчас делаю:

sc.parallelize(range(1, 1000)).map(fetch_models)

Просто чтобы быть уверенным, что функция запускается на ВСЕХ рабочих, я установил диапазон на 1000. Я также точно не знаю, сколько рабочих в кластере работает.

Я прочитал документацию по программированию и безжалостно гуглил, но, похоже, я не могу найти способ на самом деле просто распространять что-либо среди всех работников без каких-либо данных.

После завершения этой фазы инициализации задача потоковой передачи, как обычно, обрабатывает входящие данные из Kafka.

Я использую модели, выполняя функцию, подобную этой:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))

Теоретически я мог бы проверить, обновлены ли модели вon_partition функции, хотя было бы действительно расточительно делать это в каждой партии. Я хотел бы сделать это до того, как Spark начнет загружать пакеты из Kafka, поскольку загрузка из HDFS может занять пару минут ...

ОБНОВИТЬ:

Чтобы было ясно: вопрос не в том, как распространять файлы или как их загружать, а в том, как запустить произвольный метод на всех рабочих без обработки каких-либо данных.

Чтобы уточнить, что на самом деле означает загрузка моделей:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

Хотя MyClassifier выглядит примерно так:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

Статические методы, поскольку PySpark, по-видимому, не способен сериализовать классы нестатическими методами (состояние класса не имеет отношения к отношению к другому работнику). Здесь нам нужно вызвать load_models () только один раз, и во всех будущих пакетах будет установлен MyClassifier.clf. Это то, что на самом деле не следует делать для каждой партии, это разовая вещь. То же самое с загрузкой файлов из HDFS с использованием fetch_models ().

Ответы на вопрос(2)

Ваш ответ на вопрос