múltiples hilos capaces de obtener rebaño al mismo tiempo

Tenía la impresión de queflock (2) es seguro para subprocesos, recientemente, me encontré con el caso en el código, donde varios subprocesos pueden obtener un bloqueo en el mismo archivo, todos sincronizados con el uso de la obtención de un bloqueo exclusivo utilizando el c api flock. El proceso 25554 es una aplicación multiproceso que tiene 20 subprocesos, el número de subprocesos que se bloquean en el mismo archivo varía cuando se produce el punto muerto. La aplicación multiproceso testEvent es el escritor del archivo, donde fue empujado el lector del archivo. Desafortunadamente, ellsof no imprime el valor de LWP, así que no puedo encontrar cuáles son los hilos que sostienen el bloqueo. Cuando ocurre la condición mencionada a continuación, tanto el proceso como los subprocesos se atascan en la llamada de lote como se muestra enpstack ostrace llame al pid 25569 y 25554. Cualquier sugerencia sobre cómo superar esto en RHEL 4.x.

Una cosa que quería actualizar es que el lote no se comporta mal todo el tiempo, cuando la tasa de transmisión de los mensajes es más de 2 mbps solo entonces me meto en este problema de bloqueo con la bandada, debajo de esa tasa de transmisión todo es archivo. He guardado lanum_threads = 20,size_of_msg = 1000bytes constante y solo varió el número de mensajes tx por segundo, comenzando de 10 mensajes a 100 mensajes, que es 20 * 1000 * 100 = 2 mbps, cuando aumento el número de mensajes a 150, entonces ocurre el problema del lote.

Solo quería preguntar cuál es su opinión sobre flockfile c api.

 sudo lsof filename.txt
    COMMAND       PID     USER     FD       TYPE     DEVICE     SIZE   NODE       NAME
    push         25569    root     11u       REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     27uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     28uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     29uW      REG      253.4      1079   49266853   filename.txt
    testEvent    25554    root     30uW      REG      253.4      1079   49266853   filename.txt

El programa de prueba multiproceso que llamará awrite_data_lib_func función lib.

void* sendMessage(void *arg)  {

int* numOfMessagesPerSecond = (int*) arg;
std::cout <<" Executing p thread id " << pthread_self() << std::endl;
 while(!terminateTest) {
   Record *er1 = Record::create();
   er1.setDate("some data");

   for(int i = 0 ; i <=*numOfMessagesPerSecond ; i++){
     ec = _write_data_lib_func(*er1);
     if( ec != SUCCESS) {
       std::cout << "write was not successful" << std::endl;

     }

   }
   delete er1;
   sleep(1);
 }

 return NULL;

El método anterior se llamará en los subprocesos en la función principal de la prueba.

for (i=0; i<_numThreads ; ++i) {
  rc = pthread_create(&threads[i], NULL, sendMessage, (void *)&_num_msgs);
  assert(0 == rc);

}

Aquí está la fuente del escritor / lector, debido a razones de propiedad que no quería simplemente cortar y pegar, la fuente del escritor accederá a múltiples hilos en un proceso

int write_data_lib_func(Record * rec) {      
if(fd == -1 ) {  
    fd = open(fn,O_RDWR| O_CREAT | O_APPEND, 0666);
} 
if ( fd >= 0 ) {
   /* some code */ 

   if( flock(fd, LOCK_EX) < 0 ) {
     print "some error message";
   }
   else { 
    if( maxfilesize) {
      off_t len = lseek ( fd,0,SEEK_END);
      ...
      ... 
      ftruncate( fd,0);
      ...
      lseek(fd,0,SEEK_SET); 
   } /* end of max spool size */ 
   if( writev(fd,rec) < 0 ) {
     print "some error message" ; 
   }

   if(flock(fd,LOCK_UN) < 0 ) {
   print some error message; 
   } 

n el lado del lector, las cosas son un proceso demonio sin hilos.

int readData() {
    while(true) {
      if( fd == -1 ) {
         fd= open (filename,O_RDWR);
      }
      if( flock (fd, LOCK_EX) < 0 ) { 
        print "some error message"; 
        break; 
      } 
      if( n = read(fd,readBuf,readBufSize)) < 0 ) { 
        print "some error message" ;
        break;
      }  
      if( off < n ) { 
        if ( off <= 0 && n > 0 ) { 
          corrupt_file = true; 
        } 
        if ( lseek(fd, off-n, SEEK_CUR) < 0 ) { 
          print "some error message"; 
        } 
        if( corrupt_spool ) {  
          if (ftruncate(fd,0) < 0 ) { 
             print "some error message";
             break;
           }  
        }
      }
      if( flock(fd, LOCK_UN) < 0 ) 
       print some error message ;
      }  
   }     
}

Respuestas a la pregunta(4)

Su respuesta a la pregunta