Przyspiesz próbkowanie oszacowań jądra

Tutaj jestMWE znacznie większego kodu, którego używam. Zasadniczo wykonuje integrację Monte Carlo nad KDE (oszacowanie gęstości jądra) dla wszystkich wartości znajdujących się poniżej pewnego progu (sugerowano metodę integracji przy tym pytaniu BTW:Zintegruj oszacowanie gęstości jądra 2D).

import numpy as np
from scipy import stats
import time

# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()

# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)

# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5

# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik

# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik

# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik

# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik

Dane wyjściowe wyglądają mniej więcej tak:

iso:  0.00259208679199
resample:  0.000817060470581
filter/sample:  2.10829401016
integral:  4.2200088501e-05

co wyraźnie oznacza, żefiltr / próbka wywołanie zużywa prawie cały czas, którego kod używa do uruchomienia. Muszę uruchomić ten blok kodu iteracyjnie kilka tysięcy razy, aby mogło to zająć dużo czasu.

Czy jest jakiś sposób na przyspieszenie procesu filtrowania / próbkowania?

Dodaj

Oto nieco bardziej realistycznyMWE mojego aktualnego kodu zapisanego w wielowątkowym rozwiązaniu Ophiona:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list):

    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])

    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)

    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)

    out_list = []

    for point in m_list:

        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))

        # Sample KDE distribution
        sample = kernel.resample(size=1000)

        #Create definition.
        def calc_kernel(samp):
            return kernel(samp)

        #Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)

        #Calculate
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)

        #Reintegrate and calculate results
        insample_mp = np.concatenate(results) < iso

        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])

        out_list.append(integral)

    return out_list


# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())

# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)

Rozwiązanie przedstawione przezOphion działa świetnie na oryginalnym kodzie, który przedstawiłem, ale nie działa z następującym błędem w tej wersji:

Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

Próbowałem przesunąćcalc_kernel funkcjonować od jednej z odpowiedzi w tym pytaniuMultiprocessing: Jak korzystać z Pool.map na funkcji zdefiniowanej w klasie? stwierdza, że„Funkcja, którą dajesz mapowi () musi być dostępna poprzez import twojego modułu”; ale nadal nie mogę uruchomić tego kodu.

Każda pomoc będzie bardzo mile widziana.

Dodaj 2

RealizowanieOphion's sugestia, aby usunąćcalc_kernel funkcja i proste użycie:

results = pool.map(kernel, torun)

działa, aby pozbyć sięPicklingError ale teraz widzę to, gdy tworzę inicjałm_list o czymś więcej niż około 62-63 elementów dostaję ten błąd:

Traceback (most recent call last):
  File "~/gauss_kde_temp.py", line 67, in <module>
    print 'Integral result: ', kde_integration(m_list)
  File "~/gauss_kde_temp.py", line 38, in kde_integration
    pool = Pool(processes=cores)
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
    self._result_handler.start()
  File "/usr/lib/python2.7/threading.py", line 494, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

Ponieważ moja rzeczywista lista w mojej rzeczywistej implementacji tego kodu może zawierać do 2000 pozycji, ten problem sprawia, że ​​kod jest bezużyteczny. Linia38 czy to ten:

pool = Pool(processes=cores)

więc najwyraźniej ma to coś wspólnego z liczbą rdzeni, których używam?

To pytanie„Nie można uruchomić nowego błędu wątku” w Pythonie sugeruje użycie:

threading.active_count()

sprawdzić liczbę wątków, które mam, gdy dostanę ten błąd. Sprawdziłem i zawsze się zawiesza, gdy sięga374 wątki. Jak mogę zakodować ten problem?

Oto nowe pytanie dotyczące tego ostatniego wydania:Błąd wątku: nie można rozpocząć nowego wątku

questionAnswers(2)

yourAnswerToTheQuestion