если вы хотите, вы можете внести это изменение локально, чтобы оно сразу заработало для вас, не дожидаясь выхода Python и Anaconda.

ема

Я готов заниматься проектированием объектов с использованием многопроцессорного модуля.(multiprocessing.Pool.starmap(), Тем не менее, он выдает сообщение об ошибке следующим образом. Я предполагаю, что это сообщение об ошибке о размере входных данных (2147483647 = 2 ^ 31 - 1?), Так как тот же код работал плавно для дроби(frac=0.05) входных фреймов данных (train_scala, test, ts). Я конвертирую типы фреймов данных как можно меньше, но это не становится лучше.

Версия anaconda - 4.3.30, а версия Python - 3.6 (64-разрядная). И объем памяти системы составляет более 128 ГБ с более чем 20 ядрами. Хотите ли вы предложить какой-либо указатель или решение для преодоления этой проблемы? Если эта проблема вызвана большими данными для многопроцессорного модуля, насколько меньше данных я должен использовать, чтобы использовать многопроцессорный модуль на Python3?

Код:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Сообщение об ошибке:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Дополнительная информацияhistoryCutoffs - это список целых чиселtrain_scala - фрейм данных для панд (377MB)Тест представляет собой панду DataFrame (15 МБ)ts - это датафрейм для панд (547MB)ul_parts_path - список каталогов (строка)is_train_seq - список логических значений

Дополнительный код: метод multiprocess_FE

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))    
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

Ответы на вопрос(2)

Решение Вопроса

маринование, а для маринованных данных ставится префикс с размером маринованных данных. Для вашего метода,все аргументы вместе маринованные как один объект.

Вы создали объект, который при мариновании больше, чем умещается вi struct formatter (четырехбайтовое целое число со знаком), нарушающее допущения, сделанные в коде.

Вместо этого вы можете делегировать чтение ваших фреймов данных дочернему процессу, отправляя только метаданные, необходимые для загрузки фрейма данных. Их объединенный размер приближается к 1 ГБ, слишком много данных для совместного использования между процессами.

Цитируя изРуководство по программированию раздел:

Лучше наследовать, чем мариновать

При использованииspawn или жеforkserver методы запуска многих типов изmultiprocessing должны быть доступны для выбора, чтобы их могли использовать дочерние процессы.Однако, как правило, следует избегать отправки общих объектов другим процессам с использованием каналов или очередей. Вместо этого вы должны расположить программу так, чтобы процесс, которому необходим доступ к общему ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.

Если вы не работаете в Windows и используете либоspawn или жеforkserver методы, вы могли бы загрузить свои кадры данных как глобальныедо запуск ваших подпроцессов, после чего дочерние процессы будут «наследовать» данные через обычные механизмы совместного использования страниц памяти при копировании в ОС.

 SUNDONG12 дек. 2017 г., 16:58
Что именно представляет собой sys.maxsize в этом случае? 2147483647 = 2,147 ГБ? Могу ли я контролировать порог размера?
 Martijn Pieters♦12 дек. 2017 г., 17:00
@SUNDONG: Извините, это не такsys.maxsize, этоi struct formatter, поэтому 4-байтовое целое число подписано. Вы не можете контролировать этот порог размера. Вы передаете объекты, которые действительно слишком велики для такого совместного использования.
 dpb19 мар. 2018 г., 14:11
@MartijnPieters Отличные ответы, спасибо! Просто комментарий, хотя - разве это не массово расстраивает? Очень старое мышление. Например, если я передаю данные подпроцессам по сети, я понимаю проблему; но делать это между процессами с локальной и более 50 ГБ ОЗУ, общими шинами и т. д. - кого это волнует. Должно быть масштабируемым. Выдать предупреждение ради Пита. Не сильно ломать struct.error.
 Martijn Pieters♦10 янв. 2018 г., 11:39
@ Эммануэль-лин: если ваши результаты настолько велики, запишите их в какое-то общее хранилище. Файл или база данных.
 SUNDONG12 дек. 2017 г., 17:00
Хорошо, я попробую загрузить данные в дочерний методmultiprocess_FE вместо. Тем не менее, я мог без проблем передавать меньшие кадры данных (размером примерно 1000–10 000).

https://github.com/python/cpython/pull/10305

если вы хотите, вы можете внести это изменение локально, чтобы оно сразу заработало для вас, не дожидаясь выхода Python и Anaconda.

Ваш ответ на вопрос