La forma más rápida de calcular la distancia euclidiana mínima entre dos matrices que contienen vectores de alta dimensión

Comencé una pregunta similar enotro hilo, pero luego me estaba enfocando en cómo usar OpenCV. Habiendo fallado en lograr lo que originalmente quería, preguntaré aquí exactamente lo que quiero.

Tengo dos matrices. La matriz a es 2782x128 y la matriz b es 4000x128, ambos valores de caracteres sin signo. Los valores se almacenan en una sola matriz. Para cada vector en a, necesito el índice del vector en b con la distancia euclidiana más cercana.

Ok, ahora mi código para lograr esto:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

Y adjuntos están los archivos con matrices de muestra.

matriza matrizb

Estoy usando windows.h solo para calcular el tiempo de consumo, por lo que si desea probar el código en otra plataforma que no sea Windows, simplemente cambie el encabezado de windows.h y cambie la forma de calcular el tiempo de consumo.

Este código en mi computadora es de unos 0.5 segundos. El problema es que tengo otro código en Matlab que hace lo mismo en 0.05 segundos. En mis experimentos, recibo varias matrices como matriz a cada segundo, por lo que 0.5 segundos es demasiado.

Ahora el código matlab para calcular esto:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

De acuerdo. El código de Matlab usa ese (x-a) ^ 2 = x ^ 2 + a ^ 2 - 2ab.

Así que mi siguiente intento fue hacer lo mismo. Borre mi propio código para hacer los mismos cálculos, pero fue de aproximadamente 1,2 segundos.

Entonces, traté de usar diferentes bibliotecas externas. El primer intento fue Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

Este código Eigen cuesta 1.2 aproximadamente solo por la línea que dice: ab = a * b.transpose ();

También se usó un código similar que usa opencv, y el costo de ab = a * b.transpose (); Fue de 0.65 segundos.

Entonces, ¡es realmente molesto que matlab sea capaz de hacer esto mismo tan rápidamente y no puedo en C ++! Por supuesto, poder realizar mi experimento sería genial, pero creo que la falta de conocimiento es lo que realmente me molesta. ¿Cómo puedo lograr al menos el mismo rendimiento que en Matlab? Cualquier tipo de solusión es bienvenida. Quiero decir, cualquier biblioteca externa (gratuita si es posible), cosas que se desenrollan en bucle, cosas de plantilla, instrucciones SSE (sé que existen), cosas de caché. Como dije, mi propósito principal es aumentar mi conocimiento para poder codificar ideas como esta con un rendimiento más rápido.

Gracias por adelantado

EDIT: más código sugerido por David Hammen. Eché las matrices a int antes de hacer cualquier cálculo. Aquí está el código:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

El proceso completo ahora es 0.6, y los bucles de lanzamiento al principio son 0.001 segundos. Tal vez hice algo mal?

EDIT2: ¿Algo sobre Eigen? Cuando busco librerías externas, siempre hablan de Eigen y su velocidad. Hice algo mal? Aquí un código simple usando Eigen que muestra que no es tan rápido. Tal vez me falte alguna configuración o alguna bandera, o ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

Este código es de unos 0.9 segundos.

Respuestas a la pregunta(3)

Su respuesta a la pregunta