Maneira mais rápida de calcular a distância euclideana mínima entre duas matrizes contendo vetores de alta dimensionalidade

Eu comecei uma pergunta semelhante sobreoutro segmento, mas eu estava me concentrando em como usar o OpenCV. Tendo falhado em alcançar o que eu queria originalmente, perguntarei exatamente o que eu quero.

Eu tenho duas matrizes. A matriz a é 2782x128 e a Matriz b é 4000x128, ambos valores char não assinados. Os valores são armazenados em uma única matriz. Para cada vetor em a, eu preciso do índice do vetor em b com a distância euclidiana mais próxima.

Ok, agora meu código para conseguir isso:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

E anexado são os arquivos com matrizes de amostra.

matrixa matrixb

Estou usando windows.h apenas para calcular o tempo de consumo, por isso, se você quiser testar o código em outra plataforma que o windows, basta alterar o cabeçalho windows.h e alterar a forma de calcular o tempo de consumo.

Este código no meu computador é de cerca de 0,5 segundos. O problema é que eu tenho outro código no Matlab que faz a mesma coisa em 0.05 segundos. Nos meus experimentos, estou recebendo várias matrizes como a matriz a cada segundo, então 0,5 segundo é demais.

Agora o código matlab para calcular isso:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

Está bem. O código Matlab está usando isso (x-a) ^ 2 = x ^ 2 + a ^ 2 - 2ab.

Então minha próxima tentativa foi fazer a mesma coisa. Eu apaguei o meu próprio código para fazer os mesmos cálculos, mas foi de 1,2 segundos aprox.

Então, tentei usar bibliotecas externas diferentes. A primeira tentativa foi Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

Este código Eigen custa 1,2 aprox para apenas a linha que diz: ab = a * b.transpose ();

Um código semelhante usando opencv foi usado também, e o custo do ab = a * b.transpose (); foi de 0,65 segundos.

Então, é realmente chato que o Matlab é capaz de fazer a mesma coisa tão rapidamente e eu não sou capaz em C ++! É claro que ser capaz de executar meu experimento seria ótimo, mas acho que a falta de conhecimento é o que realmente está me irritando. Como posso conseguir pelo menos o mesmo desempenho que no Matlab? Qualquer tipo de solução é bem-vindo. Quero dizer, qualquer biblioteca externa (livre se possível), loop desenrolando coisas, coisas de modelo, intruções de SSE (eu sei que elas existem), cache de coisas. Como eu disse, meu principal objetivo é aumentar o meu conhecimento para poder codificar como esse pensa com um desempenho mais rápido.

desde já, obrigado

EDIT: mais código sugerido por David Hammen. Eu lancei as matrizes para int antes de fazer qualquer cálculo. Aqui está o código:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

Todo o processo é agora 0,6 e os loops de fundição no início são 0,001 segundos. Talvez eu tenha feito algo errado?

EDIT2: Qualquer coisa sobre Eigen? Quando procuro por bibliotecas externas, elas sempre falam sobre Eigen e sua velocidade. Eu fiz algo errado? Aqui um código simples usando Eigen que mostra que não é tão rápido. Talvez eu esteja faltando alguma configuração ou alguma bandeira, ou ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

Este código é de cerca de 0,9 segundos.

questionAnswers(3)

yourAnswerToTheQuestion