Многопроцессорность и память Python

я используюmultiprocessing.imap_unordered выполнить вычисления по списку значений:

def process_parallel(fnc, some_list):
    pool = multiprocessing.Pool()
    for result in pool.imap_unordered(fnc, some_list):
        for x in result:
            yield x
    pool.terminate()

Каждый звонокfnc возвращает ОГРОМНЫЙ объект в результате, по замыслу. Я могу хранить N экземпляров такого объекта в оперативной памяти, где N ~ cpu_count, но не намного (не сотни).

Теперь использование этой функции занимает слишком много памяти. Память целиком и полностью расходуется на основной процесс, а не на рабочих.

Какimap_unordered сохранить готовые результаты? Я имею в виду результаты, которые уже были возвращены работниками, но еще не переданы пользователю. Я думал, что это было умно и только вычислил их "лениво" по мере необходимости, но, видимо, нет.

Похоже, так как я не могу потреблять результатыprocess_parallel достаточно быстро, бассейн продолжает стоять в очереди эти огромные объекты изfnc где-то внутри, а потом взрывается. Есть ли способ избежать этого? Ограничить свою внутреннюю очередь как-нибудь?

Я использую Python2.7. Приветствия.

 Felix24 июн. 2012 г., 07:33
Ну из того что я вижуyield находится в основном процессе, а не внутриfnc (т. е. функция, выполняемая рабочими). являетсяfnc сам делаешь ленивую оценку?
 Eric des Courtis29 июн. 2012 г., 01:48
Просто ограничение скорости на основе доступной памяти.
 user12411424 июн. 2012 г., 09:50
@FelixBonkoski Нет,fnc берет один предмет изsome_listи вычисляет и возвращает огромный объект из него.

Ответы на вопрос(2)

Самое простое решение, о котором я могу подумать, - это добавить закрытие, чтобы обернутьfnc функция, которая будет использовать семафор для управления общим числом одновременных выполнений заданий, которые могут выполняться одновременно (я предполагаю, что основной процесс / поток будет увеличивать семафор). Значение семафора может быть рассчитано на основе размера задания и доступной памяти.

Решение Вопроса

Как вы можете видеть, заглянув в соответствующий исходный файл (python2.7/multiprocessing/pool.py), IMapUnorderedIterator используетcollections.deque экземпляр для хранения результатов. Если появляется новый элемент, он добавляется и удаляется в итерации.

Как вы предположили, если во время обработки основного объекта еще один огромный объект появится, он также будет сохранен в памяти.

Вы можете попробовать что-то вроде этого:

it = pool.imap_unordered(fnc, some_list)
for result in it:
    it._cond.acquire()
    for x in result:
        yield x
    it._cond.release()

Это должно привести к блокировке потока задачи-результата-получателя при обработке элемента, если он пытается поместить следующий объект в очередь. Таким образом, в памяти не должно быть более двух огромных объектов. Если это работает для вашего случая, я не знаю;)

 30 июн. 2012 г., 17:24
Да, я понял, поэтому я сказал, что семафор, уменьшенный из основного потока, будет лучшим выбором, потому что вы получите более полное использование системы без использования всей памяти.
 27 июн. 2012 г., 16:16
Я не следую этому примеру, неit просто генератор и как таковой он не будет иметь_cond.acquire() а такжеrelease методы? Если вам нужно написать их самостоятельно, какой тип объекта._cond нужно быть?
 29 июн. 2012 г., 02:17
Похоже, что пользователь заботится о производительности, зачем ограничивать ее небольшим числом с помощью простой блокировки?
 29 июн. 2012 г., 08:28
@EricdesCourtis: кажется, что ограничением является не количество выполняемых заданий, которые уже контролируются через пул, а размер результата, который основной поток не может одновременно хранить в памяти.
 29 июн. 2012 г., 08:23
@Hooked:imap_unordered возвращаетIMapUnorderedIterator, который имеет эти функции, как видно из взгляда в соответствующий исходный код. Поскольку поток-получатель-результат (после получения результата) потребует блокировки для ввода результата в deque, это заблокирует поток и не позволит ему потреблять больше памяти.

Ваш ответ на вопрос