Como executar uma função em todos os trabalhadores do Spark antes de processar dados no PySpark?

Estou executando uma tarefa Spark Streaming em um cluster usando o YARN. Cada nó no cluster executa vários trabalhadores spark. Antes do início da transmissão, desejo executar uma função de "configuração" em todos os trabalhadores em todos os nós no cluster.

A tarefa de streaming classifica as mensagens recebidas como spam ou não, mas para fazer isso, é necessário baixar os modelos pré-treinados mais recentes do HDFS para o disco local, como este exemplo de pseudo-código:

def fetch_models():
    if hadoop.version > local.version:
        hadoop.download()

Eu vi os seguintes exemplos aqui no SO:

sc.parallelize().map(fetch_models)

Mas no Spark 1.6parallelize() requer que alguns dados sejam usados, como esta solução de merda que estou fazendo agora:

sc.parallelize(range(1, 1000)).map(fetch_models)

Apenas para ter certeza de que a função é executada em TODOS os trabalhadores, defino o intervalo como 1000. Também não sei exatamente quantos trabalhadores estão no cluster durante a execução.

Eu li a documentação de programação e pesquisei incansavelmente no Google, mas não consigo encontrar nenhuma maneira de realmente distribuir qualquer coisa a todos os trabalhadores sem nenhum dado.

Após a conclusão dessa fase de inicialização, a tarefa de streaming fica como de costume, operando com os dados recebidos do Kafka.

A maneira como estou usando os modelos é executando uma função semelhante a esta:

spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
    .repartition(spark_partitions)\
    .foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))

Teoricamente, eu poderia verificar se os modelos estão atualizados noon_partition função, embora seria realmente um desperdício fazer isso em cada lote. Gostaria de fazer isso antes que o Spark comece a recuperar lotes do Kafka, já que o download do HDFS pode demorar alguns minutos ...

ATUALIZAR:

Para ficar claro: não se trata de como distribuir os arquivos ou carregá-los; trata-se de executar um método arbitrário em todos os trabalhadores sem operar com nenhum dado.

Para esclarecer o que realmente significa carregar modelos atualmente:

def on_partition(config, partition):
    if not MyClassifier.is_loaded():
        MyClassifier.load_models(config)

    handle_partition(config, partition)

Enquanto MyClassifier é algo como isto:

class MyClassifier:
    clf = None

    @staticmethod
    def is_loaded():
        return MyClassifier.clf is not None

    @staticmethod
    def load_models(config):
        MyClassifier.clf = load_from_file(config)

Métodos estáticos, já que o PySpark não parece capaz de serializar classes com métodos não estáticos (o estado da classe é irrelevante em relação a outro trabalhador). Aqui só precisamos chamar load_models () uma vez e, em todos os lotes futuros, MyClassifier.clf será definido. Isso é algo que realmente não deve ser feito para cada lote, é algo único. O mesmo acontece com o download dos arquivos do HDFS usando fetch_models ().

questionAnswers(2)

yourAnswerToTheQuestion