если вы хотите, вы можете внести это изменение локально, чтобы оно сразу заработало для вас, не дожидаясь выхода Python и Anaconda.

ема

Я готов заниматься проектированием объектов с использованием многопроцессорного модуля.(multiprocessing.Pool.starmap(), Тем не менее, он выдает сообщение об ошибке следующим образом. Я предполагаю, что это сообщение об ошибке о размере входных данных (2147483647 = 2 ^ 31 - 1?), Так как тот же код работал плавно для дроби(frac=0.05) входных фреймов данных (train_scala, test, ts). Я конвертирую типы фреймов данных как можно меньше, но это не становится лучше.

Версия anaconda - 4.3.30, а версия Python - 3.6 (64-разрядная). И объем памяти системы составляет более 128 ГБ с более чем 20 ядрами. Хотите ли вы предложить какой-либо указатель или решение для преодоления этой проблемы? Если эта проблема вызвана большими данными для многопроцессорного модуля, насколько меньше данных я должен использовать, чтобы использовать многопроцессорный модуль на Python3?

Код:

from multiprocessing import Pool, cpu_count
from itertools import repeat    
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)

Сообщение об ошибке:

Traceback (most recent call last):
  File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
    print('----Pool starmap start----')
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
    put(task)
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
    header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Дополнительная информацияhistoryCutoffs - это список целых чиселtrain_scala - фрейм данных для панд (377MB)Тест представляет собой панду DataFrame (15 МБ)ts - это датафрейм для панд (547MB)ul_parts_path - список каталогов (строка)is_train_seq - список логических значений

Дополнительный код: метод multiprocess_FE

def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
    train_dict = {}
    ts_dict = {}
    msno_dict = {}
    ul_dict = {}
    if is_train == True:
        train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
    else:
        train_dict[historyCutoff] = test
    msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
    print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
    ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
    print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))    
    ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt"))  ##.sample(frac=0.01, replace=False)
    ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
    train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)

Ответы на вопрос(2)

Ваш ответ на вопрос