Przyspiesz próbkowanie oszacowań jądra
Tutaj jestMWE
znacznie większego kodu, którego używam. Zasadniczo wykonuje integrację Monte Carlo nad KDE (oszacowanie gęstości jądra) dla wszystkich wartości znajdujących się poniżej pewnego progu (sugerowano metodę integracji przy tym pytaniu BTW:Zintegruj oszacowanie gęstości jądra 2D).
import numpy as np
from scipy import stats
import time
# Generate some random two-dimensional data:
def measure(n):
"Measurement model, return two coupled measurements."
m1 = np.random.normal(size=n)
m2 = np.random.normal(scale=0.5, size=n)
return m1+m2, m1-m2
# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()
# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)
# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5
# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik
# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik
# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik
Dane wyjściowe wyglądają mniej więcej tak:
iso: 0.00259208679199
resample: 0.000817060470581
filter/sample: 2.10829401016
integral: 4.2200088501e-05
co wyraźnie oznacza, żefiltr / próbka wywołanie zużywa prawie cały czas, którego kod używa do uruchomienia. Muszę uruchomić ten blok kodu iteracyjnie kilka tysięcy razy, aby mogło to zająć dużo czasu.
Czy jest jakiś sposób na przyspieszenie procesu filtrowania / próbkowania?
DodajOto nieco bardziej realistycznyMWE
mojego aktualnego kodu zapisanego w wielowątkowym rozwiązaniu Ophiona:
import numpy as np
from scipy import stats
from multiprocessing import Pool
def kde_integration(m_list):
m1, m2 = [], []
for item in m_list:
# Color data.
m1.append(item[0])
# Magnitude data.
m2.append(item[1])
# Define limits.
xmin, xmax = min(m1), max(m1)
ymin, ymax = min(m2), max(m2)
# Perform a kernel density estimate on the data:
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)
out_list = []
for point in m_list:
# Compute the point below which to integrate.
iso = kernel((point[0], point[1]))
# Sample KDE distribution
sample = kernel.resample(size=1000)
#Create definition.
def calc_kernel(samp):
return kernel(samp)
#Choose number of cores and split input array.
cores = 4
torun = np.array_split(sample, cores, axis=1)
#Calculate
pool = Pool(processes=cores)
results = pool.map(calc_kernel, torun)
#Reintegrate and calculate results
insample_mp = np.concatenate(results) < iso
# Integrate for all values below iso.
integral = insample_mp.sum() / float(insample_mp.shape[0])
out_list.append(integral)
return out_list
# Generate some random two-dimensional data:
def measure(n):
"Measurement model, return two coupled measurements."
m1 = np.random.normal(size=n)
m2 = np.random.normal(scale=0.5, size=n)
return m1+m2, m1-m2
# Create list to pass.
m_list = []
for i in range(60):
m1, m2 = measure(5)
m_list.append(m1.tolist())
m_list.append(m2.tolist())
# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)
Rozwiązanie przedstawione przezOphion działa świetnie na oryginalnym kodzie, który przedstawiłem, ale nie działa z następującym błędem w tej wersji:
Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Próbowałem przesunąćcalc_kernel
funkcjonować od jednej z odpowiedzi w tym pytaniuMultiprocessing: Jak korzystać z Pool.map na funkcji zdefiniowanej w klasie? stwierdza, że„Funkcja, którą dajesz mapowi () musi być dostępna poprzez import twojego modułu”; ale nadal nie mogę uruchomić tego kodu.
Każda pomoc będzie bardzo mile widziana.
Dodaj 2RealizowanieOphion's sugestia, aby usunąćcalc_kernel
funkcja i proste użycie:
results = pool.map(kernel, torun)
działa, aby pozbyć sięPicklingError
ale teraz widzę to, gdy tworzę inicjałm_list
o czymś więcej niż około 62-63 elementów dostaję ten błąd:
Traceback (most recent call last):
File "~/gauss_kde_temp.py", line 67, in <module>
print 'Integral result: ', kde_integration(m_list)
File "~/gauss_kde_temp.py", line 38, in kde_integration
pool = Pool(processes=cores)
File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
self._result_handler.start()
File "/usr/lib/python2.7/threading.py", line 494, in start
_start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread
Ponieważ moja rzeczywista lista w mojej rzeczywistej implementacji tego kodu może zawierać do 2000 pozycji, ten problem sprawia, że kod jest bezużyteczny. Linia38
czy to ten:
pool = Pool(processes=cores)
więc najwyraźniej ma to coś wspólnego z liczbą rdzeni, których używam?
To pytanie„Nie można uruchomić nowego błędu wątku” w Pythonie sugeruje użycie:
threading.active_count()
sprawdzić liczbę wątków, które mam, gdy dostanę ten błąd. Sprawdziłem i zawsze się zawiesza, gdy sięga374
wątki. Jak mogę zakodować ten problem?
Oto nowe pytanie dotyczące tego ostatniego wydania:Błąd wątku: nie można rozpocząć nowego wątku