python struct.error: el formato 'i' requiere -2147483648 <= número <= 2147483647
Estoy dispuesto a hacer una ingeniería de características usando el módulo de multiprocesamiento(multiprocessing.Pool.starmap()
. Sin embargo, da un mensaje de error de la siguiente manera. Supongo que este mensaje de error es aproximadamente del tamaño de las entradas (2147483647 = 2 ^ 31 - 1?), Ya que el mismo código funcionó sin problemas durante una fracción(frac=0.05)
de marcos de datos de entrada (train_scala, test, ts). Convierto los tipos de marco de datos lo más pequeños posible, sin embargo, no mejora.
La versión anaconda es 4.3.30 y la versión Python es 3.6 (64 bit). Y el tamaño de la memoria del sistema es superior a 128 GB con más de 20 núcleos. ¿Le gustaría sugerir algún puntero o solución para superar este problema? Si este problema es causado por una gran cantidad de datos para un módulo de multiprocesamiento, ¿cuántos datos más pequeños debo usar para utilizar el módulo de multiprocesamiento en Python3?
Código:
from multiprocessing import Pool, cpu_count
from itertools import repeat
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)
Mensaje de error:
Traceback (most recent call last):
File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
print('----Pool starmap start----')
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
put(task)
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Informaciones extrahistoryCutoffs es una lista de enterostrain_scala es un DataFrame de pandas (377MB)prueba es un marco de datos de pandas (15 MB)ts es un DataFrame de pandas (547MB)ul_parts_path es una lista de directorios (cadena)is_train_seq es una lista de booleanosCódigo adicional: Método multiproceso_FE
def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
train_dict = {}
ts_dict = {}
msno_dict = {}
ul_dict = {}
if is_train == True:
train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
else:
train_dict[historyCutoff] = test
msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt")) ##.sample(frac=0.01, replace=False)
ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)