关于同步的一点思考

2015/03/24 JUC

线程同步可以说在日常开发中是用的很多, 但对于其内部如何实现的,一般人可能知道的并不多。 本篇文章将从如何实现简单的锁开始,介绍linux中的锁实现futex的优点及原理,最后分析java中同步机制如wait/notify, synchronized, ReentrantLock。

自己实现锁 首先,如果要你实现操作系统的锁,该如何实现?先想想这个问题,暂时不考虑性能、可用性等问题,就用最简单、粗暴的方式。当你心中有个大致的思路后,再接着往下看。

下文中的代码都是伪代码。

自旋 最容易想到可能是自旋:

volatile int status=0;

void lock(){

while(!compareAndSet(0,1)){
}
//get lock

}

void unlock(){ status=0; }

boolean compareAndSet(int except,int newValue){ //cas操作,修改status成功则返回true } 上面的代码通过自旋和cas来实现一个最简单的锁。

这样实现的锁显然有个致命的缺点:耗费cpu资源。没有竞争到锁的线程会一直占用cpu资源进行cas操作,假如一个线程获得锁后要花费10s处理业务逻辑,那另外一个线程就会白白的花费10s的cpu资源。(假设系统中就只有这两个线程的情况)。

yield+自旋 要解决自旋锁的性能问题必须让竞争锁失败的线程不忙等,而是在获取不到锁的时候能把cpu资源给让出来,说到让cpu资源,你可能想到了yield()方法,看看下面的例子:

volatile int status=0;

void lock(){

while(!compareAndSet(0,1)){
	yield();
}
//get lock

}

void unlock(){ status=0; }

当线程竞争锁失败时,会调用yield方法让出cpu。需要注意的是该方法只是当前让出cpu,有可能操作系统下次还是选择运行该线程。其实现是 将当期线程移动到所在优先调度队列的末端(操作系统线程调度了解一下?有时间的话,下次写写这块内容)。也就是说,如果该线程处于优先级最高的调度队列且该队列只有该线程,那操作系统下次还是运行该线程。

自旋+yield的方式并没有完全解决问题,当系统只有两个线程竞争锁时,yield是有效的。但是如果有100个线程竞争锁,当线程1获得锁后,还有99个线程在反复的自旋+yield,线程2调用yield后,操作系统下次运行的可能是线程3;而线程3CAS失败后调用yield后,操作系统下次运行的可能是线程4… 假如运行在单核cpu下,在竞争锁时最差只有1%的cpu利用率,导致获得锁的线程1一直被中断,执行实际业务代码时间变得更长,从而导致锁释放的时间变的更长。

sleep+自旋 你可能从一开始就想到了,当竞争锁失败后,可以将用Thread.sleep将线程休眠,从而不占用cpu资源:

volatile int status=0;

void lock(){

while(!compareAndSet(0,1)){
	sleep(10);
}
//get lock

}

void unlock(){ status=0; }

上述方式我们可能见的比较多,通常用于实现上层锁。该方式不适合用于操作系统级别的锁,因为作为一个底层锁,其sleep时间很难设置。sleep的时间取决于同步代码块的执行时间,sleep时间如果太短了,会导致线程切换频繁(极端情况和yield方式一样);sleep时间如果设置的过长,会导致线程不能及时获得锁。因此没法设置一个通用的sleep值。就算sleep的值由调用者指定也不能完全解决问题:有的时候调用锁的人也不知道同步块代码会执行多久。

park+自旋 那可不可以在获取不到锁的时候让线程释放cpu资源进行等待,当持有锁的线程释放锁的时候将等待的线程唤起呢?

volatile int status=0;

Queue parkQueue;

void lock(){

while(!compareAndSet(0,1)){
	//
	lock_wait();
}
//get lock

}

void synchronized unlock(){ lock_notify(); }

void lock_wait(){ //将当期线程加入到等待队列 parkQueue.add(nowThread); //将当期线程释放cpu releaseCpu(); } void lock_notify(){ //得到要唤醒的线程 Thread t=parkList.poll(); //唤醒等待线程 wakeAThread(t); }

上面是伪代码,描述这种设计思想,至于释放cpu资源、唤醒等待线程的的具体实现,后文会再说。这种方案相比于sleep而言,只有在锁被释放的时候,竞争锁的线程才会被唤醒,不会存在过早或过晚唤醒的问题。

小结 对于锁冲突不严重的情况,用自旋锁会更适合,试想每个线程获得锁后很短的一段时间内就释放锁,竞争锁的线程只要经历几次自旋运算后就能获得锁,那就没必要等待该线程了,因为等待线程意味着需要进入到内核态进行上下文切换,而上下文切换是有成本的并且还不低,如果锁很快就释放了,那上下文切换的开销将超过自旋。

目前操作系统中,一般是用自旋+等待结合的形式实现锁:在进入锁时先自旋一定次数,如果还没获得锁再进行等待。

futex linux底层用futex实现锁,futex由一个内核层的队列和一个用户空间层的atomic integer构成。当获得锁时,尝试cas更改integer,如果integer原始值是0,则修改成功,该线程获得锁,否则就将当期线程放入到 wait queue中(即操作系统的等待队列)。

上述说法有些抽象,如果你没看明白也没关系。我们先看一下没有futex之前,linux是怎么实现锁的。

futex诞生之前 在futex诞生之前,linux下的同步机制可以归为两类:用户态的同步机制 和内核同步机制。 用户态的同步机制基本上就是利用原子指令实现的自旋锁。关于自旋锁其缺点也说过了,不适用于大的临界区(即锁占用时间比较长的情况)。

内核提供的同步机制,如semaphore等,使用的是上文说的自旋+等待的形式。 它对于大小临界区和都适用。但是因为它是内核层的(释放cpu资源是内核级调用),所以每次lock与unlock都是一次系统调用,即使没有锁冲突,也必须要通过系统调用进入内核之后才能识别。

理想的同步机制应该是没有锁冲突时在用户态利用原子指令就解决问题,而需要挂起等待时再使用内核提供的系统调用进行睡眠与唤醒。换句话说,在用户态的自旋失败时,能不能让进程挂起,由持有锁的线程释放锁时将其唤醒? 如果你没有较深入地考虑过这个问题,很可能想当然的认为类似于这样就行了(伪代码):

void lock(int lockval) { //trylock是用户级的自旋锁 while(!trylock(lockval)) { wait();//释放cpu,并将当期线程加入等待队列,是系统调用 } }

boolean trylock(int lockval){ int i=0; //localval=1代表上锁成功 while(!compareAndSet(lockval,0,1)){ if(++i>10){ return false; } } return true; }

void unlock(int lockval) { compareAndSet(lockval,1,0); notify(); } 上述代码的问题是trylock和wait两个调用之间存在一个窗口: 如果一个线程trylock失败,在调用wait时持有锁的线程释放了锁,当前线程还是会调用wait进行等待,但之后就没有人再将该线程唤醒了。

futex诞生之后 我们来看看futex的方法定义:

 //uaddr指向一个地址,val代表这个地址期待的值,当*uaddr==val时,才会进行wait
 int futex_wait(int *uaddr, int val);
 //唤醒n个在uaddr指向的锁变量上挂起等待的进程
 int futex_wake(int *uaddr, int n);

futex_wait真正将进程挂起之前会检查addr指向的地址的值是否等于val,如果不相等则会立即返回,由用户态继续trylock。否则将当期线程插入到一个队列中去,并挂起。

futex内部维护了一个队列,在线程挂起前会线程插入到其中,同时对于队列中的每个节点都有一个标识,代表该线程关联锁的uaddr。这样,当用户态调用futex_wake时,只需要遍历这个等待队列,把带有相同uaddr的节点所对应的进程唤醒就行了。

作为优化,futex维护的其实是个类似java 中的concurrent hashmap的结构。其持有一个总链表,总链表中每个元素都是一个带有自旋锁的子链表。调用futex_wait挂起的进程,通过其uaddr hash到某一个具体的子链表上去。这样一方面能分散对等待队列的竞争、另一方面减小单个队列的长度,便于futex_wake时的查找。每个链表各自持有一把spinlock,将”*uaddr和val的比较操作”与”把进程加入队列的操作”保护在一个临界区中。 另外,futex是支持多进程的,当使用futex在多进程间进行同步时,需要考虑同一个物理内存地址在不同进程中的虚拟地址是不同的。

End 本文讲述了实现锁的几种形式以及linux中futex的实现,下篇文章会讲讲Java中ReentrantLock,包括其java层的实现以及使用到的LockSupport.park的底层实现。

在<关于同步的一点思考-上>中介绍了几种实现锁的方式以及linux底层futex的实现原理 ReentrantLock的实现网上有很多文章了,本篇文章会简单介绍下其java层实现,重点放在分析竞争锁失败后如何阻塞线程。 因篇幅有限,synchronized的内容将会放到下篇文章。

Java Lock的实现 ReentrantLock是jdk中常用的锁实现,其实现逻辑主语基于AQS(juc包中的大多数同步类实现都是基于AQS);接下来会简单介绍AQS的大致原理,关于其实现细节以及各种应用,之后会写一篇文章具体分析。

AQS AQS是类AbstractQueuedSynchronizer.java的简称,JUC包下的ReentrantLock、CyclicBarrier、CountdownLatch都使用到了AQS。

其大致原理如下:

AQS维护一个叫做state的int型变量和一个双向链表,state用来表示同步状态,双向链表存储的是等待锁的线程 加锁时首先调用tryAcquire尝试获得锁,如果获得锁失败,则将线程插入到双向链表中,并调用LockSupport.park()方法阻塞当前线程。 释放锁时调用LockSupport.unpark()唤起链表中的第一个节点的线程。被唤起的线程会重新走一遍竞争锁的流程。 其中tryAcquire方法是抽象方法,具体实现取决于实现类,我们常说的公平锁和非公平锁的区别就在于该方法的实现。

ReentrantLock ReentrantLock分为公平锁和非公平锁,我们只看公平锁。 ReentrantLock.lock会调用到ReentrantLock#FairSync.lock中:

FairSync.java

static final class FairSync extends Sync {

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

AbstractQueuedSynchronizer.java

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 可以看到FairSync.lock调用了AQS的acquire方法,而在acquire中首先调用tryAcquire尝试获得锁,以下两种情况返回true:

state==0(代表没有线程持有锁),且等待队列为空(公平的实现),且cas修改state成功。 当前线程已经获得了锁,这次调用是重入 如果tryAcquire失败则调用acquireQueued阻塞当前线程。acquireQueued最终会调用到LockSupport.park()阻塞线程。

LockSupport.park 个人认为,要深入理解锁机制,一个很重要的点是理解系统是如何阻塞线程的。

LockSupport.java

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    UNSAFE.park(false, 0L);
    setBlocker(t, null);
} park方法的参数blocker是用于负责这次阻塞的同步对象,在AQS的调用中,这个对象就是AQS本身。我们知道synchronized关键字是需要指定一个对象的(如果作用于方法上则是当前对象或当前类),与之类似blocker就是LockSupport指定的对象。

park方法调用了native方法UNSAFE.park,第一个参数代表第二个参数是否是绝对时间,第二个参数代表最长阻塞时间。

其实现如下,只保留核心代码,完整代码看查看unsafe.cpp

Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time){ … thread->parker()->park(isAbsolute != 0, time); … }

park方法在os_linux.cpp中(其他操作系统的实现在os_xxx中)

void Parker::park(bool isAbsolute, jlong time) {

… //获得当前线程 Thread* thread = Thread::current(); assert(thread->is_Java_thread(), “Must be JavaThread”); JavaThread *jt = (JavaThread *)thread;

//如果当前线程被设置了interrupted标记,则直接返回 if (Thread::is_interrupted(thread, false)) { return; }

if (time > 0) { //unpacktime中根据isAbsolute的值来填充absTime结构体,isAbsolute为true时,time代表绝对时间且单位是毫秒,否则time是相对时间且单位是纳秒 //absTime.tvsec代表了对于时间的秒 //absTime.tv_nsec代表对应时间的纳秒 unpackTime(&absTime, isAbsolute, time); }

//调用mutex trylock方法
if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
return; }
//_counter是一个许可的数量,跟ReentrantLock里定义的许可变量基本都是一个原理。 unpack方法调用时会将_counter赋值为1。 //_counter>0代表已经有人调用了unpark,所以不用阻塞

int status ; if (_counter > 0) { // no wait needed _counter = 0; //释放mutex锁 status = pthread_mutex_unlock(_mutex); return; }

//设置线程状态为CONDVAR_WAIT OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */); … //等待 _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX; pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);

… //释放mutex锁 status = pthread_mutex_unlock(_mutex) ;

} park方法用POSIX的pthread_cond_timedwait方法阻塞线程,调用pthread_cond_timedwait前需要先获得锁,因此park主要流程为:

调用pthread_mutex_trylock尝试获得锁,如果获取锁失败则直接返回 调用pthread_cond_timedwait进行等待 调用pthread_mutex_unlock释放锁 另外,在阻塞当前线程前,会调用OSThreadWaitState的构造方法将线程状态设置为CONDVAR_WAIT,在Jvm中Thread状态枚举如下

enum ThreadState { ALLOCATED, // Memory has been allocated but not initialized INITIALIZED, // The thread has been initialized but yet started RUNNABLE, // Has been started and is runnable, but not necessarily running MONITOR_WAIT, // Waiting on a contended monitor lock CONDVAR_WAIT, // Waiting on a condition variable OBJECT_WAIT, // Waiting on an Object.wait() call BREAKPOINTED, // Suspended at breakpoint SLEEPING, // Thread.sleep() ZOMBIE // All done, but not reclaimed yet }; Linux的timedwait 由上文我们可以知道LockSupport.park方法最终是由POSIX的 pthread_cond_timedwait的方法实现的。 我们现在就进一步看看pthread_mutex_trylock,pthread_cond_timedwait,pthread_mutex_unlock这几个方法是如何实现的。

Linux系统中相关代码在glibc库中。

pthread_mutex_trylock 先看trylock的实现, 代码在glibc的pthread_mutex_trylock.c文件中,该方法代码很多,我们只看主要代码

//pthread_mutex_t是posix中的互斥锁结构体 int __pthread_mutex_trylock (mutex) pthread_mutex_t *mutex; { int oldval; pid_t id = THREAD_GETMEM (THREAD_SELF, tid); switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex), PTHREAD_MUTEX_TIMED_NP)) {

case PTHREAD_MUTEX_ERRORCHECK_NP:
case PTHREAD_MUTEX_TIMED_NP:
case PTHREAD_MUTEX_ADAPTIVE_NP:
  /* Normal mutex.  */
  if (lll_trylock (mutex->__data.__lock) != 0)
break;

  /* Record the ownership.  */
  mutex->__data.__owner = id;
  ++mutex->__data.__nusers;

  return 0;
}

} //以下代码在lowlevellock.h中
#define __lll_trylock(futex)
(atomic_compare_and_exchange_val_acq (futex, 1, 0) != 0) #define lll_trylock(futex) __lll_trylock (&(futex)) mutex默认用的是PTHREAD_MUTEX_NORMAL类型(与PTHREAD_MUTEX_TIMED_NP相同); 因此会先调用lll_trylock方法,lll_trylock实际上是一个cas操作,如果mutex->__data.__lock==0则将其修改为1并返回0,否则返回1。

如果成功,则更改mutex中的owner为当前线程。

pthread_mutex_unlock pthread_mutex_unlock.c

int internal_function attribute_hidden __pthread_mutex_unlock_usercnt (mutex, decr) pthread_mutex_t mutex; int decr; { if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP) == PTHREAD_MUTEX_TIMED_NP) { / Always reset the owner field. / normal: mutex->__data.__owner = 0; if (decr) / One less user. */ –mutex->__data.__nusers;

  /* Unlock.  */
  lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex));
  return 0;
} } pthread_mutex_unlock将mutex中的owner清空,并调用了lll_unlock方法

lowlevellock.h

#define __lll_unlock(futex, private)
((void) ({
int *__futex = (futex);
int __val = atomic_exchange_rel (__futex, 0);

if (__builtin_expect (__val > 1, 0))
lll_futex_wake (__futex, 1, private);
})) #define lll_unlock(futex, private) __lll_unlock(&(futex), private)

#define lll_futex_wake(ftx, nr, private)
({
DO_INLINE_SYSCALL(futex, 3, (long) (ftx),
__lll_private_flag (FUTEX_WAKE, private),
(int) (nr));
_r10 == -1 ? -_retval : _retval;
}) lll_unlock分为两个步骤:

将futex设置为0并拿到设置之前的值(用户态操作) 如果futex之前的值>1,代表存在锁冲突,也就是说有线程调用了FUTEX_WAIT在休眠,所以通过调用系统函数FUTEX_WAKE唤醒休眠线程 FUTEX_WAKE 在上一篇文章有分析,futex机制的核心是当获得锁时,尝试cas更改一个int型变量(用户态操作),如果integer原始值是0,则修改成功,该线程获得锁,否则就将当期线程放入到 wait queue中,wait queue中的线程不会被系统调度(内核态操作)。

futex变量的值有3种:0代表当前锁空闲,1代表有线程持有当前锁,2代表存在锁冲突。futex的值初始化时是0;当调用try_lock的时候会利用cas操作改为1(见上面的trylock函数);当调用lll_lock时,如果不存在锁冲突,则将其改为1,否则改为2。

#define __lll_lock(futex, private)
((void) ({
int *__futex = (futex);
if (__builtin_expect (atomic_compare_and_exchange_bool_acq (__futex,
1, 0), 0))
{
if (__builtin_constant_p (private) && (private) == LLL_PRIVATE)
__lll_lock_wait_private (__futex);
else
__lll_lock_wait (__futex, private);
}
})) #define lll_lock(futex, private) __lll_lock (&(futex), private)

void __lll_lock_wait_private (int futex) { //第一次进来的时候futex==1,所以不会走这个if if (futex == 2) lll_futex_wait (futex, 2, LLL_PRIVATE); //在这里会把futex设置成2,并调用futex_wait让当前线程等待 while (atomic_exchange_acq (futex, 2) != 0) lll_futex_wait (futex, 2, LLL_PRIVATE); }

pthread_cond_timedwait pthread_cond_timedwait用于阻塞线程,实现线程等待, 代码在glibc的pthread_cond_timedwait.c文件中,代码较长,你可以先简单过一遍,看完下面的分析再重新读一遍代码

int int __pthread_cond_timedwait (cond, mutex, abstime) pthread_cond_t *cond; pthread_mutex_t *mutex; const struct timespec *abstime; { struct _pthread_cleanup_buffer buffer; struct _condvar_cleanup_buffer cbuffer; int result = 0;

/* Catch invalid parameters. */ if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) return EINVAL;

int pshared = (cond->__data.__mutex == (void *) ~0l) ? LLL_SHARED : LLL_PRIVATE;

//1.获得cond锁 lll_lock (cond->__data.__lock, pshared);

//2.释放mutex锁 int err = __pthread_mutex_unlock_usercnt (mutex, 0); if (err) { lll_unlock (cond->__data.__lock, pshared); return err; }

/* We have one new user of the condvar. */ //每执行一次wait(pthread_cond_timedwait/pthread_cond_wait),__total_seq就会+1 ++cond->__data.__total_seq; //用来执行futex_wait的变量 ++cond->__data.__futex; //标识该cond还有多少线程在使用,pthread_cond_destroy需要等待所有的操作完成 cond->__data.__nwaiters += 1 « COND_NWAITERS_SHIFT;

/* Remember the mutex we are using here. If there is already a different address store this is a bad user bug. Do not store anything for pshared condvars. */ //保存mutex锁 if (cond->__data.__mutex != (void *) ~0l) cond->__data.__mutex = mutex;

/* Prepare structure passed to cancellation handler. */ cbuffer.cond = cond; cbuffer.mutex = mutex;

/* Before we block we enable cancellation. Therefore we have to install a cancellation handler. */ __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);

/* The current values of the wakeup counter. The “woken” counter must exceed this value. / //记录futex_wait前的__wakeup_seq(为该cond上执行了多少次sign操作+timeout次数)和__broadcast_seq(代表在该cond上执行了多少次broadcast) unsigned long long int val; unsigned long long int seq; val = seq = cond->__data.__wakeup_seq; / Remember the broadcast counter. */ cbuffer.bc_seq = cond->__data.__broadcast_seq;

while (1) { //3.计算要wait的相对时间 struct timespec rt; { #ifdef __NR_clock_gettime INTERNAL_SYSCALL_DECL (err); int ret; ret = INTERNAL_VSYSCALL (clock_gettime, err, 2, (cond->__data.__nwaiters & ((1 « COND_NWAITERS_SHIFT) - 1)), &rt);

ifndef __ASSUME_POSIX_TIMERS

if (__builtin_expect (INTERNAL_SYSCALL_ERROR_P (ret, err), 0))
  {
    struct timeval tv;
    (void) gettimeofday (&tv, NULL);

    /* Convert the absolute timeout value to a relative timeout.  */
    rt.tv_sec = abstime->tv_sec - tv.tv_sec;
    rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
  }
else # endif
  {
    /* Convert the absolute timeout value to a relative timeout.  */
    rt.tv_sec = abstime->tv_sec - rt.tv_sec;
    rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec;
  } #else /* Get the current time.  So far we support only one clock.  */ struct timeval tv; (void) gettimeofday (&tv, NULL);

/* Convert the absolute timeout value to a relative timeout.  */
rt.tv_sec = abstime->tv_sec - tv.tv_sec;
rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000; #endif } if (rt.tv_nsec < 0) { rt.tv_nsec += 1000000000; --rt.tv_sec; } /*---计算要wait的相对时间 end---- */

//是否超时 /* Did we already time out? */ if (__builtin_expect (rt.tv_sec < 0, 0)) { //被broadcast唤醒,这里疑问的是,为什么不需要判断__wakeup_seq? if (cbuffer.bc_seq != cond->__data.__broadcast_seq) goto bc_out;

  goto timeout;
}

  unsigned int futex_val = cond->__data.__futex;

  //4.释放cond锁,准备wait
  lll_unlock (cond->__data.__lock, pshared);

  /* Enable asynchronous cancellation.  Required by the standard.  */
  cbuffer.oldtype = __pthread_enable_asynccancel ();

  //5.调用futex_wait
  /* Wait until woken by signal or broadcast.  */
  err = lll_futex_timed_wait (&cond->__data.__futex,
			  futex_val, &rt, pshared);

  /* Disable asynchronous cancellation.  */
  __pthread_disable_asynccancel (cbuffer.oldtype);


  //6.重新获得cond锁,因为又要访问&修改cond的数据了
  lll_lock (cond->__data.__lock, pshared);

  //__broadcast_seq值发生改变,代表发生了有线程调用了广播
  if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
goto bc_out;

 //判断是否是被sign唤醒的,sign会增加__wakeup_seq
 //第二个条件cond->__data.__woken_seq != val的意义在于
//可能两个线程A、B在wait,一个线程调用了sign导致A被唤醒,这时B因为超时被唤醒
//对于B线程来说,执行到这里时第一个条件也是满足的,从而导致上层拿到的result不是超时
//所以这里需要判断下__woken_seq(即该cond已经被唤醒的线程数)是否等于__wakeup_seq(sign执行次数+timeout次数)
  val = cond->__data.__wakeup_seq;
  if (val != seq && cond->__data.__woken_seq != val)
break;

  /* Not woken yet.  Maybe the time expired?  */
  if (__builtin_expect (err == -ETIMEDOUT, 0))
{
timeout:
  /* Yep.  Adjust the counters.  */
  ++cond->__data.__wakeup_seq;
  ++cond->__data.__futex;

  /* The error value.  */
  result = ETIMEDOUT;
  break;
}
}

//一个线程已经醒了所以这里__woken_seq +1 ++cond->__data.__woken_seq;

bc_out: // cond->__data.__nwaiters -= 1 « COND_NWAITERS_SHIFT;

/* If pthread_cond_destroy was called on this variable already, notify the pthread_cond_destroy caller all waiters have left and it can be successfully destroyed. */ if (cond->__data.__total_seq == -1ULL && cond->__data.__nwaiters < (1 « COND_NWAITERS_SHIFT)) lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);

//9.cond数据修改完毕,释放锁 lll_unlock (cond->__data.__lock, pshared);

/* The cancellation handling is back to normal, remove the handler. */ __pthread_cleanup_pop (&buffer, 0);

//10.重新获得mutex锁 err = __pthread_mutex_cond_lock (mutex);

return err ?: result; } 上面的代码虽然加了注释,但相信大多数人第一次看都看不懂。 我们来简单梳理下,上面代码有两把锁,一把是mutex锁,一把cond锁。另外,在调用pthread_cond_timedwait前后必须调用pthread_mutex_lock(&mutex);和pthread_mutex_unlock(&mutex);加/解mutex锁。

因此pthread_cond_timedwait的使用大致分为几个流程:

加mutex锁(在pthread_cond_timedwait调用前) 加cond锁 释放mutex锁 修改cond数据 释放cond锁 执行futex_wait 重新获得cond锁 比较cond的数据,判断当前线程是被正常唤醒的还是timeout唤醒的,需不需要重新wait 修改cond数据 是否cond锁 重新获得mutex锁 释放mutex锁(在pthread_cond_timedwait调用后) 看到这里,你可能有几点疑问:为什么需要两把锁?mutex锁和cond锁的作用是什么?

mutex锁 说mutex锁的作用之前,我们回顾一下java的Object.wait的使用。Object.wait必须是在synchronized同步块中使用。试想下如果不加synchronized也能运行Object.wait的话会存在什么问题?

Object condObj=new Object(); voilate int flag = 0; public void waitTest(){ if(flag == 0){ condObj.wait(); } } public void notifyTest(){ flag=1; condObj.notify(); }

如上代码,A线程调用waitTest,这时flag==0,所以准备调用wait方法进行休眠,这时B线程开始执行,调用notifyTest将flag置为1,并调用notify方法,注意:此时A线程还没调用wait,所以notfiy没有唤醒任何线程。然后A线程继续执行,调用wait方法进行休眠,而之后不会有人来唤醒A线程,A线程将永久wait下去!

Object condObj=new Object(); voilate int flag = 0; public void waitTest(){ synchronized(condObj){ if(flag == 0){ condObj.wait(); } }

} public void notifyTest(){ synchronized(condObj){ flag=1; condObj.notify(); } }

在有锁保护下的情况下, 当调用condObj.wait时,flag一定是等于0的,不会存在一直wait的问题。

回到pthread_cond_timedwait,其需要加mutex锁的原因就呼之欲出了:保证wait和其wait条件的原子性

不管是glibc的pthread_cond_timedwait/pthread_cond_signal还是java层的Object.wait/Object.notify,Jdk AQS的Condition.await/Condition.signal,所有的Condition机制都需要在加锁环境下才能使用,其根本原因就是要保证进行线程休眠时,条件变量是没有被篡改的。

注意下mutex锁释放的时机,回顾上文中pthread_cond_timedwait的流程,在第2步时就释放了mutex锁,之后调用futex_wait进行休眠,为什么要在休眠前就释放mutex锁呢?原因也很简单:如果不释放mutex锁就开始休眠,那其他线程就永远无法调用signal方法将休眠线程唤醒(因为调用signal方法前需要获得mutex锁)。

在线程被唤醒之后还要在第10步中重新获得mutex锁是为了保证锁的语义(思考下如果不重新获得mutex锁会发生什么)。

cond锁 cond锁的作用其实很简单: 保证对象cond->data的线程安全。 在pthread_cond_timedwait时需要修改cond->data的数据,如增加__total_seq(在这个cond上一共执行过多少次wait)增加__nwaiters(现在还有多少个线程在wait这个cond),所有在修改及访问cond->data时需要加cond锁。

这里我没想明白的一点是,用mutex锁也能保证cond->data修改的线程安全,只要晚一点释放mutex锁就行了。为什么要先释放mutex,重新获得cond来保证线程安全? 是为了避免mutex锁住的范围太大吗?

该问题的答案可以见评论区@11800222 的回答:

mutex锁不能保护cond->data修改的线程安全,调用signal的线程没有用mutex锁保护修改cond的那段临界区。

pthread_cond_wait/signal这一对本身用cond锁同步就能睡眠唤醒。 wait的时候需要传入mutex是因为睡眠前需要释放mutex锁,但睡眠之前又不能有无锁的空隙,解决办法是让mutex锁在cond锁上之后再释放。 而signal前不需要释放mutex锁,在持有mutex的情况下signal,之后再释放mutex锁。

如何唤醒休眠线程 唤醒休眠线程的代码比较简单,主要就是调用lll_futex_wake。

int __pthread_cond_signal (cond) pthread_cond_t *cond; { int pshared = (cond->__data.__mutex == (void *) ~0l) ? LLL_SHARED : LLL_PRIVATE;

//因为要操作cond的数据,所以要加锁 lll_lock (cond->__data.__lock, pshared);

/* Are there any waiters to be woken? */ if (cond->__data.__total_seq > cond->__data.__wakeup_seq) { //__wakeup_seq为执行sign与timeout次数的和 ++cond->__data.__wakeup_seq; ++cond->__data.__futex;

   ...
	//唤醒wait的线程
  lll_futex_wake (&cond->__data.__futex, 1, pshared);
}

/* We are done. */ lll_unlock (cond->__data.__lock, pshared);

return 0; } End 本文对Java简单介绍了ReentrantLock实现原理,对LockSupport.park底层实现pthread_cond_timedwait机制做了详细分析。

看完这篇文章,你可能还会有疑问:Synchronized锁的实现和ReentrantLock是一样的吗?Thread.sleep/Object.wait休眠线程的原理和LockSupport.park有什么区别?linux内核层的futex的具体是如何实现的?

这些问题,之后的文章会一一解答,尽请期待~

Search

    微信好友

    博士的沙漏

    Table of Contents