Самый быстрый способ вычислить минимальное евклидово расстояние между двумя матрицами, содержащими векторы высокой размерности

Я начал похожий вопросдругая нить, но тогда я сосредоточился на том, как использовать OpenCV. Не сумев добиться того, чего я изначально хотел, я спрошу здесь именно то, что я хочу.

У меня есть две матрицы. Матрица a имеет размер 2782x128, а матрица b имеет размер 4000x128, оба значения без знака. Значения хранятся в одном массиве. Для каждого вектора в a мне нужен индекс вектора в b с ближайшим евклидовым расстоянием.

Хорошо, теперь мой код для достижения этой цели:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

Прилагаются файлы с образцами матриц.

матрица А matrixb

Я использую windows.h просто для вычисления потребляющего времени, поэтому, если вы хотите протестировать код на другой платформе, отличной от windows, просто измените заголовок windows.h и измените способ вычисления потребляющего времени.

Этот код в моем компьютере составляет около 0,5 секунд. Проблема в том, что у меня есть другой код в Matlab, который делает то же самое за 0,05 секунды. В моих экспериментах я получаю несколько матриц, таких как матрица а каждую секунду, поэтому 0,5 секунды - это слишком много.

Теперь код Matlab для расчета этого:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

Хорошо. Код Matlab использует это (x-a) ^ 2 = x ^ 2 + a ^ 2 - 2ab.

Поэтому моей следующей попыткой было сделать то же самое. Я удалил свой собственный код, чтобы выполнить те же вычисления, но это было примерно за 1,2 секунды.

Затем я попытался использовать разные внешние библиотеки. Первая попытка была Эйген:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

Этот Eigen-код стоит примерно 1.2 для одной строки, которая говорит: ab = a * b.transpose ();

Аналогичный код с использованием opencv был также использован, и стоимость ab = a * b.transpose (); было 0,65 секунды.

Итак, это действительно раздражает, что matlab может делать то же самое так быстро, а я не умею в C ++! Конечно, было бы здорово провести мой эксперимент, но я думаю, что недостаток знаний - это то, что действительно раздражает меня. Как мне достичь хотя бы той же производительности, что и в Matlab? Любой вид растворения приветствуется. Я имею в виду любую внешнюю библиотеку (бесплатную, если это возможно), циклическое развертывание, шаблоны, SSE-вторжения (я знаю, что они существуют), кэширование. Как я уже сказал, моя главная цель - расширить свои знания, чтобы код мог мыслить так и быстрее.

заранее спасибо

РЕДАКТИРОВАТЬ: больше кода, предложенного Дэвидом Хамменом. Я привел массивы к int, прежде чем делать какие-либо вычисления. Вот код:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

Весь процесс теперь составляет 0,6, а начальные циклы - 0,001 секунды. Может я что то не так сделал?

EDIT2: что-нибудь об Эйгене? Когда я ищу внешних библиотек, они всегда говорят об Эйгене и его скорости. Я сделал что-то не так? Вот простой код с использованием Eigen, который показывает, что это не так быстро. Может быть, мне не хватает какой-либо конфигурации или флаг, или ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

Этот код составляет около 0,9 секунд.

Ответы на вопрос(3)

Ваш ответ на вопрос