Como executar uma função em todos os trabalhadores do Spark antes de processar dados no PySpark?
Estou executando uma tarefa Spark Streaming em um cluster usando o YARN. Cada nó no cluster executa vários trabalhadores spark. Antes do início da transmissão, desejo executar uma função de "configuração" em todos os trabalhadores em todos os nós no cluster.
A tarefa de streaming classifica as mensagens recebidas como spam ou não, mas para fazer isso, é necessário baixar os modelos pré-treinados mais recentes do HDFS para o disco local, como este exemplo de pseudo-código:
def fetch_models():
if hadoop.version > local.version:
hadoop.download()
Eu vi os seguintes exemplos aqui no SO:
sc.parallelize().map(fetch_models)
Mas no Spark 1.6parallelize()
requer que alguns dados sejam usados, como esta solução de merda que estou fazendo agora:
sc.parallelize(range(1, 1000)).map(fetch_models)
Apenas para ter certeza de que a função é executada em TODOS os trabalhadores, defino o intervalo como 1000. Também não sei exatamente quantos trabalhadores estão no cluster durante a execução.
Eu li a documentação de programação e pesquisei incansavelmente no Google, mas não consigo encontrar nenhuma maneira de realmente distribuir qualquer coisa a todos os trabalhadores sem nenhum dado.
Após a conclusão dessa fase de inicialização, a tarefa de streaming fica como de costume, operando com os dados recebidos do Kafka.
A maneira como estou usando os modelos é executando uma função semelhante a esta:
spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
.repartition(spark_partitions)\
.foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))
Teoricamente, eu poderia verificar se os modelos estão atualizados noon_partition
função, embora seria realmente um desperdício fazer isso em cada lote. Gostaria de fazer isso antes que o Spark comece a recuperar lotes do Kafka, já que o download do HDFS pode demorar alguns minutos ...
ATUALIZAR:
Para ficar claro: não se trata de como distribuir os arquivos ou carregá-los; trata-se de executar um método arbitrário em todos os trabalhadores sem operar com nenhum dado.
Para esclarecer o que realmente significa carregar modelos atualmente:
def on_partition(config, partition):
if not MyClassifier.is_loaded():
MyClassifier.load_models(config)
handle_partition(config, partition)
Enquanto MyClassifier é algo como isto:
class MyClassifier:
clf = None
@staticmethod
def is_loaded():
return MyClassifier.clf is not None
@staticmethod
def load_models(config):
MyClassifier.clf = load_from_file(config)
Métodos estáticos, já que o PySpark não parece capaz de serializar classes com métodos não estáticos (o estado da classe é irrelevante em relação a outro trabalhador). Aqui só precisamos chamar load_models () uma vez e, em todos os lotes futuros, MyClassifier.clf será definido. Isso é algo que realmente não deve ser feito para cada lote, é algo único. O mesmo acontece com o download dos arquivos do HDFS usando fetch_models ().