Merge branch 'locking-core-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip

Pull locking updates from Ingo Molnar:
 "The locking tree was busier in this cycle than the usual pattern - a
  couple of major projects happened to coincide.

  The main changes are:

   - implement the atomic_fetch_{add,sub,and,or,xor}() API natively
     across all SMP architectures (Peter Zijlstra)

   - add atomic_fetch_{inc/dec}() as well, using the generic primitives
     (Davidlohr Bueso)

   - optimize various aspects of rwsems (Jason Low, Davidlohr Bueso,
     Waiman Long)

   - optimize smp_cond_load_acquire() on arm64 and implement LSE based
     atomic{,64}_fetch_{add,sub,and,andnot,or,xor}{,_relaxed,_acquire,_release}()
     on arm64 (Will Deacon)

   - introduce smp_acquire__after_ctrl_dep() and fix various barrier
     mis-uses and bugs (Peter Zijlstra)

   - after discovering ancient spin_unlock_wait() barrier bugs in its
     implementation and usage, strengthen its semantics and update/fix
     usage sites (Peter Zijlstra)

   - optimize mutex_trylock() fastpath (Peter Zijlstra)

   - ... misc fixes and cleanups"

* 'locking-core-for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/tip/tip: (67 commits)
  locking/atomic: Introduce inc/dec variants for the atomic_fetch_$op() API
  locking/barriers, arch/arm64: Implement LDXR+WFE based smp_cond_load_acquire()
  locking/static_keys: Fix non static symbol Sparse warning
  locking/qspinlock: Use __this_cpu_dec() instead of full-blown this_cpu_dec()
  locking/atomic, arch/tile: Fix tilepro build
  locking/atomic, arch/m68k: Remove comment
  locking/atomic, arch/arc: Fix build
  locking/Documentation: Clarify limited control-dependency scope
  locking/atomic, arch/rwsem: Employ atomic_long_fetch_add()
  locking/atomic, arch/qrwlock: Employ atomic_fetch_add_acquire()
  locking/atomic, arch/mips: Convert to _relaxed atomics
  locking/atomic, arch/alpha: Convert to _relaxed atomics
  locking/atomic: Remove the deprecated atomic_{set,clear}_mask() functions
  locking/atomic: Remove linux/atomic.h:atomic_fetch_or()
  locking/atomic: Implement atomic{,64,_long}_fetch_{add,sub,and,andnot,or,xor}{,_relaxed,_acquire,_release}()
  locking/atomic: Fix atomic64_relaxed() bits
  locking/atomic, arch/xtensa: Implement atomic_fetch_{add,sub,and,or,xor}()
  locking/atomic, arch/x86: Implement atomic{,64}_fetch_{add,sub,and,or,xor}()
  locking/atomic, arch/tile: Implement atomic{,64}_fetch_{add,sub,and,or,xor}()
  locking/atomic, arch/sparc: Implement atomic{,64}_fetch_{add,sub,and,or,xor}()
  ...
This commit is contained in:
Linus Torvalds
2016-07-25 12:41:29 -07:00
108 changed files with 3341 additions and 1031 deletions

View File

@@ -46,6 +46,7 @@
#include <linux/gfp.h>
#include <linux/kmemcheck.h>
#include <linux/random.h>
#include <linux/jhash.h>
#include <asm/sections.h>
@@ -309,10 +310,14 @@ static struct hlist_head chainhash_table[CHAINHASH_SIZE];
* It's a 64-bit hash, because it's important for the keys to be
* unique.
*/
#define iterate_chain_key(key1, key2) \
(((key1) << MAX_LOCKDEP_KEYS_BITS) ^ \
((key1) >> (64-MAX_LOCKDEP_KEYS_BITS)) ^ \
(key2))
static inline u64 iterate_chain_key(u64 key, u32 idx)
{
u32 k0 = key, k1 = key >> 32;
__jhash_mix(idx, k0, k1); /* Macro that modifies arguments! */
return k0 | (u64)k1 << 32;
}
void lockdep_off(void)
{

View File

@@ -29,12 +29,12 @@ extern void debug_mutex_init(struct mutex *lock, const char *name,
static inline void mutex_set_owner(struct mutex *lock)
{
lock->owner = current;
WRITE_ONCE(lock->owner, current);
}
static inline void mutex_clear_owner(struct mutex *lock)
{
lock->owner = NULL;
WRITE_ONCE(lock->owner, NULL);
}
#define spin_lock_mutex(lock, flags) \

View File

@@ -17,14 +17,20 @@
__list_del((waiter)->list.prev, (waiter)->list.next)
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
/*
* The mutex owner can get read and written to locklessly.
* We should use WRITE_ONCE when writing the owner value to
* avoid store tearing, otherwise, a thread could potentially
* read a partially written and incomplete owner value.
*/
static inline void mutex_set_owner(struct mutex *lock)
{
lock->owner = current;
WRITE_ONCE(lock->owner, current);
}
static inline void mutex_clear_owner(struct mutex *lock)
{
lock->owner = NULL;
WRITE_ONCE(lock->owner, NULL);
}
#else
static inline void mutex_set_owner(struct mutex *lock)

View File

@@ -93,7 +93,7 @@ void queued_read_lock_slowpath(struct qrwlock *lock, u32 cnts)
* that accesses can't leak upwards out of our subsequent critical
* section in the case that the lock is currently held for write.
*/
cnts = atomic_add_return_acquire(_QR_BIAS, &lock->cnts) - _QR_BIAS;
cnts = atomic_fetch_add_acquire(_QR_BIAS, &lock->cnts);
rspin_until_writer_unlock(lock, cnts);
/*

View File

@@ -90,7 +90,7 @@ static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[MAX_NODES]);
* therefore increment the cpu number by one.
*/
static inline u32 encode_tail(int cpu, int idx)
static inline __pure u32 encode_tail(int cpu, int idx)
{
u32 tail;
@@ -103,7 +103,7 @@ static inline u32 encode_tail(int cpu, int idx)
return tail;
}
static inline struct mcs_spinlock *decode_tail(u32 tail)
static inline __pure struct mcs_spinlock *decode_tail(u32 tail)
{
int cpu = (tail >> _Q_TAIL_CPU_OFFSET) - 1;
int idx = (tail & _Q_TAIL_IDX_MASK) >> _Q_TAIL_IDX_OFFSET;
@@ -267,6 +267,63 @@ static __always_inline u32 __pv_wait_head_or_lock(struct qspinlock *lock,
#define queued_spin_lock_slowpath native_queued_spin_lock_slowpath
#endif
/*
* Various notes on spin_is_locked() and spin_unlock_wait(), which are
* 'interesting' functions:
*
* PROBLEM: some architectures have an interesting issue with atomic ACQUIRE
* operations in that the ACQUIRE applies to the LOAD _not_ the STORE (ARM64,
* PPC). Also qspinlock has a similar issue per construction, the setting of
* the locked byte can be unordered acquiring the lock proper.
*
* This gets to be 'interesting' in the following cases, where the /should/s
* end up false because of this issue.
*
*
* CASE 1:
*
* So the spin_is_locked() correctness issue comes from something like:
*
* CPU0 CPU1
*
* global_lock(); local_lock(i)
* spin_lock(&G) spin_lock(&L[i])
* for (i) if (!spin_is_locked(&G)) {
* spin_unlock_wait(&L[i]); smp_acquire__after_ctrl_dep();
* return;
* }
* // deal with fail
*
* Where it is important CPU1 sees G locked or CPU0 sees L[i] locked such
* that there is exclusion between the two critical sections.
*
* The load from spin_is_locked(&G) /should/ be constrained by the ACQUIRE from
* spin_lock(&L[i]), and similarly the load(s) from spin_unlock_wait(&L[i])
* /should/ be constrained by the ACQUIRE from spin_lock(&G).
*
* Similarly, later stuff is constrained by the ACQUIRE from CTRL+RMB.
*
*
* CASE 2:
*
* For spin_unlock_wait() there is a second correctness issue, namely:
*
* CPU0 CPU1
*
* flag = set;
* smp_mb(); spin_lock(&l)
* spin_unlock_wait(&l); if (!flag)
* // add to lockless list
* spin_unlock(&l);
* // iterate lockless list
*
* Which wants to ensure that CPU1 will stop adding bits to the list and CPU0
* will observe the last entry on the list (if spin_unlock_wait() had ACQUIRE
* semantics etc..)
*
* Where flag /should/ be ordered against the locked store of l.
*/
/*
* queued_spin_lock_slowpath() can (load-)ACQUIRE the lock before
* issuing an _unordered_ store to set _Q_LOCKED_VAL.
@@ -322,7 +379,7 @@ void queued_spin_unlock_wait(struct qspinlock *lock)
cpu_relax();
done:
smp_rmb(); /* CTRL + RMB -> ACQUIRE */
smp_acquire__after_ctrl_dep();
}
EXPORT_SYMBOL(queued_spin_unlock_wait);
#endif
@@ -418,7 +475,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
* sequentiality; this is because not all clear_pending_set_locked()
* implementations imply full barriers.
*/
smp_cond_acquire(!(atomic_read(&lock->val) & _Q_LOCKED_MASK));
smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_MASK));
/*
* take ownership and clear the pending bit.
@@ -455,6 +512,8 @@ queue:
* pending stuff.
*
* p,*,* -> n,*,*
*
* RELEASE, such that the stores to @node must be complete.
*/
old = xchg_tail(lock, tail);
next = NULL;
@@ -465,6 +524,15 @@ queue:
*/
if (old & _Q_TAIL_MASK) {
prev = decode_tail(old);
/*
* The above xchg_tail() is also a load of @lock which generates,
* through decode_tail(), a pointer.
*
* The address dependency matches the RELEASE of xchg_tail()
* such that the access to @prev must happen after.
*/
smp_read_barrier_depends();
WRITE_ONCE(prev->next, node);
pv_wait_node(node, prev);
@@ -494,7 +562,7 @@ queue:
*
* The PV pv_wait_head_or_lock function, if active, will acquire
* the lock and return a non-zero value. So we have to skip the
* smp_cond_acquire() call. As the next PV queue head hasn't been
* smp_cond_load_acquire() call. As the next PV queue head hasn't been
* designated yet, there is no way for the locked value to become
* _Q_SLOW_VAL. So both the set_locked() and the
* atomic_cmpxchg_relaxed() calls will be safe.
@@ -505,7 +573,7 @@ queue:
if ((val = pv_wait_head_or_lock(lock, node)))
goto locked;
smp_cond_acquire(!((val = atomic_read(&lock->val)) & _Q_LOCKED_PENDING_MASK));
val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
locked:
/*
@@ -525,9 +593,9 @@ locked:
break;
}
/*
* The smp_cond_acquire() call above has provided the necessary
* acquire semantics required for locking. At most two
* iterations of this loop may be ran.
* The smp_cond_load_acquire() call above has provided the
* necessary acquire semantics required for locking. At most
* two iterations of this loop may be ran.
*/
old = atomic_cmpxchg_relaxed(&lock->val, val, _Q_LOCKED_VAL);
if (old == val)
@@ -551,7 +619,7 @@ release:
/*
* release the node
*/
this_cpu_dec(mcs_nodes[0].count);
__this_cpu_dec(mcs_nodes[0].count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);

View File

@@ -112,12 +112,12 @@ static __always_inline int trylock_clear_pending(struct qspinlock *lock)
#else /* _Q_PENDING_BITS == 8 */
static __always_inline void set_pending(struct qspinlock *lock)
{
atomic_set_mask(_Q_PENDING_VAL, &lock->val);
atomic_or(_Q_PENDING_VAL, &lock->val);
}
static __always_inline void clear_pending(struct qspinlock *lock)
{
atomic_clear_mask(_Q_PENDING_VAL, &lock->val);
atomic_andnot(_Q_PENDING_VAL, &lock->val);
}
static __always_inline int trylock_clear_pending(struct qspinlock *lock)

View File

@@ -1478,7 +1478,7 @@ EXPORT_SYMBOL_GPL(rt_mutex_timed_lock);
*/
int __sched rt_mutex_trylock(struct rt_mutex *lock)
{
if (WARN_ON(in_irq() || in_nmi() || in_serving_softirq()))
if (WARN_ON_ONCE(in_irq() || in_nmi() || in_serving_softirq()))
return 0;
return rt_mutex_fasttrylock(lock, rt_mutex_slowtrylock);

View File

@@ -80,7 +80,7 @@ void __init_rwsem(struct rw_semaphore *sem, const char *name,
debug_check_no_locks_freed((void *)sem, sizeof(*sem));
lockdep_init_map(&sem->dep_map, name, key, 0);
#endif
sem->count = RWSEM_UNLOCKED_VALUE;
atomic_long_set(&sem->count, RWSEM_UNLOCKED_VALUE);
raw_spin_lock_init(&sem->wait_lock);
INIT_LIST_HEAD(&sem->wait_list);
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
@@ -114,12 +114,16 @@ enum rwsem_wake_type {
* - the 'active part' of count (&0x0000ffff) reached 0 (but may have changed)
* - the 'waiting part' of count (&0xffff0000) is -ve (and will still be so)
* - there must be someone on the queue
* - the spinlock must be held by the caller
* - the wait_lock must be held by the caller
* - tasks are marked for wakeup, the caller must later invoke wake_up_q()
* to actually wakeup the blocked task(s) and drop the reference count,
* preferably when the wait_lock is released
* - woken process blocks are discarded from the list after having task zeroed
* - writers are only woken if downgrading is false
* - writers are only marked woken if downgrading is false
*/
static struct rw_semaphore *
__rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
__rwsem_mark_wake(struct rw_semaphore *sem,
enum rwsem_wake_type wake_type, struct wake_q_head *wake_q)
{
struct rwsem_waiter *waiter;
struct task_struct *tsk;
@@ -128,13 +132,16 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
waiter = list_entry(sem->wait_list.next, struct rwsem_waiter, list);
if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
if (wake_type == RWSEM_WAKE_ANY)
/* Wake writer at the front of the queue, but do not
* grant it the lock yet as we want other writers
* to be able to steal it. Readers, on the other hand,
* will block as they will notice the queued writer.
if (wake_type == RWSEM_WAKE_ANY) {
/*
* Mark writer at the front of the queue for wakeup.
* Until the task is actually later awoken later by
* the caller, other writers are able to steal it.
* Readers, on the other hand, will block as they
* will notice the queued writer.
*/
wake_up_process(waiter->task);
wake_q_add(wake_q, waiter->task);
}
goto out;
}
@@ -146,15 +153,27 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
if (wake_type != RWSEM_WAKE_READ_OWNED) {
adjustment = RWSEM_ACTIVE_READ_BIAS;
try_reader_grant:
oldcount = rwsem_atomic_update(adjustment, sem) - adjustment;
oldcount = atomic_long_fetch_add(adjustment, &sem->count);
if (unlikely(oldcount < RWSEM_WAITING_BIAS)) {
/* A writer stole the lock. Undo our reader grant. */
if (rwsem_atomic_update(-adjustment, sem) &
RWSEM_ACTIVE_MASK)
/*
* If the count is still less than RWSEM_WAITING_BIAS
* after removing the adjustment, it is assumed that
* a writer has stolen the lock. We have to undo our
* reader grant.
*/
if (atomic_long_add_return(-adjustment, &sem->count) <
RWSEM_WAITING_BIAS)
goto out;
/* Last active locker left. Retry waking readers. */
goto try_reader_grant;
}
/*
* It is not really necessary to set it to reader-owned here,
* but it gives the spinners an early indication that the
* readers now have the lock.
*/
rwsem_set_reader_owned(sem);
}
/* Grant an infinite number of read locks to the readers at the front
@@ -179,7 +198,7 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
adjustment -= RWSEM_WAITING_BIAS;
if (adjustment)
rwsem_atomic_add(adjustment, sem);
atomic_long_add(adjustment, &sem->count);
next = sem->wait_list.next;
loop = woken;
@@ -187,17 +206,15 @@ __rwsem_do_wake(struct rw_semaphore *sem, enum rwsem_wake_type wake_type)
waiter = list_entry(next, struct rwsem_waiter, list);
next = waiter->list.next;
tsk = waiter->task;
wake_q_add(wake_q, tsk);
/*
* Make sure we do not wakeup the next reader before
* setting the nil condition to grant the next reader;
* otherwise we could miss the wakeup on the other
* side and end up sleeping again. See the pairing
* in rwsem_down_read_failed().
* Ensure that the last operation is setting the reader
* waiter to nil such that rwsem_down_read_failed() cannot
* race with do_exit() by always holding a reference count
* to the task to wakeup.
*/
smp_mb();
waiter->task = NULL;
wake_up_process(tsk);
put_task_struct(tsk);
smp_store_release(&waiter->task, NULL);
} while (--loop);
sem->wait_list.next = next;
@@ -216,11 +233,11 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
long count, adjustment = -RWSEM_ACTIVE_READ_BIAS;
struct rwsem_waiter waiter;
struct task_struct *tsk = current;
WAKE_Q(wake_q);
/* set up my own style of waitqueue */
waiter.task = tsk;
waiter.type = RWSEM_WAITING_FOR_READ;
get_task_struct(tsk);
raw_spin_lock_irq(&sem->wait_lock);
if (list_empty(&sem->wait_list))
@@ -228,7 +245,7 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
list_add_tail(&waiter.list, &sem->wait_list);
/* we're now waiting on the lock, but no longer actively locking */
count = rwsem_atomic_update(adjustment, sem);
count = atomic_long_add_return(adjustment, &sem->count);
/* If there are no active locks, wake the front queued process(es).
*
@@ -238,9 +255,10 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
if (count == RWSEM_WAITING_BIAS ||
(count > RWSEM_WAITING_BIAS &&
adjustment != -RWSEM_ACTIVE_READ_BIAS))
sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
sem = __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
/* wait to be given the lock */
while (true) {
@@ -255,17 +273,29 @@ struct rw_semaphore __sched *rwsem_down_read_failed(struct rw_semaphore *sem)
}
EXPORT_SYMBOL(rwsem_down_read_failed);
/*
* This function must be called with the sem->wait_lock held to prevent
* race conditions between checking the rwsem wait list and setting the
* sem->count accordingly.
*/
static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem)
{
/*
* Try acquiring the write lock. Check count first in order
* to reduce unnecessary expensive cmpxchg() operations.
* Avoid trying to acquire write lock if count isn't RWSEM_WAITING_BIAS.
*/
if (count == RWSEM_WAITING_BIAS &&
cmpxchg_acquire(&sem->count, RWSEM_WAITING_BIAS,
RWSEM_ACTIVE_WRITE_BIAS) == RWSEM_WAITING_BIAS) {
if (!list_is_singular(&sem->wait_list))
rwsem_atomic_update(RWSEM_WAITING_BIAS, sem);
if (count != RWSEM_WAITING_BIAS)
return false;
/*
* Acquire the lock by trying to set it to ACTIVE_WRITE_BIAS. If there
* are other tasks on the wait list, we need to add on WAITING_BIAS.
*/
count = list_is_singular(&sem->wait_list) ?
RWSEM_ACTIVE_WRITE_BIAS :
RWSEM_ACTIVE_WRITE_BIAS + RWSEM_WAITING_BIAS;
if (atomic_long_cmpxchg_acquire(&sem->count, RWSEM_WAITING_BIAS, count)
== RWSEM_WAITING_BIAS) {
rwsem_set_owner(sem);
return true;
}
@@ -279,13 +309,13 @@ static inline bool rwsem_try_write_lock(long count, struct rw_semaphore *sem)
*/
static inline bool rwsem_try_write_lock_unqueued(struct rw_semaphore *sem)
{
long old, count = READ_ONCE(sem->count);
long old, count = atomic_long_read(&sem->count);
while (true) {
if (!(count == 0 || count == RWSEM_WAITING_BIAS))
return false;
old = cmpxchg_acquire(&sem->count, count,
old = atomic_long_cmpxchg_acquire(&sem->count, count,
count + RWSEM_ACTIVE_WRITE_BIAS);
if (old == count) {
rwsem_set_owner(sem);
@@ -306,16 +336,11 @@ static inline bool rwsem_can_spin_on_owner(struct rw_semaphore *sem)
rcu_read_lock();
owner = READ_ONCE(sem->owner);
if (!owner) {
long count = READ_ONCE(sem->count);
if (!rwsem_owner_is_writer(owner)) {
/*
* If sem->owner is not set, yet we have just recently entered the
* slowpath with the lock being active, then there is a possibility
* reader(s) may have the lock. To be safe, bail spinning in these
* situations.
* Don't spin if the rwsem is readers owned.
*/
if (count & RWSEM_ACTIVE_MASK)
ret = false;
ret = !rwsem_owner_is_reader(owner);
goto done;
}
@@ -325,10 +350,15 @@ done:
return ret;
}
static noinline
bool rwsem_spin_on_owner(struct rw_semaphore *sem, struct task_struct *owner)
/*
* Return true only if we can still spin on the owner field of the rwsem.
*/
static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
{
long count;
struct task_struct *owner = READ_ONCE(sem->owner);
if (!rwsem_owner_is_writer(owner))
goto out;
rcu_read_lock();
while (sem->owner == owner) {
@@ -349,22 +379,16 @@ bool rwsem_spin_on_owner(struct rw_semaphore *sem, struct task_struct *owner)
cpu_relax_lowlatency();
}
rcu_read_unlock();
if (READ_ONCE(sem->owner))
return true; /* new owner, continue spinning */
out:
/*
* When the owner is not set, the lock could be free or
* held by readers. Check the counter to verify the
* state.
* If there is a new owner or the owner is not set, we continue
* spinning.
*/
count = READ_ONCE(sem->count);
return (count == 0 || count == RWSEM_WAITING_BIAS);
return !rwsem_owner_is_reader(READ_ONCE(sem->owner));
}
static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
{
struct task_struct *owner;
bool taken = false;
preempt_disable();
@@ -376,12 +400,17 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
if (!osq_lock(&sem->osq))
goto done;
while (true) {
owner = READ_ONCE(sem->owner);
if (owner && !rwsem_spin_on_owner(sem, owner))
break;
/* wait_lock will be acquired if write_lock is obtained */
/*
* Optimistically spin on the owner field and attempt to acquire the
* lock whenever the owner changes. Spinning will be stopped when:
* 1) the owning writer isn't running; or
* 2) readers own the lock as we can't determine if they are
* actively running or not.
*/
while (rwsem_spin_on_owner(sem)) {
/*
* Try to acquire the lock
*/
if (rwsem_try_write_lock_unqueued(sem)) {
taken = true;
break;
@@ -393,7 +422,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
* we're an RT task that will live-lock because we won't let
* the owner complete.
*/
if (!owner && (need_resched() || rt_task(current)))
if (!sem->owner && (need_resched() || rt_task(current)))
break;
/*
@@ -440,9 +469,10 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
bool waiting = true; /* any queued threads before us */
struct rwsem_waiter waiter;
struct rw_semaphore *ret = sem;
WAKE_Q(wake_q);
/* undo write bias from down_write operation, stop active locking */
count = rwsem_atomic_update(-RWSEM_ACTIVE_WRITE_BIAS, sem);
count = atomic_long_sub_return(RWSEM_ACTIVE_WRITE_BIAS, &sem->count);
/* do optimistic spinning and steal lock if possible */
if (rwsem_optimistic_spin(sem))
@@ -465,18 +495,29 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
/* we're now waiting on the lock, but no longer actively locking */
if (waiting) {
count = READ_ONCE(sem->count);
count = atomic_long_read(&sem->count);
/*
* If there were already threads queued before us and there are
* no active writers, the lock must be read owned; so we try to
* wake any read locks that were queued ahead of us.
*/
if (count > RWSEM_WAITING_BIAS)
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READERS);
if (count > RWSEM_WAITING_BIAS) {
WAKE_Q(wake_q);
sem = __rwsem_mark_wake(sem, RWSEM_WAKE_READERS, &wake_q);
/*
* The wakeup is normally called _after_ the wait_lock
* is released, but given that we are proactively waking
* readers we can deal with the wake_q overhead as it is
* similar to releasing and taking the wait_lock again
* for attempting rwsem_try_write_lock().
*/
wake_up_q(&wake_q);
}
} else
count = rwsem_atomic_update(RWSEM_WAITING_BIAS, sem);
count = atomic_long_add_return(RWSEM_WAITING_BIAS, &sem->count);
/* wait until we successfully acquire the lock */
set_current_state(state);
@@ -492,7 +533,7 @@ __rwsem_down_write_failed_common(struct rw_semaphore *sem, int state)
schedule();
set_current_state(state);
} while ((count = sem->count) & RWSEM_ACTIVE_MASK);
} while ((count = atomic_long_read(&sem->count)) & RWSEM_ACTIVE_MASK);
raw_spin_lock_irq(&sem->wait_lock);
}
@@ -507,10 +548,11 @@ out_nolock:
raw_spin_lock_irq(&sem->wait_lock);
list_del(&waiter.list);
if (list_empty(&sem->wait_list))
rwsem_atomic_update(-RWSEM_WAITING_BIAS, sem);
atomic_long_add(-RWSEM_WAITING_BIAS, &sem->count);
else
__rwsem_do_wake(sem, RWSEM_WAKE_ANY);
__rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irq(&sem->wait_lock);
wake_up_q(&wake_q);
return ERR_PTR(-EINTR);
}
@@ -537,6 +579,7 @@ __visible
struct rw_semaphore *rwsem_wake(struct rw_semaphore *sem)
{
unsigned long flags;
WAKE_Q(wake_q);
/*
* If a spinner is present, it is not necessary to do the wakeup.
@@ -573,9 +616,10 @@ locked:
/* do nothing if list empty */
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem, RWSEM_WAKE_ANY);
sem = __rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
wake_up_q(&wake_q);
return sem;
}
@@ -590,14 +634,16 @@ __visible
struct rw_semaphore *rwsem_downgrade_wake(struct rw_semaphore *sem)
{
unsigned long flags;
WAKE_Q(wake_q);
raw_spin_lock_irqsave(&sem->wait_lock, flags);
/* do nothing if list empty */
if (!list_empty(&sem->wait_list))
sem = __rwsem_do_wake(sem, RWSEM_WAKE_READ_OWNED);
sem = __rwsem_mark_wake(sem, RWSEM_WAKE_READ_OWNED, &wake_q);
raw_spin_unlock_irqrestore(&sem->wait_lock, flags);
wake_up_q(&wake_q);
return sem;
}

View File

@@ -22,6 +22,7 @@ void __sched down_read(struct rw_semaphore *sem)
rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
rwsem_set_reader_owned(sem);
}
EXPORT_SYMBOL(down_read);
@@ -33,8 +34,10 @@ int down_read_trylock(struct rw_semaphore *sem)
{
int ret = __down_read_trylock(sem);
if (ret == 1)
if (ret == 1) {
rwsem_acquire_read(&sem->dep_map, 0, 1, _RET_IP_);
rwsem_set_reader_owned(sem);
}
return ret;
}
@@ -124,7 +127,7 @@ void downgrade_write(struct rw_semaphore *sem)
* lockdep: a downgraded write will live on as a write
* dependency.
*/
rwsem_clear_owner(sem);
rwsem_set_reader_owned(sem);
__downgrade_write(sem);
}
@@ -138,6 +141,7 @@ void down_read_nested(struct rw_semaphore *sem, int subclass)
rwsem_acquire_read(&sem->dep_map, subclass, 0, _RET_IP_);
LOCK_CONTENDED(sem, __down_read_trylock, __down_read);
rwsem_set_reader_owned(sem);
}
EXPORT_SYMBOL(down_read_nested);

View File

@@ -1,14 +1,58 @@
/*
* The owner field of the rw_semaphore structure will be set to
* RWSEM_READ_OWNED when a reader grabs the lock. A writer will clear
* the owner field when it unlocks. A reader, on the other hand, will
* not touch the owner field when it unlocks.
*
* In essence, the owner field now has the following 3 states:
* 1) 0
* - lock is free or the owner hasn't set the field yet
* 2) RWSEM_READER_OWNED
* - lock is currently or previously owned by readers (lock is free
* or not set by owner yet)
* 3) Other non-zero value
* - a writer owns the lock
*/
#define RWSEM_READER_OWNED ((struct task_struct *)1UL)
#ifdef CONFIG_RWSEM_SPIN_ON_OWNER
/*
* All writes to owner are protected by WRITE_ONCE() to make sure that
* store tearing can't happen as optimistic spinners may read and use
* the owner value concurrently without lock. Read from owner, however,
* may not need READ_ONCE() as long as the pointer value is only used
* for comparison and isn't being dereferenced.
*/
static inline void rwsem_set_owner(struct rw_semaphore *sem)
{
sem->owner = current;
WRITE_ONCE(sem->owner, current);
}
static inline void rwsem_clear_owner(struct rw_semaphore *sem)
{
sem->owner = NULL;
WRITE_ONCE(sem->owner, NULL);
}
static inline void rwsem_set_reader_owned(struct rw_semaphore *sem)
{
/*
* We check the owner value first to make sure that we will only
* do a write to the rwsem cacheline when it is really necessary
* to minimize cacheline contention.
*/
if (sem->owner != RWSEM_READER_OWNED)
WRITE_ONCE(sem->owner, RWSEM_READER_OWNED);
}
static inline bool rwsem_owner_is_writer(struct task_struct *owner)
{
return owner && owner != RWSEM_READER_OWNED;
}
static inline bool rwsem_owner_is_reader(struct task_struct *owner)
{
return owner == RWSEM_READER_OWNED;
}
#else
static inline void rwsem_set_owner(struct rw_semaphore *sem)
{
@@ -17,4 +61,8 @@ static inline void rwsem_set_owner(struct rw_semaphore *sem)
static inline void rwsem_clear_owner(struct rw_semaphore *sem)
{
}
static inline void rwsem_set_reader_owned(struct rw_semaphore *sem)
{
}
#endif