Fast lock implementation using pipes only

Tags: concurrency, locking, lock-free, c11


For fun I wanted to create a lock implementation that is about as fast as one using futexes, but that uses pipes instead and queues waiters in user space rather than in the kernel. For the queue, the code uses an optimized version of the lock-free algorithm from "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms" at http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html. However, I am not sure that all the optimizations I have made are correct. I also made some traditional implementations for comparison.
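
The basic building block is that a non-blocking pipe can serve as a one-shot wakeup primitive: the signaller writes a token, and the waiter polls for readability and then drains the pipe. The following stand-alone sketch shows just that primitive in isolation; it is illustrative only and not part of the code under review (the names wakeup_wait/wakeup_signal are mine):

/* Minimal sketch: a non-blocking pipe as a one-shot wakeup primitive.
 * Illustrative only; not part of the reviewed code. */
#define _GNU_SOURCE 1
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

static int fds[2];

static void wakeup_wait(void)
{
    struct pollfd pfd = {.fd = fds[0], .events = POLLIN};
    if (-1 == poll(&pfd, 1, -1)) /* block until the pipe becomes readable */
        abort();
    for (;;) { /* drain every queued token */
        char buf[64];
        if (-1 == read(fds[0], buf, sizeof buf)) {
            if (EAGAIN == errno)
                break;
            abort();
        }
    }
}

static void wakeup_signal(void)
{
    char token = 1;
    /* EAGAIN means the pipe is already full, i.e. already signalled */
    if (-1 == write(fds[1], &token, 1) && errno != EAGAIN)
        abort();
}

static void *waiter(void *arg)
{
    (void)arg;
    wakeup_wait();
    puts("woken");
    return NULL;
}

int main(void)
{
    if (-1 == pipe2(fds, O_CLOEXEC | O_NONBLOCK))
        abort();
    pthread_t t;
    pthread_create(&t, NULL, waiter, NULL);
    sleep(1); /* pretend to hold the lock for a while */
    wakeup_signal();
    pthread_join(t, NULL);
    return 0;
}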

The lock interface is:

/*
 * Copyright 2017 Steven Stewart-Gallus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
#ifndef LOCK_H
#define LOCK_H

struct lock;

struct lock *lock_create(void);
void lock_acquire(struct lock *lock);
void lock_release(struct lock *lock);

#endif

The test program is:

/*
 * Copyright 2017 Steven Stewart-Gallus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
#define _GNU_SOURCE 1

#include <errno.h>
#include <pthread.h>
#include <stdio.h>

#include "lock.h"

#define PRODUCER_LOOP 1000000U
#define NUM_THREADS 10U

static void *contend(void *arg);

int main()
{
    struct lock *the_lock = lock_create();

    pthread_t threads[NUM_THREADS];
    for (size_t ii = 0U; ii < NUM_THREADS; ++ii) {
        pthread_create(&threads[ii], NULL, contend, the_lock);
    }

    for (size_t ii = 0U; ii < NUM_THREADS; ++ii) {
        void *retval;
        pthread_join(threads[ii], &retval);
    }
    return 0;
}
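
Note that the test program only exercises the lock under contention and does not itself report elapsed time. A timed variant could look like the following sketch, which assumes the same lock.h interface; the clock_gettime timing code is my illustration and not part of the code under review:

/* Sketch of a timed variant of the test program (illustrative only). */
#define _GNU_SOURCE 1
#include <pthread.h>
#include <stdio.h>
#include <time.h>

#include "lock.h"

#define PRODUCER_LOOP 1000000U
#define NUM_THREADS 10U

static void *contend(void *arg)
{
    struct lock *lock = arg;
    for (size_t ii = 0U; ii < PRODUCER_LOOP; ++ii) {
        lock_acquire(lock);
        lock_release(lock);
    }
    return 0;
}

static double now_seconds(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return ts.tv_sec + ts.tv_nsec * 1e-9;
}

int main(void)
{
    struct lock *the_lock = lock_create();
    pthread_t threads[NUM_THREADS];

    double start = now_seconds();
    for (size_t ii = 0U; ii < NUM_THREADS; ++ii)
        pthread_create(&threads[ii], NULL, contend, the_lock);
    for (size_t ii = 0U; ii < NUM_THREADS; ++ii)
        pthread_join(threads[ii], NULL);
    double elapsed = now_seconds() - start;

    printf("%u acquisitions in %.3f s (%.0f per second)\n",
           NUM_THREADS * PRODUCER_LOOP, elapsed,
           NUM_THREADS * PRODUCER_LOOP / elapsed);
    return 0;
}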

The lock implementation is:

/*
 * Copyright 2017 Steven Stewart-Gallus
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
#define _GNU_SOURCE 1

#include "lock.h"

#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/poll.h>
#include <sys/eventfd.h>
#include <xmmintrin.h>
#include <unistd.h>

#if defined DO_FUTEX || defined PARTIAL_FUTEX
#include <linux/futex.h>
#include <sys/syscall.h>
#endif

#define NOTIFIER_FD_SPIN_LOOPS 40U
#define FUTEX_SPIN_LOOPS 40U
#define NOTIFIER_FUTEX_SPIN_LOOPS 20U
#define NOOP_CONTEND_CYCLES 4U
#define PAUSE_CONTEND_CYCLES 40U

struct node;

#define ALIGN_TO_CACHE _Alignas (16)

#if defined RAW_EVENTFD
struct event {
    int fd;
};
#elif defined DO_FUTEX
struct event {
    ALIGN_TO_CACHE _Atomic(bool) spinning;
    ALIGN_TO_CACHE _Atomic(int) triggered;
};
#else
struct event {
    ALIGN_TO_CACHE _Atomic(bool) triggered;
    ALIGN_TO_CACHE _Atomic(struct node *) head;
    ALIGN_TO_CACHE _Atomic(struct node *) tail;
    ALIGN_TO_CACHE _Atomic(uint64_t) head_contention;
    ALIGN_TO_CACHE _Atomic(uint64_t) tail_contention;
};
#endif

static void event_init (struct event *event);
static void event_wait (struct event *event);
static void event_signal (struct event *event);

#if defined NATIVE
struct lock {
    pthread_mutex_t mutex;
};
#else
struct lock {
    atomic_flag locked;
    struct event when_unlocked;
};
#endif

#if defined NATIVE
struct lock *lock_create(void)
{
    struct lock *lock = aligned_alloc(_Alignof(struct lock), sizeof *lock);
    if (!lock)
        abort();
    pthread_mutex_init (&lock->mutex, 0);
    return lock;
}

void lock_acquire(struct lock *lock)
{
    pthread_mutex_lock (&lock->mutex);
}

void lock_release(struct lock *lock)
{
    pthread_mutex_unlock (&lock->mutex);
}
#else
struct lock *lock_create(void)
{
    struct lock *lock = aligned_alloc(_Alignof(struct lock), sizeof *lock);
    if (!lock)
        abort();
    atomic_flag_clear_explicit(&lock->locked, memory_order_relaxed);
    event_init (&lock->when_unlocked);
    return lock;
}

void lock_acquire(struct lock *lock)
{
    for (;;) {
        if (!atomic_flag_test_and_set_explicit (&lock->locked, memory_order_acquire))
            break;

        event_wait (&lock->when_unlocked);
    }
}

void lock_release(struct lock *lock)
{
    atomic_flag_clear_explicit (&lock->locked, memory_order_release);
    event_signal (&lock->when_unlocked);
}
#endif

#if defined RAW_EVENTFD
static void event_init (struct event *event)
{
    if (-1 == (event->fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK)))
        abort();
}

static void event_wait (struct event *event)
{
    {
        struct pollfd fds[1U] = {{ .fd = event->fd,
                       .events = POLLIN}};
        if (-1 == poll(fds, 1U, -1)) {
            abort();
        }
    }

    for (;;) {
        uint64_t xx;
        if (-1 == read(event->fd, &xx, sizeof xx)) {
            if (EAGAIN == errno)
                break;
            abort();
        }
    }
}

static void event_signal (struct event *event)
{
    static uint64_t const one = 1U;
    if (-1 == write(event->fd, &one, sizeof one)) {
        if (errno != EAGAIN)
            abort();
    }
}
#elif defined DO_FUTEX
static void event_init (struct event *event)
{
    event->triggered = false;
    event->spinning = false;
}

static void event_wait (struct event *event)
{
    for (;;) {
        atomic_store_explicit(&event->spinning, true, memory_order_release);

        atomic_thread_fence(memory_order_acquire);

        bool got_triggered = false;
        for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
            got_triggered |= atomic_exchange_explicit(&event->triggered, false, memory_order_relaxed);
            if (got_triggered)
                break;
            _mm_pause();
        }
        atomic_thread_fence(memory_order_acq_rel);
        atomic_store_explicit(&event->spinning, false, memory_order_release);
        if (got_triggered)
            break;
        if (atomic_exchange_explicit(&event->triggered, false, memory_order_acq_rel))
            break;

        syscall(__NR_futex, &event->triggered,
                FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
    }
}

static void event_signal (struct event *event)
{
    /* Already signaled */
    bool already_signaled = atomic_exchange_explicit (&event->triggered, 1, memory_order_acq_rel);
    bool still_spinning = atomic_load_explicit(&event->spinning, memory_order_acquire);

    if (already_signaled || still_spinning)
        return;

    syscall(__NR_futex, &event->triggered, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}

#else
struct notifier;

struct node {
    ALIGN_TO_CACHE struct notifier * trigger;
    ALIGN_TO_CACHE _Atomic(struct node *) next;
};

static ALIGN_TO_CACHE _Atomic(uint64_t) free_list_contention = 0U;
static ALIGN_TO_CACHE _Atomic(struct node *) free_list = NULL;
static ALIGN_TO_CACHE _Atomic(uint64_t) notifier_free_list_contention = 0U;
static ALIGN_TO_CACHE _Atomic(struct notifier *) notifier_free_list = NULL;

/* All used once so inline */
static inline void enqueue (struct event *event, struct notifier *notifier);
static inline struct notifier * dequeue (struct event *event);

static inline struct node * allocate (void);
static inline void deallocate (struct node *node);

static inline struct notifier * allocate_notifier (void);
static inline void deallocate_notifier (struct notifier *node);

static inline void notifier_signal(struct notifier *notifier);
static inline void notifier_wait(struct notifier *notifier);

/* Very small so inline */
static inline void on_failure(_Atomic(uint64_t) *contention);
static inline void on_success(_Atomic(uint64_t) *contention);

static void event_init (struct event *event)
{
    struct node *n = aligned_alloc(_Alignof (struct node), sizeof *n);
    if (!n)
        abort();
    n->next = NULL;
    event->head = n;
    event->tail = n;
    event->head_contention = 0U;
    event->tail_contention = 0U;
}

static void event_wait (struct event *event)
{
    if (atomic_exchange_explicit (&event->triggered, false, memory_order_acq_rel))
        return;

    struct notifier *notifier = allocate_notifier();

    enqueue (event, notifier);

    notifier_wait(notifier);

    deallocate_notifier (notifier);
}

static void event_signal (struct event *event)
{
    atomic_store_explicit (&event->triggered, true, memory_order_release);

    struct notifier *notifier = dequeue (event);
    if (!notifier)
        return;

    notifier_signal(notifier);
}

static void *from(void *node)
{
    return (void *)((uintptr_t)node & 0xFFFFFFFFFFFFU);
}

static uint32_t tag(void *node)
{
    return (uintptr_t)node >> 48U;
}

static void *to(void *node, uint32_t tag)
{
    return (void *)((uintptr_t)node | (uint64_t)tag << 48U);
}

void enqueue (struct event *event, struct notifier *trigger)
{
    struct node *tail;
    struct node *tail_again;
    struct node *next;

    struct node *node = allocate ();
    node->trigger = trigger;
    node->next = NULL;

    for (;;) {
        for (;;) {
            tail = atomic_load_explicit (&event->tail, memory_order_acquire);
            next = atomic_load_explicit (&((struct node*)from (tail))->next, memory_order_acquire);
            tail_again = atomic_load_explicit (&event->tail, memory_order_acquire);
            if (tail == tail_again)
                break;
            on_failure(&event->tail_contention);
        }
        on_success(&event->tail_contention);

        if (!from (next)) {
            if (atomic_compare_exchange_strong
                (&((struct node*)from (tail))->next,
                 &next,
                 to (node, tag (next) + 1U)))
                break;
        } else {
            atomic_compare_exchange_strong (&event->tail,
                                            &tail,
                                            to (from (next), tag (tail) + 1U));
        }
        on_failure(&event->tail_contention);
    }
    on_success(&event->tail_contention);
    atomic_compare_exchange_strong (&event->tail,
                                    &tail,
                                    to (node, tag (tail) + 1U));
}

static struct notifier * dequeue (struct event *event)
{
    struct node *head;
    struct node *head_again;
    struct node *tail;
    struct node *next;
    struct node *dequeued;
    struct notifier *trigger;

    for (;;) {
        for (;;) {
            head = atomic_load_explicit (&event->head, memory_order_acquire);
            tail = atomic_load_explicit (&event->tail, memory_order_acquire);
            next = atomic_load_explicit (&((struct node *)from (head))->next,
                                         memory_order_acquire);
            head_again = atomic_load_explicit (&event->head, memory_order_acquire);
            if (head == head_again)
                break;
            on_failure(&event->head_contention);
        }
        on_success(&event->head_contention);

        if (from (head) == from (tail)) {
            if (!from (next)) {
                dequeued = NULL;
                trigger = NULL;
                break;
            }
            atomic_compare_exchange_strong
                (&event->tail,
                 &tail,
                 to (from (next), tag (tail) + 1U));
        } else {
            trigger = ((struct node *)from (next))->trigger;
            if (atomic_compare_exchange_strong (&event->head,
                                                &head,
                                                to (from (next), tag (head) + 1U))) {
                dequeued = from (head);
                break;
            }
        }
        on_failure(&event->head_contention);
    }
    on_success(&event->head_contention);

    if (dequeued) {
        deallocate (dequeued);
    }
    return trigger;
}

struct node * allocate (void)
{
    struct node *head;
    struct node *next;
    struct node *n;

    for (;;) {
        head = atomic_load_explicit (&free_list, memory_order_relaxed);
        n = from (head);
        atomic_thread_fence (memory_order_acquire);
        if (!n) {
            on_success(&free_list_contention);
            n = aligned_alloc(_Alignof (struct node), sizeof *n);
            if (!n)
                abort();
            n->next = NULL;
            n->trigger = NULL;
            return n;
        }
        next = atomic_load_explicit (&n->next, memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit(&free_list,
                                                  &head,
                                                  to (from (next), tag (head) + 1U),
                                                  memory_order_acq_rel,
                                                  memory_order_acquire))
            break;
        on_failure(&free_list_contention);
    }
    on_success(&free_list_contention);
    n->next = NULL;
    return n;
}

static void deallocate (struct node *node)
{
    struct node *head;

    for (;;) {
        head = atomic_load_explicit (&free_list, memory_order_relaxed);
        node->next = to (from (head), 0);
        atomic_thread_fence(memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit (&free_list,
                                                   &head,
                                                   to (node, tag (head) + 1U),
                                                   memory_order_acq_rel,
                                                   memory_order_acquire))
            break;
        on_failure(&free_list_contention);
    }
    on_success(&free_list_contention);
}

#if defined PARTIAL_FUTEX
struct notifier {
    ALIGN_TO_CACHE _Atomic(bool) spinning;
    ALIGN_TO_CACHE _Atomic(int) triggered;
    ALIGN_TO_CACHE _Atomic(struct notifier *) next;
};
#else
struct notifier {
    int fds[2U];
    ALIGN_TO_CACHE _Atomic(bool) spinning;
    ALIGN_TO_CACHE _Atomic(bool) triggered;
    ALIGN_TO_CACHE _Atomic(struct notifier *) next;
};
#endif

struct notifier * allocate_notifier (void)
{
    struct notifier *head;
    struct notifier *next;
    struct notifier *n;

    for (;;) {
        head = atomic_load_explicit (&notifier_free_list, memory_order_relaxed);
        n = from (head);
        atomic_thread_fence (memory_order_acquire);
        if (!n) {
            n = aligned_alloc(_Alignof (struct notifier), sizeof *n);
            if (!n)
                abort();

#if !defined PARTIAL_FUTEX
#if defined DO_EVENTFD
            int fd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
            if (-1 == fd)
                abort();
            n->fds[0U] = fd;
            n->fds[1U] = fd;
#else
            if (-1 == pipe2(n->fds, O_CLOEXEC | O_NONBLOCK))
                abort();
#endif
#endif

            n->spinning = false;
            n->triggered = false;
            n->next = NULL;
            return n;
        }
        next = atomic_load_explicit (&n->next, memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit(&notifier_free_list,
                                                  &head,
                                                  to (from (next), tag (head) + 1U),
                                                  memory_order_acq_rel,
                                                  memory_order_acquire))
            break;
        on_failure(&notifier_free_list_contention);
    }
    on_success(&notifier_free_list_contention);

    n->next = NULL;
    n->triggered = false;
    return n;
}

static void deallocate_notifier (struct notifier *node)
{
    struct notifier *head;

    node->triggered = false;
    node->next = NULL;

    for (;;) {
        head = atomic_load_explicit (&notifier_free_list, memory_order_relaxed);
        node->next = to (from (head), 0);
        atomic_thread_fence(memory_order_acquire);
        if (atomic_compare_exchange_weak_explicit (&notifier_free_list,
                                                   &head,
                                                   to (node, tag (head) + 1U),
                                                   memory_order_acq_rel,
                                                   memory_order_acquire))
            break;
        on_failure(&notifier_free_list_contention);
    }
    on_success(&notifier_free_list_contention);
}

#if defined PARTIAL_FUTEX
static inline void notifier_signal(struct notifier *notifier)
{
    bool aleady_signaled = atomic_exchange_explicit(&notifier->triggered, true, memory_order_acq_rel);
    bool still_spinning = atomic_load_explicit(&notifier->spinning, memory_order_acquire);

    if (aleady_signaled || still_spinning)
        return;

    syscall(__NR_futex, &notifier->triggered, FUTEX_WAKE_PRIVATE, 1, NULL, NULL, 0);
}

static inline void notifier_wait(struct notifier *notifier)
{
    for (;;) {
        atomic_store_explicit(&notifier->spinning, true, memory_order_release);

        atomic_thread_fence(memory_order_acquire);

        bool got_triggered = false;
        for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
            got_triggered |= atomic_exchange_explicit(&notifier->triggered, false, memory_order_relaxed);
            if (got_triggered)
                break;
            _mm_pause();
        }
        atomic_thread_fence(memory_order_acq_rel);
        atomic_store_explicit(&notifier->spinning, false, memory_order_release);
        if (got_triggered)
            break;
        if (atomic_exchange_explicit(&notifier->triggered, false, memory_order_acq_rel))
            break;

        syscall(__NR_futex, &notifier->triggered,
                FUTEX_WAIT_PRIVATE, 0, NULL, NULL, 0);
    }
}
#else
static inline void notifier_signal(struct notifier *notifier)
{
    bool aleady_signaled = atomic_exchange_explicit(&notifier->triggered, true, memory_order_acq_rel);
    bool still_spinning = atomic_load_explicit(&notifier->spinning, memory_order_acquire);

    if (aleady_signaled || still_spinning)
        return;

    {
        static uint64_t const one = 1U;
        if (-1 == write(notifier->fds[1U], &one, sizeof one)) {
            if (errno != EAGAIN)
                abort();
        }
    }
}

static inline void notifier_wait(struct notifier *notifier)
{
    for (;;) {
        atomic_store_explicit(&notifier->spinning, true, memory_order_release);

        atomic_thread_fence(memory_order_acquire);

        bool got_triggered = false;
        for (size_t ii = 0U; ii < NOTIFIER_FD_SPIN_LOOPS; ++ii) {
            got_triggered |= atomic_exchange_explicit(&notifier->triggered, false, memory_order_relaxed);
            if (got_triggered)
                break;
            _mm_pause();
        }
        atomic_thread_fence(memory_order_acq_rel);
        atomic_store_explicit(&notifier->spinning, false, memory_order_release);
        if (got_triggered)
            break;
        if (atomic_exchange_explicit(&notifier->triggered, false, memory_order_acq_rel))
            break;

        {
            struct pollfd fds[1U] = {{ .fd = notifier->fds[0U],
                           .events = POLLIN}};
            if (-1 == poll(fds, 1U, -1)) {
                abort();
            }
        }

        for (;;) {
            uint64_t xx;
            if (-1 == read(notifier->fds[0U], &xx, sizeof xx)) {
                if (EAGAIN == errno)
                    break;
                abort();
            }
        }
    }
}
#endif

static void on_failure(_Atomic(uint64_t) *contention)
{
    uint64_t contention_count = atomic_load_explicit(contention, memory_order_relaxed);
    uint64_t next_count = contention_count + 1U;
    atomic_store_explicit(contention,
                          contention_count < UINT64_MAX ?
                          next_count : contention_count,
                          memory_order_relaxed);

    if (contention_count < NOOP_CONTEND_CYCLES) {
        /* do nothing */
    } else if (contention_count < PAUSE_CONTEND_CYCLES) {
        _mm_pause();
    } else {
        sched_yield();
    }
}

static void on_success(_Atomic(uint64_t) *contention)
{
    uint64_t contention_count = atomic_load_explicit(contention, memory_order_relaxed);
    uint64_t next_count = contention_count - 1U;
    atomic_store_explicit(contention,
                          contention_count > 0U ?
                          next_count : contention_count,
                          memory_order_relaxed);
}
#endif

static void *contend(void * arg)
{
    struct lock *lock = arg;
    for (size_t ii = 0U; ii < PRODUCER_LOOP; ++ii) {
        lock_acquire(lock);
        /* Do nothing */
        lock_release(lock);
    }
    return 0;
}
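
For reference, the from/tag/to helpers above pack a 16-bit generation counter into the upper bits of a 48-bit user-space pointer so that the compare-and-swap loops of the Michael-Scott queue are less exposed to the ABA problem. The following stand-alone sketch is not part of the code under review; it only checks that the pack/unpack round trip behaves as expected, and it relies on the assumption that user-space addresses fit in 48 bits, which holds on current x86-64 Linux but is not guaranteed by the C standard:

/* Stand-alone round-trip check for the 48-bit pointer / 16-bit tag packing.
 * Assumes user-space pointers fit in 48 bits (true on current x86-64). */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

static void *from(void *node)
{
    return (void *)((uintptr_t)node & 0xFFFFFFFFFFFFU);
}

static uint32_t tag(void *node)
{
    return (uintptr_t)node >> 48U;
}

static void *to(void *node, uint32_t tag)
{
    return (void *)((uintptr_t)node | (uint64_t)tag << 48U);
}

int main(void)
{
    int *p = malloc(sizeof *p);
    for (uint32_t generation = 0U; generation < 10U; ++generation) {
        void *packed = to(p, generation);
        assert(from(packed) == p);         /* the pointer survives the round trip */
        assert(tag(packed) == generation); /* and so does the generation counter */
    }
    printf("tagged-pointer round trip ok for %p\n", (void *)p);
    free(p);
    return 0;
}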
              
