Python 3: capturando avisos durante o multiprocessamento

Demasiado longo; não leu

owarnings.catch_warnings() gerenciador de contexto énão é seguro. Como faço para usá-lo em um ambiente de processamento paralelo?

fundo

O código abaixo resolve um problema de maximização usando processamento paralelo com o Pythonmultiprocessing módulo. Ele pega uma lista de widgets (imutáveis), particiona-os (vejaMultiprocessamento eficiente de maximização massiva de força bruta no Python 3), encontra o maxima ("finalistas") de todas as partições e, em seguida, encontra o máximo ("campeão") desses "finalistas". Se eu entendi meu próprio código corretamente (e eu não estaria aqui se o fizesse), estou compartilhando memória com todos os processos filhos para dar a eles os widgets de entrada emultiprocessing usa um tubo de nível do sistema operacional e decapagem para enviar os widgets finalistas de volta ao processo principal quando os trabalhadores estiverem prontos.

Fonte do problema

Eu quero pegar os avisos de widget redundantes causados ​​pela re-instanciação dos widgets após o desparelhamento isso acontece quando os widgets saem do canal entre processos. Quando objetos de widget instanciam, eles validam seus próprios dados, emitindo avisos do padrão Pythonwarnings módulo para informar ao usuário do aplicativo que o widget suspeita que há um problema com os dados de entrada do usuário. Como o unpickling faz com que os objetos instanciem, meu entendimento do código implica que cada objeto de widget é reinstanciado exatamente uma vez se e somente se for um finalista depois que ele sai do canal - veja a próxima seção para ver porque isso não está correto .

Os widgets já foram criados antes de serem fracionados, então o usuário já está dolorosamente ciente de qual entrada ele errou e não quer ouvir sobre isso novamente. Estes são os avisos que eu gostaria de pegar com owarnings módulo decatch_warnings() gestor de contexto (ou seja, umwith declaração).

Soluções com falha

Em meus testes eu reduzi quando os avisos supérfluos estão sendo emitidos para qualquer lugar entre o que eu rotulei abaixo comoLinha A eLinha B. O que me surpreende é que os avisos estão sendo emitidos em outros lugares que não apenas pertooutput_queue.get(). Isso implica para mim quemultiprocessing envia os widgets para os trabalhadores usando decapagem.

O resultado é que colocar um gerenciador de contexto criado porwarnings.catch_warnings() mesmo em torno de tudo, desdeLinha A paraLinha B e definir o filtro de avisos correto dentro deste contexto não captura os avisos. Isso implica para mim que os avisos estão sendo emitidos nos processos de trabalho. Colocar esse gerenciador de contexto em torno do código do trabalhador também não captura os avisos.

O código

Este exemplo omite o código para decidir se o tamanho do problema é pequeno demais para se preocupar com processos de bifurcação, importação de multiprocessamento e definiçãomy_frobnal_counteremy_load_balancer.

"Call `frobnicate(list_of_widgets)` to get the widget with the most frobnals"

def frobnicate_parallel_worker(widgets, output_queue):
    resultant_widget = max(widgets, key=my_frobnal_counter)
    output_queue.put(resultant_widget)

def frobnicate_parallel(widgets):
    output_queue = multiprocessing.Queue()
    # partitions: Generator yielding tuples of sets
    partitions = my_load_balancer(widgets)
    processes = []
    # Line A: Possible start of where the warnings are coming from.
    for partition in partitions:
        p = multiprocessing.Process(
                 target=frobnicate_parallel_worker,
                 args=(partition, output_queue))
        processes.append(p)
        p.start()
    finalists = []
    for p in processes:
        finalists.append(output_queue.get())
    # Avoid deadlocks in Unix by draining queue before joining processes
    for p in processes:
        p.join()
    # Line B: Warnings no longer possible after here.
    return max(finalists, key=my_frobnal_counter)

questionAnswers(2)

yourAnswerToTheQuestion