Как синхронизировать pthreads менеджера / работника без объединения?
Я знаком с многопоточностью и успешно разработал много многопоточных программ на Java и Objective-C. Но я не смог достичь следующего в C, используя pthreads без использования соединения из основного потока:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_OF_THREADS 2
struct thread_data {
int start;
int end;
int *arr;
};
void print(int *ints, int n);
void *processArray(void *args);
int main(int argc, const char * argv[])
{
int numOfInts = 10;
int *ints = malloc(numOfInts * sizeof(int));
for (int i = 0; i < numOfInts; i++) {
ints[i] = i;
}
print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pthread_t threads[NUM_OF_THREADS];
struct thread_data thread_data[NUM_OF_THREADS];
// these vars are used to calculate the index ranges for each thread
int remainingWork = numOfInts, amountOfWork;
int startRange, endRange = -1;
for (int i = 0; i < NUM_OF_THREADS; i++) {
amountOfWork = remainingWork / (NUM_OF_THREADS - i);
startRange = endRange + 1;
endRange = startRange + amountOfWork - 1;
thread_data[i].arr = ints;
thread_data[i].start = startRange;
thread_data[i].end = endRange;
pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
remainingWork -= amountOfWork;
}
// 1. Signal to the threads to start working
// 2. Wait for them to finish
print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
free(ints);
return 0;
}
void *processArray(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
// 1. Wait for a signal to start from the main thread
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// 2. Signal to the main thread that you're done
pthread_exit(NULL);
}
void print(int *ints, int n)
{
printf("[");
for (int i = 0; i < n; i++) {
printf("%d", ints[i]);
if (i+1 != n)
printf(", ");
}
printf("]\n");
}
Я хотел бы добиться следующего в приведенном выше коде:
В основном ():
Signal to the threads to start working. Wait for the background threads to finish.В processArray ():
Wait for a signal to start from the main thread Signal to the main thread that you're doneЯ не хочу использовать соединение в главном потоке, потому что внастоящее приложениеосновной поток будет создавать потоки один раз, а затем он будет сигнализировать фоновым потокам о работе много раз, и я не могу позволить основному потоку продолжаться, пока все фоновые потоки не закончили работать. вprocessArray
функцию, я поставлю бесконечный цикл следующим образом:
void *processArray(void *args)
{
struct thread_data *data = (struct thread_data *)args;
while (1)
{
// 1. Wait for a signal to start from the main thread
int *arr = data->arr;
int start = data->start;
int end = data->end;
// Process
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// 2. Signal to the main thread that you're done
}
pthread_exit(NULL);
}
Обратите внимание, что я новичок в C и API posix, поэтому извините, если я упустил что-то очевидное. Но я действительно много чего пробовал, начиная с мьютекса, массива семафоров и их комбинации, но безуспешно. Я думаю, что условная переменная может помочь, но я не могу понять, как ее можно использовать.
Спасибо за ваше время.
Problem Solved:
Большое спасибо, ребята! Я наконец смог заставить это работать безопасно и без использования соединения, следуя вашим советам. Хотя решение несколько уродливо, оно выполняет свою работу и выигрыш в производительности того стоит (как вы увидите ниже). Для всех, кто интересуется, это симуляция реального приложения, над которым я работаю, в котором основной поток постоянно отдает работу фоновым потокам:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_OF_THREADS 5
struct thread_data {
int id;
int start;
int end;
int *arr;
};
pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER;
int currentlyIdle;
pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER;
int workReady;
pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
int currentlyWorking;
pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER;
int canFinish;
void print(int *ints, int n);
void *processArray(void *args);
int validateResult(int *ints, int num, int start);
int main(int argc, const char * argv[])
{
int numOfInts = 10;
int *ints = malloc(numOfInts * sizeof(int));
for (int i = 0; i < numOfInts; i++) {
ints[i] = i;
}
// print(ints, numOfInts);
pthread_t threads[NUM_OF_THREADS];
struct thread_data thread_data[NUM_OF_THREADS];
workReady = 0;
canFinish = 0;
currentlyIdle = 0;
currentlyWorking = 0;
// these vars are used to calculate the index ranges for each thread
int remainingWork = numOfInts, amountOfWork;
int startRange, endRange = -1;
// Create the threads and give each one its data struct.
for (int i = 0; i < NUM_OF_THREADS; i++) {
amountOfWork = remainingWork / (NUM_OF_THREADS - i);
startRange = endRange + 1;
endRange = startRange + amountOfWork - 1;
thread_data[i].id = i;
thread_data[i].arr = ints;
thread_data[i].start = startRange;
thread_data[i].end = endRange;
pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
remainingWork -= amountOfWork;
}
int loops = 1111111;
int expectedStartingValue = ints[0] + loops; // used to validate the results
// The elements in ints[] should be incremented by 1 in each loop
while (loops-- != 0) {
// Make sure all of them are ready
pthread_mutex_lock(¤tlyIdleMutex);
while (currentlyIdle != NUM_OF_THREADS) {
pthread_cond_wait(¤tlyIdleCond, ¤tlyIdleMutex);
}
pthread_mutex_unlock(¤tlyIdleMutex);
// All threads are now blocked; it's safe to not lock the mutex.
// Prevent them from finishing before authorized.
canFinish = 0;
// Reset the number of currentlyWorking threads
currentlyWorking = NUM_OF_THREADS;
// Signal to the threads to start
pthread_mutex_lock(&workReadyMutex);
workReady = 1;
pthread_cond_broadcast(&workReadyCond );
pthread_mutex_unlock(&workReadyMutex);
// Wait for them to finish
pthread_mutex_lock(¤tlyWorkingMutex);
while (currentlyWorking != 0) {
pthread_cond_wait(¤tlyWorkingCond, ¤tlyWorkingMutex);
}
pthread_mutex_unlock(¤tlyWorkingMutex);
// The threads are now waiting for permission to finish
// Prevent them from starting again
workReady = 0;
currentlyIdle = 0;
// Allow them to finish
pthread_mutex_lock(&canFinishMutex);
canFinish = 1;
pthread_cond_broadcast(&canFinishCond);
pthread_mutex_unlock(&canFinishMutex);
}
// print(ints, numOfInts);
if (validateResult(ints, numOfInts, expectedStartingValue)) {
printf("Result correct.\n");
}
else {
printf("Result invalid.\n");
}
// clean up
for (int i = 0; i < NUM_OF_THREADS; i++) {
pthread_cancel(threads[i]);
}
free(ints);
return 0;
}
void *processArray(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
while (1) {
// Set yourself as idle and signal to the main thread, when all threads are idle main will start
pthread_mutex_lock(¤tlyIdleMutex);
currentlyIdle++;
pthread_cond_signal(¤tlyIdleCond);
pthread_mutex_unlock(¤tlyIdleMutex);
// wait for work from main
pthread_mutex_lock(&workReadyMutex);
while (!workReady) {
pthread_cond_wait(&workReadyCond , &workReadyMutex);
}
pthread_mutex_unlock(&workReadyMutex);
// Do the work
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// mark yourself as finished and signal to main
pthread_mutex_lock(¤tlyWorkingMutex);
currentlyWorking--;
pthread_cond_signal(¤tlyWorkingCond);
pthread_mutex_unlock(¤tlyWorkingMutex);
// Wait for permission to finish
pthread_mutex_lock(&canFinishMutex);
while (!canFinish) {
pthread_cond_wait(&canFinishCond , &canFinishMutex);
}
pthread_mutex_unlock(&canFinishMutex);
}
pthread_exit(NULL);
}
int validateResult(int *ints, int n, int start)
{
int tmp = start;
for (int i = 0; i < n; i++, tmp++) {
if (ints[i] != tmp) {
return 0;
}
}
return 1;
}
void print(int *ints, int n)
{
printf("[");
for (int i = 0; i < n; i++) {
printf("%d", ints[i]);
if (i+1 != n)
printf(", ");
}
printf("]\n");
}
Я не уверен, что еслиpthread_cancel
достаточно для очистки! Что касается барьера, он был бы очень полезен, если бы он не был ограничен некоторыми ОС, как упомянуто@Jeremy.
Benchmarks:
Я хотел убедиться, что эти многочисленные условия на самом деле не замедляют алгоритм, поэтому я настроил этот тест для сравнения двух решений:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/resource.h>
#define NUM_OF_THREADS 5
struct thread_data {
int start;
int end;
int *arr;
};
pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t currentlyIdleCond = PTHREAD_COND_INITIALIZER;
int currentlyIdle;
pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t workReadyCond = PTHREAD_COND_INITIALIZER;
int workReady;
pthread_cond_t currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
int currentlyWorking;
pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t canFinishCond = PTHREAD_COND_INITIALIZER;
int canFinish;
void *processArrayMutex(void *args);
void *processArrayJoin(void *args);
double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops);
double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops);
int main(int argc, const char * argv[])
{
int numOfInts = 10;
int *join_ints = malloc(numOfInts * sizeof(int));
int *mutex_ints = malloc(numOfInts * sizeof(int));
for (int i = 0; i < numOfInts; i++) {
join_ints[i] = i;
mutex_ints[i] = i;
}
pthread_t join_threads[NUM_OF_THREADS];
pthread_t mutex_threads[NUM_OF_THREADS];
struct thread_data join_thread_data[NUM_OF_THREADS];
struct thread_data mutex_thread_data[NUM_OF_THREADS];
workReady = 0;
canFinish = 0;
currentlyIdle = 0;
currentlyWorking = 0;
int remainingWork = numOfInts, amountOfWork;
int startRange, endRange = -1;
for (int i = 0; i < NUM_OF_THREADS; i++) {
amountOfWork = remainingWork / (NUM_OF_THREADS - i);
startRange = endRange + 1;
endRange = startRange + amountOfWork - 1;
join_thread_data[i].arr = join_ints;
join_thread_data[i].start = startRange;
join_thread_data[i].end = endRange;
mutex_thread_data[i].arr = mutex_ints;
mutex_thread_data[i].start = startRange;
mutex_thread_data[i].end = endRange;
pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]);
remainingWork -= amountOfWork;
}
int numOfBenchmarkTests = 100;
int numberOfLoopsPerTest= 1000;
double join_sum = 0.0, mutex_sum = 0.0;
for (int i = 0; i < numOfBenchmarkTests; i++)
{
double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);
join_sum += joinTime;
mutex_sum+= mutexTime;
}
double join_avg = join_sum / numOfBenchmarkTests;
double mutex_avg= mutex_sum / numOfBenchmarkTests;
printf("Join average : %f\n", join_avg);
printf("Mutex average: %f\n", mutex_avg);
double diff = join_avg - mutex_avg;
if (diff > 0.0)
printf("Mutex is %.0f%% faster.\n", 100 * diff / join_avg);
else if (diff < 0.0)
printf("Join is %.0f%% faster.\n", 100 * diff / mutex_avg);
else
printf("Both have the same performance.");
free(join_ints);
free(mutex_ints);
return 0;
}
// From https://stackoverflow.com/a/2349941/408286
double get_time()
{
struct timeval t;
struct timezone tzp;
gettimeofday(&t, &tzp);
return t.tv_sec + t.tv_usec*1e-6;
}
double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
{
double start = get_time();
int loops = num_loops;
while (loops-- != 0) {
// Make sure all of them are ready
pthread_mutex_lock(¤tlyIdleMutex);
while (currentlyIdle != NUM_OF_THREADS) {
pthread_cond_wait(¤tlyIdleCond, ¤tlyIdleMutex);
}
pthread_mutex_unlock(¤tlyIdleMutex);
// All threads are now blocked; it's safe to not lock the mutex.
// Prevent them from finishing before authorized.
canFinish = 0;
// Reset the number of currentlyWorking threads
currentlyWorking = NUM_OF_THREADS;
// Signal to the threads to start
pthread_mutex_lock(&workReadyMutex);
workReady = 1;
pthread_cond_broadcast(&workReadyCond );
pthread_mutex_unlock(&workReadyMutex);
// Wait for them to finish
pthread_mutex_lock(¤tlyWorkingMutex);
while (currentlyWorking != 0) {
pthread_cond_wait(¤tlyWorkingCond, ¤tlyWorkingMutex);
}
pthread_mutex_unlock(¤tlyWorkingMutex);
// The threads are now waiting for permission to finish
// Prevent them from starting again
workReady = 0;
currentlyIdle = 0;
// Allow them to finish
pthread_mutex_lock(&canFinishMutex);
canFinish = 1;
pthread_cond_broadcast(&canFinishCond);
pthread_mutex_unlock(&canFinishMutex);
}
return get_time() - start;
}
double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
{
double start = get_time();
int loops = num_loops;
while (loops-- != 0) {
// create them
for (int i = 0; i < NUM_OF_THREADS; i++) {
pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]);
}
// wait
for (int i = 0; i < NUM_OF_THREADS; i++) {
pthread_join(threads[i], NULL);
}
}
return get_time() - start;
}
void *processArrayMutex(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
while (1) {
// Set yourself as idle and signal to the main thread, when all threads are idle main will start
pthread_mutex_lock(¤tlyIdleMutex);
currentlyIdle++;
pthread_cond_signal(¤tlyIdleCond);
pthread_mutex_unlock(¤tlyIdleMutex);
// wait for work from main
pthread_mutex_lock(&workReadyMutex);
while (!workReady) {
pthread_cond_wait(&workReadyCond , &workReadyMutex);
}
pthread_mutex_unlock(&workReadyMutex);
// Do the work
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
// mark yourself as finished and signal to main
pthread_mutex_lock(¤tlyWorkingMutex);
currentlyWorking--;
pthread_cond_signal(¤tlyWorkingCond);
pthread_mutex_unlock(¤tlyWorkingMutex);
// Wait for permission to finish
pthread_mutex_lock(&canFinishMutex);
while (!canFinish) {
pthread_cond_wait(&canFinishCond , &canFinishMutex);
}
pthread_mutex_unlock(&canFinishMutex);
}
pthread_exit(NULL);
}
void *processArrayJoin(void *args)
{
struct thread_data *data = (struct thread_data *)args;
int *arr = data->arr;
int start = data->start;
int end = data->end;
// Do the work
for (int i = start; i <= end; i++) {
arr[i] = arr[i] + 1;
}
pthread_exit(NULL);
}
И вывод:
Join average : 0.153074
Mutex average: 0.071588
Mutex is 53% faster.
Еще раз спасибо Я очень ценю вашу помощь!