Piscina de hilo simple en C ++ -- ++ campo con multithreading campo con thread-safety campo con synchronization campo con windows camp codereview Relacionados El problema

Simple thread pool in C++


6
vote

problema

Español

Escribí una simple piscina de hilo, que funciona bastante bien. Me gustaría ver su revisión de ello.

Una característica importante que necesitaba en la piscina es la capacidad de esperar hasta que se completen todas las tareas que envié a la piscina (para que más tarde pueda enviar otras tareas que dependen del resultado de las tareas anteriores).

Esta es la tarea básica de la que heredó, para enviar tareas:

  class Task { public :     virtual void ExecuteTask() = 0; };   

Este es el archivo .h del grupo de hilos:

  #include<time.h> #include <Windows.h> #include<list>  using namespace std;  class ThreadPool { public :     ThreadPool(int threadCount=0);     bool ScheduleTask(Task* task);     bool WaitForWorkToBeFinished(DWORD dwMilliseconds);     ~ThreadPool();  private :     static DWORD WINAPI ThreadStart(LPVOID threadParameters);     DWORD ThreadMain();      list<Task*> mTasks;     HANDLE* mThreads;     int mThreadCount;     CRITICAL_SECTION mCriticalSection;     CONDITION_VARIABLE mConditionVariable;     CONDITION_VARIABLE mConditionVariableTaskFinished;     int mNumberOfTasksNotFinished;     bool mConsum; };   

Este es el archivo CPP:

  #include "ThreadPool.h"  ThreadPool::ThreadPool(int threadCount) {     mThreadCount=threadCount;     mThreads = new HANDLE[mThreadCount]; //This handles will be used to wait upon them      InitializeCriticalSection(&mCriticalSection);     InitializeConditionVariable(&mConditionVariable);     InitializeConditionVariable(&mConditionVariableTaskFinished);     mConsum = true;     mNumberOfTasksNotFinished = 0;     for(int i = 0; i< mThreadCount;i++)         mThreads[i] = CreateThread(NULL, 0, ThreadStart, (void*)this, 0,NULL); }  bool ThreadPool::ScheduleTask(Task* task) {     EnterCriticalSection(&mCriticalSection);     mTasks.push_back(task); //This is a simple std::list     mNumberOfTasksNotFinished++;     LeaveCriticalSection(&mCriticalSection);      WakeConditionVariable(&mConditionVariable); //Waking up the threads so they will know there is a job to do     return true; }  DWORD WINAPI ThreadPool::ThreadStart(LPVOID threadParameters) //Just a static function to help me use member function as a thread {     ThreadPool* referenceToThis = (ThreadPool*)threadParameters;     return referenceToThis->ThreadMain(); }  DWORD ThreadPool::ThreadMain() //The main thread function {     do     {         Task* currentTask;         EnterCriticalSection(&mCriticalSection);          while(mTasks.size() ==0 && mConsum)             SleepConditionVariableCS(&mConditionVariable,&mCriticalSection, INFINITE);          if(!mConsum) //Destructor ordered on abort         {             LeaveCriticalSection(&mCriticalSection);             return 0;         }          //If we got here, we successfully aquired the lock and the list<Task> is not empty         currentTask = mTasks.front();         mTasks.pop_front();          LeaveCriticalSection(&mCriticalSection);          currentTask->ExecuteTask();         //delete currentTask;          EnterCriticalSection(&mCriticalSection);         mNumberOfTasksNotFinished--;         LeaveCriticalSection(&mCriticalSection);          WakeConditionVariable(&mConditionVariableTaskFinished);     }while(mConsum);      return 0; } //This function is very important, it gives the user the ability to send 10 tasks to the thread pool // then to wait till all the tasks completed, and give the next 10 which are dependand on the result of the previous ones. bool ThreadPool::WaitForWorkToBeFinished(DWORD dwMilliseconds) {     EnterCriticalSection(&mCriticalSection);     while(mNumberOfTasksNotFinished!=0)         SleepConditionVariableCS(&mConditionVariableTaskFinished,&mCriticalSection, INFINITE);     LeaveCriticalSection(&mCriticalSection);      return true; }  ThreadPool::~ThreadPool() {     mConsum = false;     WakeAllConditionVariable (&mConditionVariable);     WaitForMultipleObjects(mThreadCount,mThreads, true, INFINITE);     DeleteCriticalSection(&mCriticalSection); }   
Original en ingles

I wrote a simple thread pool, which works pretty well. I would like to see your review of it.

One important feature that I needed in the pool is the ability to wait until all the tasks that I sent to the pool are complete (so that later I could send other tasks that are dependent on the result of the previous tasks).

This is the basic task which you inherit from, to send tasks:

class Task { public :     virtual void ExecuteTask() = 0; }; 

This is the .h file of the Thread pool:

#include<time.h> #include <Windows.h> #include<list>  using namespace std;  class ThreadPool { public :     ThreadPool(int threadCount=0);     bool ScheduleTask(Task* task);     bool WaitForWorkToBeFinished(DWORD dwMilliseconds);     ~ThreadPool();  private :     static DWORD WINAPI ThreadStart(LPVOID threadParameters);     DWORD ThreadMain();      list<Task*> mTasks;     HANDLE* mThreads;     int mThreadCount;     CRITICAL_SECTION mCriticalSection;     CONDITION_VARIABLE mConditionVariable;     CONDITION_VARIABLE mConditionVariableTaskFinished;     int mNumberOfTasksNotFinished;     bool mConsum; }; 

This is the CPP file:

#include "ThreadPool.h"  ThreadPool::ThreadPool(int threadCount) {     mThreadCount=threadCount;     mThreads = new HANDLE[mThreadCount]; //This handles will be used to wait upon them      InitializeCriticalSection(&mCriticalSection);     InitializeConditionVariable(&mConditionVariable);     InitializeConditionVariable(&mConditionVariableTaskFinished);     mConsum = true;     mNumberOfTasksNotFinished = 0;     for(int i = 0; i< mThreadCount;i++)         mThreads[i] = CreateThread(NULL, 0, ThreadStart, (void*)this, 0,NULL); }  bool ThreadPool::ScheduleTask(Task* task) {     EnterCriticalSection(&mCriticalSection);     mTasks.push_back(task); //This is a simple std::list     mNumberOfTasksNotFinished++;     LeaveCriticalSection(&mCriticalSection);      WakeConditionVariable(&mConditionVariable); //Waking up the threads so they will know there is a job to do     return true; }  DWORD WINAPI ThreadPool::ThreadStart(LPVOID threadParameters) //Just a static function to help me use member function as a thread {     ThreadPool* referenceToThis = (ThreadPool*)threadParameters;     return referenceToThis->ThreadMain(); }  DWORD ThreadPool::ThreadMain() //The main thread function {     do     {         Task* currentTask;         EnterCriticalSection(&mCriticalSection);          while(mTasks.size() ==0 && mConsum)             SleepConditionVariableCS(&mConditionVariable,&mCriticalSection, INFINITE);          if(!mConsum) //Destructor ordered on abort         {             LeaveCriticalSection(&mCriticalSection);             return 0;         }          //If we got here, we successfully aquired the lock and the list<Task> is not empty         currentTask = mTasks.front();         mTasks.pop_front();          LeaveCriticalSection(&mCriticalSection);          currentTask->ExecuteTask();         //delete currentTask;          EnterCriticalSection(&mCriticalSection);         mNumberOfTasksNotFinished--;         LeaveCriticalSection(&mCriticalSection);          WakeConditionVariable(&mConditionVariableTaskFinished);     }while(mConsum);      return 0; } //This function is very important, it gives the user the ability to send 10 tasks to the thread pool // then to wait till all the tasks completed, and give the next 10 which are dependand on the result of the previous ones. bool ThreadPool::WaitForWorkToBeFinished(DWORD dwMilliseconds) {     EnterCriticalSection(&mCriticalSection);     while(mNumberOfTasksNotFinished!=0)         SleepConditionVariableCS(&mConditionVariableTaskFinished,&mCriticalSection, INFINITE);     LeaveCriticalSection(&mCriticalSection);      return true; }  ThreadPool::~ThreadPool() {     mConsum = false;     WakeAllConditionVariable (&mConditionVariable);     WaitForMultipleObjects(mThreadCount,mThreads, true, INFINITE);     DeleteCriticalSection(&mCriticalSection); } 
              
         
         

Lista de respuestas

7
 
vote
vote
La mejor respuesta
 

algunos puntos:

Su EnterCriticalSection / 99887766555443311 El código debe colocarse en un objeto RAII alcepado. Esto le ahorra de licencia explícitamente, hace que su código sea más pequeño, y evite un punto muerto si el código sincronizado se lanza.

Haz algo similar con todo lo que requiere la limpieza (como mThreads = new HANDLE[mThreadCount]; ).

La interfaz de su clase debe recibir las tareas por referencia y almacenarlas por puntero dentro. De esta manera, le queda claro para el código del cliente que la clase no toma la propiedad de las tareas que recibe. (consulte tercera edición a continuación).

Utilizar reinterpret_cast en lugar de moldes de estilo C (debido a que los casts de estilo C = malo, mmmkay?)

  using namespace std;   

no hagas esto. Restringe en gran medida qué código de cliente (quienquiera que incluye a su clase) puede escribir, y los errores generados en el código del cliente casi siempre son oscuros. Está mejor repitiendo que en los ámbitos locales (si debe) o (mejor aún), no usarlo (considere escribir using std::string; y similar, en los ámbitos que usa el tipo, o simplemente declarando 9988776655544335 ).

Editar: Si 9988776655544336 nunca devuelve False, ¿por qué devuelve BOOL? ¿No debería ser void ?

Segundo Editar : ConsDer usando un INT sin firmar para el número de subprocesos ( 9988777665544338 ).

Tercera edición ( Hola Loki :))

Con respecto a la propiedad de los hilos y la gestión de la vida útil del objeto (cómo implementar de forma segura y indicarla en la interfaz pública de la clase).

Considere este código de cliente:

  ThreadPool runner();  // bad bad code: don't call new with raw pointers like this in practice // but easy to write to make my point below. BackgroundTask *t = new BackgroundTask(); // class BackgroundTask: public Task                                           // defined elsewhere  runner.ScheduleTask(t);   

Lo que no sabe. Mirar este código: ¿Debería eliminar LeaveCriticalSection0 después de que se complete el Threadpool? Será LeaveCriticalSection1 ¡Elimine la tarea en sí? Si lo hace, 99887766555443312 apuntará a la memoria eliminada después de LeaveCriticalSection3 termina. Si no lo hace, deberá recordar verificar cuando el corredor haya terminado, y llame a eliminar explícitamente, cada vez que use el grupo de hilos.

Usted podría implementar el Threadpool a LeaveCriticalSection4 Todas las tareas después de la ejecución, pero luego, un programador emprendedor (tal vez usted mismo) podría escribir:

  LeaveCriticalSection5  

y esto bloqueará su programa cuando su Threadpool elimine la tarea.

Otra alternativa de código de cliente, este mucho peor:

  LeaveCriticalSection6  

La interfaz no indica la propiedad y lo hace interrumpir su codificación, para ver la implementación, así que sabe cómo escribir el código del cliente que no se filtra o se bloquea.

Soluciones:

Puede cambiar su interfaz para tomar un LeaveCriticalSection7 en su lugar. (Similar a la aprobación por puntero), el único inconveniente es que debe asegurarse de que las tareas que pase a SchedulETASK deben estar en alcance hasta que el Threadpool deja de ejecutar (en otras palabras, todavía puede escribir el agradable LeaveCriticalSection8 < / Código> Función).

Esto es mejor, ya que al menos no necesita preguntarse si eliminará la tarea (que pasará por puntero puede, o puede no sugerir).

Mejor aún, cambie su interfaz para tomar un STD :: Unique_ptr en su lugar:

  LeaveCriticalSection9  

Código del cliente:

  mThreads = new HANDLE[mThreadCount];0  

Dado que la función toma un IPLY_PTR ahora, ahora conoce Debe pasar un puntero y también sabe que también está pasando la propiedad.

 

A few points:

Your EnterCriticalSection/LeaveCriticalSection code should be placed into a scoped RAII object. This saves you from explicitly calling leave, makes your code smaller, and avoids a deadlock if the synchronized code throws.

Do something similar with everything that requires cleanup (like mThreads = new HANDLE[mThreadCount];).

The interface of your class should receive the tasks by reference and store them by pointer within. This way, you make it clear to client code that the class doesn't take ownership of the tasks it receives. (see Third Edit below).

Use reinterpret_cast instead of C-style casts (because C-style casts = bad, mmmkay?!)

using namespace std; 

Don't do this. It greatly restricts what client code (whoever includes your class) can write, and the errors generated in client code are almost always obscure. You're better off repeating that in local scopes (if you must) or (better still) not using it (consider writing using std::string; and similar, in the scopes you use the type, or simply declaring std::string x;).

Edit: If bool ThreadPool::ScheduleTask(Task* task) never returns false, why does it return bool? Shouldn't it be void?

Second Edit: Consder using an unsigned int for the number of threads (mThreadCount).

Third edit ( Hi Loki :) )

Regarding ownership of the threads and object lifetime management (how to implement safely and indicate it in the public interface of the class).

Consider this client code:

ThreadPool runner();  // bad bad code: don't call new with raw pointers like this in practice // but easy to write to make my point below. BackgroundTask *t = new BackgroundTask(); // class BackgroundTask: public Task                                           // defined elsewhere  runner.ScheduleTask(t); 

What you don't know looking at this code: should you delete t after the ThreadPool completes running? Will runner delete the task itself? If it does, t will point to deleted memory after runner finishes. If it doesn't, you will have to remember to check when runner has finished, and call delete explicitly, every single time you use the thread pool.

You could implement the ThreadPool to delete all tasks after execution, but then, an enterprising programmer (maybe yourself) could write:

ThreadPool runner(); BackgroundTask t(); runner.ScheduleTask(&t); // pass address of automatic storage object 

and this will crash your program when your ThreadPool deletes the task.

Another client code alternative, this one much much worse:

void crashy(ThreadPool& pool) {     BackgroundTask t;     pool.ScheduleTask(&t); } // t goes out of scope here and is destroyed but p still holds it's address  ThreadPool p; crashy(p); // some compilers tend to happily accept this (maybe with a warning) // p now has a pointer to a deleted BackgroundTask instance 

The interface doesn't indicate ownership and makes you interrupt your coding, to look at the implementation, just so you know how to write client code that doesn't leak or crash.

Solutions:

You could change your interface to take a Task& instead. (Similar to passing by pointer), the only drawback is that you need to make sure the tasks you pass to ScheduleTask must still be in scope until the ThreadPool stops running (in other words, you can still write the nice crashy function).

This is better, as at least you don't need to wonder if it will delete the task (which passing by pointer may, or may not suggest).

Better yet, change your interface to take a std::unique_ptr instead:

class ThreadPool { public :     bool ScheduleTask(std::unique_ptr<Task> task) // << std::unique_ptr by val.     {         EnterCriticalSection(&mCriticalSection);         mTasks.push_back( std::move(task) ); // << notice std::move         mNumberOfTasksNotFinished++;         LeaveCriticalSection(&mCriticalSection); // also notice non-raii unlock;                                                  // if push_back throws on                                                   // allocating new list node,                                                  // your whole app is deadlocked          WakeConditionVariable(&mConditionVariable);         return true;     }  private :      list<std::unique_ptr<Task>> mTasks; // << notice std::unique_ptr     ...  }; 

Client code:

ThreadPool runner(); std::unique_ptr<Task> p(new BackgroundTask()); runner.ScheduleTask(std::move(p)); // works, no lifetime management problems                                    // and no ambguity either 

Since the function takes a unique_ptr now, you now know you need to pass a pointer and you also know you are passing ownership as well.

 
 
       
       
2
 
vote

re. Los comentarios a esta pregunta, no veo ( En la documentación de la API de Win32 ) se requiere cualquier función para eliminar una variable de condición inicializada usando InitializeConditionVariable .

Pero, todos los asas de rosca que creó usando CreateThread debe cerrarse: mediante el uso de una llamada correspondiente a CloseHandle(mThreads[i]) en su ThreadPool::~ThreadPool() Destructor.


Una cosa más: su clase Task debe tener un destructor definido; VER POR EJEMPLO:

Guía # 4: Un destructor de clase base debe ser público y virtual, o protegido y no volurto.

 

Re. the comments to this question, I don't see (in the Win32 API documentation) any function required to delete a condition variable initialized using InitializeConditionVariable.

But, all the thread handles which you created using CreateThread should be closed: by using a corresponding call to CloseHandle(mThreads[i]) in your ThreadPool::~ThreadPool() destructor.


One more thing: your Task class should probably have a defined destructor; see for example:

Guideline #4: A base class destructor should be either public and virtual, or protected and nonvirtual.

 
 

Relacionados problema

3  Leyendo todos los contenidos de archivos a través de la Asamblea X64  ( Reading all file contents via x64 assembly ) 
He surgido con el siguiente fragmento mediante la construcción de las respuestas que se le da a mi Pregunta de StackOverflow . Solo tratando de obtener otros...

5  Calculadora Avanzada C  ( Advanced c calculator ) 
Me pidieron que creara una calculadora avanzada en C, lo llamo avanzado porque realmente maneja las reglas aritméticas. Se mencionó que no necesito manejar ex...

1  Exportando bolsas, listas de saltos y archivos LNK con PowerShell  ( Exporting shellbags jump lists and lnk files with powershell ) 
Soy nuevo en PowerShell (comenzó ayer) y se preguntó si hay una forma de exportar bolsas de cáscara, listas de saltos y archivos LNK de la Región de Región de...

5  Clase de socket para uso en servidor web  ( Socket class for use in web server ) 
He creado esta clase de socket para usar en mi aplicación Web Server. Tengo dos clases: una para el zócalo y el otro para el servidor web. import 'package:...

3  std :: string / std :: wtring plantilla wrapper para Win32 API  ( Stdstring stdwstring template wrapper for win32 api ) 
No he completado esto, pero quiero hacer mi propia biblioteca de plantillas para envolver la API de Win32 para que sea compatible con STD :: STRING / STD :: W...

4  Solucionador de ecuación cuadrática en C ++  ( Quadratic equation solver in c ) 
// ConsoleApplication1.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include <cstdlib> #include <iostream> #include <cm...

4  Obteniendo información del usuario de Windows del Active Directory  ( Getting windows user info from the active directory ) 
Tomé un poco de código de calderillas que he estado usando para determinar la información de un usuario de Windows activo para enviar correos electrónicos con...

9  Plataforma independiente donde la función  ( Platform independent whereis function ) 
Estoy tratando de escribir un paquete independiente de Python (& gt; = 3.6.5) que tiene un puñado de archivos de clase Java. Clase que necesitan compilarse us...

2  Clase de módulos DLL de Windows  ( Windows dll module class ) 
Estoy haciendo un proyecto donde uso DLL's como módulos o complementos de tiempo de ejecución. Para hacer esto, utilizo el código de Windows lock1 y lock2 ...

1  Consigue el nombre de OS en C ++  ( Get os name in c ) 
Esta es la función que codificé, obviamente se utiliza para obtener el nombre del sistema: Array#include?0 Esta función devuelve un puntero a un búfer d...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos