Foreach paralelo con un nivel configurable de concurrencia -- # campo con multithreading campo con concurrency campo con locking camp codereview Relacionados El problema

Parallel foreach with configurable level of concurrency


4
vote

problema

Español

El propósito de este código es permitirme hacer un bucle de más de 100 artículos (hasta a la vez), realizando alguna acción en ellos, y luego devuelva solo una vez que todos los artículos se hayan procesado: < / p>

  /// <summary>Generic method to perform an action or set of actions /// in parallel on each item in a collection of items, returning /// only when all actions have been completed.</summary> /// <typeparam name="T">The element type</typeparam> /// <param name="elements">A collection of elements, each of which to /// perform the action on.</param> /// <param name="action">The action to perform on each element. The /// action should of course be thread safe.</param> /// <param name="MaxConcurrent">The maximum number of concurrent actions.</param> public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action) {     // Semaphore limiting the number of parallel requests     Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);     // Count of the number of remaining threads to be completed     int remaining = 0;     // Signal to notify the main thread when a worker is done     AutoResetEvent onComplete = new AutoResetEvent(false);      foreach (T element in elements)     {         Interlocked.Increment(ref remaining);         limit.WaitOne();         new Thread(() =>         {             try             {                 action(element);             }             catch (Exception ex)             {                 Console.WriteLine("Error performing concurrent action: " + ex);             }             finally             {                 Interlocked.Decrement(ref remaining);                 limit.Release();                 onComplete.Set();             }         }).Start();     }     // Wait for all requests to complete     while (remaining > 0)         onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10) }   

Incluyo un tiempo de espera en el WaitOne()

Antes de verificar remaining nuevamente para proteger contra el caso raro donde el último hilo sobresaliente que restante "y luego se firma la finalización Entre la verificación del hilo principal "restante" y esperando la siguiente señal de finalización, lo que de lo contrario resultaría en el hilo principal que falta la última señal y bloqueo para siempre. Esto es más rápido que solo usar Thread.Sleep(10) porque tiene la posibilidad de volver inmediatamente después del último hilo se completa.

Objetivos:

  1. Asegurar la seguridad de los hilos: quiero asegurarme de que accidentalmente no volveré temprano (antes de que se hayan activado todos los elementos), y asegúrese de no quedarme en punto muerto o estancado.

  2. Agregue la menor cantidad de gastos generales como sea posible: minimizar la cantidad de tiempo que menos de MAX_CONCURRENT están ejecutando action y regresando lo antes posible después de la final < Código> action se ha realizado.

Original en ingles

The purpose of this code is to let me loop over 100 items (up to MAX_CONCURRENT at a time), performing some action on them, and then return only once all items have been processed:

/// <summary>Generic method to perform an action or set of actions /// in parallel on each item in a collection of items, returning /// only when all actions have been completed.</summary> /// <typeparam name="T">The element type</typeparam> /// <param name="elements">A collection of elements, each of which to /// perform the action on.</param> /// <param name="action">The action to perform on each element. The /// action should of course be thread safe.</param> /// <param name="MaxConcurrent">The maximum number of concurrent actions.</param> public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action) {     // Semaphore limiting the number of parallel requests     Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);     // Count of the number of remaining threads to be completed     int remaining = 0;     // Signal to notify the main thread when a worker is done     AutoResetEvent onComplete = new AutoResetEvent(false);      foreach (T element in elements)     {         Interlocked.Increment(ref remaining);         limit.WaitOne();         new Thread(() =>         {             try             {                 action(element);             }             catch (Exception ex)             {                 Console.WriteLine("Error performing concurrent action: " + ex);             }             finally             {                 Interlocked.Decrement(ref remaining);                 limit.Release();                 onComplete.Set();             }         }).Start();     }     // Wait for all requests to complete     while (remaining > 0)         onComplete.WaitOne(10); // Slightly better than Thread.Sleep(10) } 

I include a timeout on the WaitOne() before checking remaining again to protect against the rare case where the last outstanding thread decrements 'remaining' and then signals completion between the main thread checking 'remaining' and waiting for the next completion signal, which would otherwise result in the main thread missing the last signal and locking forever. This is faster than just using Thread.Sleep(10) because it has a chance to return immediately after the last thread completes.

Goals:

  1. Ensure thread safety - I want to be sure I won't accidentally return too early (before all elements have been acted on), and be sure that I don't become deadlocked or otherwise stuck.

  2. Add as little overhead as possible - minimizing amount of time that fewer than MAX_CONCURRENT threads are executing action, and returning as soon as possible after the final action has been performed.

           
       
       

Lista de respuestas

3
 
vote

He hecho uso de una señal generations655443360 generations1 y evita que la espera ocupada involucrada en el sondeo con el 99887766655443362 :

  generations3  
 

I've made use of a CountdownEvent signal to avoid the use of the remaining integer and avoid the busy waiting involved in polling it with the unreliable AutoResetEvent onComplete:

public static void PerformActionsInParallel<T>(IEnumerable<T> elements, Action<T> action) {     int threads = MaxConcurrent ?? DefaultMaxConcurrentRequests;     // Ensure elements is only enumerated once.     elements = elements as T[] ?? elements.ToArray();     // Semaphore limiting the number of parallel requests     Semaphore limit = new Semaphore(MAX_CONCURRENT, MAX_CONCURRENT);     // Count of the number of remaining threads to be completed     CountdownEvent remaining = new CountdownEvent(elements.Count());      foreach (T element in elements)     {         limit.WaitOne();         new Thread(() =>         {             try             {                 action(element);             }             catch (Exception ex)             {                 Console.WriteLine("Error performing concurrent action: " + ex);             }             finally             {                 remaining.Signal();                 limit.Release();             }         }).Start();     }     // Wait for all requests to complete     remaining.Wait(); } 
 
 
2
 
vote

Hay una cosa que me molesta particularmente: el último bloque while . Ese es un ejemplo de esperando ocupado y eso es algo que debe evitarse imo. Una posible solución a este problema podría ser almacenar los objetos 9988776665544331 a un List y una vez que haya creado todos los hilos que ejecuta un 9988776665544333 para cada hilo en dicha lista.

Dicho esto, sugeriría echar un vistazo a plinq y tpl < / a>.

Una última cosa: eliminaría el Console.WriteLine en el bloque también . Yo diría que la instrucción Console.WriteLine65544336 se debe usar solo en un método 9988776665544337 . Consulte aquí y aquí para maneras de manejar la excepción cuando se trabaja de una manera de ASYNC.

 

There's one thing that is particularly disturbing me: the last while block. That's an example of Busy waiting and that's something that should be avoided IMO. A possible solution to this problem could be to store the Thread objects to a List and once you have created all the threads you run a Thread.Join for each thread on such list.

That being said, I'd suggest to take a look at PLINQ and TPL.

One last thing: I'd remove the Console.WriteLine in the catch block also. I'd say that the Console.WriteLine instruction should be used only in a Main method. See here and here for ways to handle exception when working in an async way.

 
 
         
         

Relacionados problema

2  Función para bloquear un archivo usando Memcache, versión 2  ( Function to lock a file using memcache version 2 ) 
Basado en comentarios sugeridos para Función para bloquear un archivo usando Memcache < / a>, he modificado el código a import os import shutil import mem...

7  Esperando la conexión del servidor de juegos  ( Waiting for game server connection ) 
public boolean connectedOnGameServer = false; public final Object conGameServerMonitor = new Object(); public void connectedToGameServer() { synchronize...

1  Función para bloquear un archivo usando Memcache, versión 1  ( Function to lock a file using memcache version 1 ) 
Tengo una aplicación web donde enumera los archivos en la UI a muchos usuarios al mismo tiempo. Hay muchas personas que monitorean los archivos y los procesan...

2  Método de actualización de bloqueo de reentrantreadwritelock  ( Reentrantreadwritelock lock upgrade method ) 
Tengo una pregunta sobre la actualización de bloqueo. Específicamente lo que me molesta está entre readlock.unlock () y siguiendo a writelock.lock () ... Esto...

48  Safe-Safe y Lock-Free - Implementación de la cola  ( Thread safe and lock free queue implementation ) 
Estaba tratando de crear una implementación de colas de bloqueo en Java, principalmente para el aprendizaje personal. La cola debe ser general, lo que permite...

3  Simulando Memcachache Obtén y establece la condición de carrera y resolviéndola con Agregar  ( Simulating memcache get and set race condition and solving it with add ) 
Memcache get y set puede llegar a una condición de carrera si los usa juntos para lograr algo como el bloqueo porque no es atómico . Un simple memcache ...

8  C ++ Sección crítica con tiempo de espera  ( C critical section with timeout ) 
Nota: Mi implementación se basa en CodeProject Artículo de Vladislav Gelfer. Basado en valdok 's CÓDIGOS DE VALDOK , reescribí la clase de sección crí...

2  STD :: Lock Implementación en C con PTHEADS  ( Stdlock implementation in c with pthreads ) 
Me estropeé un poco con PTHEADS y necesitaba una alternativa a la función C ++ 11 std::lock (2 args son suficientes), y esto es lo que vino: void lock(pt...

19  Una clase de hilo-piscina / cola personalizada  ( A custom thread pool queue class ) 
Quería una clase que ejecuta cualquier cantidad de tareas, pero solo cierta cantidad al mismo tiempo (por ejemplo, para descargar varios contenidos de Interne...

6  Usando el temporizador con el trabajador de fondo para asegurarse de que el método Dowork se llame  ( Using timer with backgroundworker to ensure the dowork method is called ) 
Tengo una aplicación de formularios de Windows en la que un trabajador de fondo se llama una y otra vez. Necesito evitar el acceso concurrente del código en D...




© 2022 respuesta.top Reservados todos los derechos. Centro de preguntas y respuestas reservados todos los derechos