Самый быстрый способ вычислить минимальное евклидово расстояние между двумя матрицами, содержащими векторы высокой размерности
Я начал похожий вопросдругая нить, но тогда я сосредоточился на том, как использовать OpenCV. Не сумев добиться того, чего я изначально хотел, я спрошу здесь именно то, что я хочу.
У меня есть две матрицы. Матрица a имеет размер 2782x128, а матрица b имеет размер 4000x128, оба значения без знака. Значения хранятся в одном массиве. Для каждого вектора в a мне нужен индекс вектора в b с ближайшим евклидовым расстоянием.
Хорошо, теперь мой код для достижения этой цели:
#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"
using namespace std;
void main(int argc, char* argv[])
{
int a_size;
unsigned char* a = NULL;
read_matrix(&a, a_size,"matrixa");
int b_size;
unsigned char* b = NULL;
read_matrix(&b, b_size,"matrixb");
LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency( &liPerfFreq );
QueryPerformanceCounter( &liStart );
int* indexes = NULL;
min_distance_loop(&indexes, b, b_size, a, a_size);
QueryPerformanceCounter( &liEnd );
cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;
if (a)
delete[]a;
if (b)
delete[]b;
if (indexes)
delete[]indexes;
return;
}
void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
ofstream myfile;
float f;
FILE * pFile;
pFile = fopen (matrixPath,"r");
fscanf (pFile, "%d", &matrix_size);
*matrix = new unsigned char[matrix_size*128];
for (int i=0; i<matrix_size*128; ++i)
{
unsigned int matPtr;
fscanf (pFile, "%u", &matPtr);
matrix[i]=(unsigned char)matPtr;
}
fclose (pFile);
}
void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
const int descrSize = 128;
*indexes = (int*)malloc(a_size*sizeof(int));
int dataIndex=0;
int vocIndex=0;
int min_distance;
int distance;
int multiply;
unsigned char* dataPtr;
unsigned char* vocPtr;
for (int i=0; i<a_size; ++i)
{
min_distance = LONG_MAX;
for (int j=0; j<b_size; ++j)
{
distance=0;
dataPtr = &a[dataIndex];
vocPtr = &b[vocIndex];
for (int k=0; k<descrSize; ++k)
{
multiply = *dataPtr++-*vocPtr++;
distance += multiply*multiply;
// If the distance is greater than the previously calculated, exit
if (distance>min_distance)
break;
}
// if distance smaller
if (distance<min_distance)
{
min_distance = distance;
(*indexes)[i] = j;
}
vocIndex+=descrSize;
}
dataIndex+=descrSize;
vocIndex=0;
}
}
Прилагаются файлы с образцами матриц.
Я использую windows.h просто для вычисления потребляющего времени, поэтому, если вы хотите протестировать код на другой платформе, отличной от windows, просто измените заголовок windows.h и измените способ вычисления потребляющего времени.
Этот код в моем компьютере составляет около 0,5 секунд. Проблема в том, что у меня есть другой код в Matlab, который делает то же самое за 0,05 секунды. В моих экспериментах я получаю несколько матриц, таких как матрица а каждую секунду, поэтому 0,5 секунды - это слишком много.
Теперь код Matlab для расчета этого:
aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b';
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);
Хорошо. Код Matlab использует это (x-a) ^ 2 = x ^ 2 + a ^ 2 - 2ab.
Поэтому моей следующей попыткой было сделать то же самое. Я удалил свой собственный код, чтобы выполнить те же вычисления, но это было примерно за 1,2 секунды.
Затем я попытался использовать разные внешние библиотеки. Первая попытка была Эйген:
const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);
unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
for (int j=0; j<descrSize; ++j)
{
a(i,j)=(int)*dataPtr++;
}
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
for (int j=0; j<descrSize; ++j)
{
b(i,j)=(int)*vocPtr ++;
}
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();
int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
d.row(i).minCoeff(&index[i]);
}
Этот Eigen-код стоит примерно 1.2 для одной строки, которая говорит: ab = a * b.transpose ();
Аналогичный код с использованием opencv был также использован, и стоимость ab = a * b.transpose (); было 0,65 секунды.
Итак, это действительно раздражает, что matlab может делать то же самое так быстро, а я не умею в C ++! Конечно, было бы здорово провести мой эксперимент, но я думаю, что недостаток знаний - это то, что действительно раздражает меня. Как мне достичь хотя бы той же производительности, что и в Matlab? Любой вид растворения приветствуется. Я имею в виду любую внешнюю библиотеку (бесплатную, если это возможно), циклическое развертывание, шаблоны, SSE-вторжения (я знаю, что они существуют), кэширование. Как я уже сказал, моя главная цель - расширить свои знания, чтобы код мог мыслить так и быстрее.
заранее спасибо
РЕДАКТИРОВАТЬ: больше кода, предложенного Дэвидом Хамменом. Я привел массивы к int, прежде чем делать какие-либо вычисления. Вот код:
void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
const int descrSize = 128;
int* a_int;
int* b_int;
LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency( &liPerfFreq );
QueryPerformanceCounter( &liStart );
a_int = (int*)malloc(a_size*descrSize*sizeof(int));
b_int = (int*)malloc(b_size*descrSize*sizeof(int));
for(int i=0; i<descrSize*a_size; ++i)
a_int[i]=(int)a[i];
for(int i=0; i<descrSize*b_size; ++i)
b_int[i]=(int)b[i];
QueryPerformanceCounter( &liEnd );
cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;
*indexes = (int*)malloc(a_size*sizeof(int));
int dataIndex=0;
int vocIndex=0;
int min_distance;
int distance;
int multiply;
/*unsigned char* dataPtr;
unsigned char* vocPtr;*/
int* dataPtr;
int* vocPtr;
for (int i=0; i<a_size; ++i)
{
min_distance = LONG_MAX;
for (int j=0; j<b_size; ++j)
{
distance=0;
dataPtr = &a_int[dataIndex];
vocPtr = &b_int[vocIndex];
for (int k=0; k<descrSize; ++k)
{
multiply = *dataPtr++-*vocPtr++;
distance += multiply*multiply;
// If the distance is greater than the previously calculated, exit
if (distance>min_distance)
break;
}
// if distance smaller
if (distance<min_distance)
{
min_distance = distance;
(*indexes)[i] = j;
}
vocIndex+=descrSize;
}
dataIndex+=descrSize;
vocIndex=0;
}
}
Весь процесс теперь составляет 0,6, а начальные циклы - 0,001 секунды. Может я что то не так сделал?
EDIT2: что-нибудь об Эйгене? Когда я ищу внешних библиотек, они всегда говорят об Эйгене и его скорости. Я сделал что-то не так? Вот простой код с использованием Eigen, который показывает, что это не так быстро. Может быть, мне не хватает какой-либо конфигурации или флаг, или ...
MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;
Этот код составляет около 0,9 секунд.