diff --git a/src/thread/pthread_cond_timedwait.c b/src/thread/pthread_cond_timedwait.c index 2d192b0..136fa6a 100644 --- a/src/thread/pthread_cond_timedwait.c +++ b/src/thread/pthread_cond_timedwait.c @@ -42,12 +42,32 @@ static inline void lock(volatile int *l) } } +/* Avoid taking the lock if we know it isn't necessary. */ +static inline int lockRace(volatile int *l, int*volatile* notifier) +{ + int ret = 1; + if (!*notifier && (ret = a_cas(l, 0, 1))) { + a_cas(l, 1, 2); + do __wait(l, 0, 2, 1); + while (!*notifier && (ret = a_cas(l, 0, 2))); + } + return ret; +} + static inline void unlock(volatile int *l) { if (a_swap(l, 0)==2) __wake(l, 1, 1); } +static inline void unlockRace(volatile int *l, int known) +{ + int found = a_swap(l, 0); + known += (found == 2); + if (known > 0) + __wake(l, known, 1); +} + static inline void unlock_requeue(volatile int *l, volatile int *r, int w) { a_store(l, 0); @@ -56,6 +76,53 @@ static inline void unlock_requeue(volatile int *l, volatile int *r, int w) || __syscall(SYS_futex, l, FUTEX_REQUEUE, 0, 1, r); } +/* Splice the node out of the current list of c and notify a signaling + thread with whom there was contention. */ +static inline void leave(struct waiter* node) { + /* Access to cv object is valid because this waiter was not + * yet signaled and a new signal/broadcast cannot return + * after seeing a LEAVING waiter without getting notified + * via the futex notify below. */ + pthread_cond_t *c = node->cond; + int locked = lockRace(&c->_c_lock, &node->notify); + /* node->notify will only be changed while node is + * still in the list.*/ + int * ref = node->notify; + + if (!ref) { + if (node->prev) node->prev->next = node->next; + else if (c->_c_head == node) c->_c_head = node->next; + if (node->next) node->next->prev = node->prev; + else if (c->_c_tail == node) c->_c_tail = node->prev; + + unlock(&c->_c_lock); + } else { + /* A race occurred with a signaling or broadcasting thread. The call + * to unlockRace, there, ensures that sufficiently many waiters on _c_lock + * are woken up. */ + if (!locked) unlock(&c->_c_lock); + + /* There will be at most one signaling or broadcasting thread waiting on ref[0]. + * Make sure that we don't waste a futex wake, if that thread isn't yet in futex wait. */ + if (a_fetch_add(&ref[0], -1)==1 && ref[1]) + __wake(&ref[0], 1, 1); + } + + node->mutex_ret = pthread_mutex_lock(node->mutex); +} + +static inline void enqueue(pthread_cond_t * c, struct waiter* node) { + lock(&c->_c_lock); + + struct waiter* ohead = c->_c_head; + node->next = ohead; + if (ohead) ohead->prev = node; + else c->_c_tail = node; + c->_c_head = node; + + unlock(&c->_c_lock); +} + enum { WAITING, SIGNALED, @@ -78,33 +145,14 @@ static void unwait(void *arg) int oldstate = a_cas(&node->state, WAITING, LEAVING); if (oldstate == WAITING) { - /* Access to cv object is valid because this waiter was not - * yet signaled and a new signal/broadcast cannot return - * after seeing a LEAVING waiter without getting notified - * via the futex notify below. */ - - pthread_cond_t *c = node->cond; - lock(&c->_c_lock); - - if (c->_c_head == node) c->_c_head = node->next; - else if (node->prev) node->prev->next = node->next; - if (c->_c_tail == node) c->_c_tail = node->prev; - else if (node->next) node->next->prev = node->prev; - - unlock(&c->_c_lock); - - if (node->notify) { - if (a_fetch_add(node->notify, -1)==1) - __wake(node->notify, 1, 1); - } - } else { - /* Lock barrier first to control wake order. */ - lock(&node->barrier); + leave(node); + return; } - node->mutex_ret = pthread_mutex_lock(node->mutex); + /* Lock barrier first to control wake order. */ + lock(&node->barrier); - if (oldstate == WAITING) return; + node->mutex_ret = pthread_mutex_lock(node->mutex); if (!node->next) a_inc(&node->mutex->_m_waiters); @@ -121,7 +169,7 @@ static void unwait(void *arg) int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict m, const struct timespec *restrict ts) { - struct waiter node = { .cond = c, .mutex = m }; + struct waiter node = { .cond = c, .mutex = m, .state = WAITING, .barrier = 2 }; int e, seq, *fut, clock = c->_c_clock; if ((m->_m_type&15) && (m->_m_lock&INT_MAX) != __pthread_self()->tid) @@ -138,17 +186,10 @@ int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict seq = c->_c_seq; a_inc(&c->_c_waiters); } else { - lock(&c->_c_lock); - - seq = node.barrier = 2; + seq = node.barrier; fut = &node.barrier; - node.state = WAITING; - node.next = c->_c_head; - c->_c_head = &node; - if (!c->_c_tail) c->_c_tail = &node; - else node.next->prev = &node; - unlock(&c->_c_lock); + enqueue(c, &node); } pthread_mutex_unlock(m); @@ -162,35 +203,85 @@ int pthread_cond_timedwait(pthread_cond_t *restrict c, pthread_mutex_t *restrict return node.mutex_ret ? node.mutex_ret : e; } +static inline int cond_signal (struct waiter * p, int* ref) +{ + int ret = a_cas(&p->state, WAITING, SIGNALED); + if (ret != WAITING) { + ref[0]++; + p->notify = ref; + if (p->prev) p->prev->next = p->next; + if (p->next) p->next->prev = p->prev; + p->next = 0; + p->prev = 0; + } + return ret; +} + int __private_cond_signal(pthread_cond_t *c, int n) { - struct waiter *p, *first=0; - int ref = 0, cur; + struct waiter *p, *prev, *first=0; + int ref[2] = { 0 }, cur; - lock(&c->_c_lock); - for (p=c->_c_tail; n && p; p=p->prev) { - if (a_cas(&p->state, WAITING, SIGNALED) != WAITING) { - ref++; - p->notify = &ref; + if (n == 1) { + lock(&c->_c_lock); + for (p=c->_c_tail; p; p=prev) { + prev = p->prev; + if (!cond_signal(p, ref)) { + first=p; + p=prev; + first->prev = 0; + break; + } + } + /* Split the list, leaving any remainder on the cv. */ + if (p) { + p->next = 0; } else { - n--; - if (!first) first=p; + c->_c_head = 0; } - } - /* Split the list, leaving any remainder on the cv. */ - if (p) { - if (p->next) p->next->prev = 0; - p->next = 0; + c->_c_tail = p; + unlockRace(&c->_c_lock, ref[0]); } else { - c->_c_head = 0; + lock(&c->_c_lock); + struct waiter * head = c->_c_head; + if (head) { + /* Signal head and tail first to reduce possible + * races for the cv to the beginning of the + * processing. */ + int headrace = cond_signal(head, ref); + struct waiter * tail = c->_c_tail; + p=tail->prev; + if (tail != head) { + if (!cond_signal(tail, ref)) first=tail; + else while (p != head) { + prev = p->prev; + if (!cond_signal(p, ref)) { + first=p; + p=prev; + break; + } + p=prev; + } + } + if (!first && !headrace) first = head; + c->_c_head = 0; + c->_c_tail = 0; + /* Now process the inner part of the list. */ + if (p) { + while (p != head) { + prev = p->prev; + cond_signal(p, ref); + p=prev; + } + } + } + unlockRace(&c->_c_lock, ref[0]); } - c->_c_tail = p; - unlock(&c->_c_lock); /* Wait for any waiters in the LEAVING state to remove * themselves from the list before returning or allowing * signaled threads to proceed. */ - while ((cur = ref)) __wait(&ref, 0, cur, 1); + while ((cur = ref[0])) __wait(&ref[0], &ref[1], cur, 1); /* Allow first signaled waiter, if any, to proceed. */ if (first) unlock(&first->barrier);