tips:如果只是想看AQS的实现的话可以从第三节开始看,前面只是讲结构和使用
在开始了解AQS之前,先看下AQS的内部结构,这样在看实现代码的时候至少有一个整体的概念,重点要记住的是Node类几种状态的作用,其他结构有个概念就行。
如上,在AQS中大致有:
在AQS中,内部类有两个:Node和ConditionObject。Node是队列的实现根基,里面存放了许多重要的信息,如操作的线程、线程竞争的状态(特别重要)等;而ConditionObject则是Condition接口的实现类,用来实现唤醒指定线程组的(等待队列)。
关系如下图(下方的Waiter节点也是Node节点,这里为了便于区分取名不同):
Node内部类:AQS两个队列的实现节点。
0:初始状态或者不代表任何意义时的取值。
SIGNAL(-1):这个状态一般由下一个节点来设置,代表的意思是当前节点在释放了资源后将后续节点的线程唤醒。(大白话就是后续节点拜托前方的大哥东西用完了叫他,他先去睡会儿)
CONDITION(-2):表示节点处于等待队列中,等待队列中的节点不会参与资源竞争,必须从等待队列出来后重新加入同步队列才能参与竞争。
PROPAGATE(-3):在共享模式的时候用到。共享模式下,不仅只是唤醒下个节点,还可能唤醒下下个节点(根据当前剩余资源state的值能否满足最近节点的需求决定)。
CANCELLED(1):表示该节点没用了,可能是等太久了,也可能是其他原因,总之就是废了,处于该状态的节点不会再改变,所以AQS中经常会判断节点状态是否大于0来检查节点是否还有用。
现在对AQS有了模模糊糊的了解,来看看要如何使用这个框架。其采用模板设计模式实现,定义了许多顶级方法如acquire、release等,这些方法子类不能重写但是可以调用,而要正确的使用这些方法则要按照其要求重写一些方法如tryAcquire(顶级方法内部调用了开放方法)。
可以重写的方法有tryAcquire、tryRelease、tryAcquireShared、tryReleaseShared、isHeldExclusively共五种,每个方法里面没有具体的实现,反而是直接抛出了异常,但是不一定要全部重写,比方说只重写tryAcquire、tryRelease则表示要实现的是独占模式的锁。
如果只是使用AQS的话,再加上几个变更状态的方法就可以了,我们不需要了解更多的东西,如同AQS的文档给出的案例一般,简单的重写几个方法便可以实现一种锁,如下,一个不可重入锁的简单实现。
publicstaticvoidmain(String[]args){Locklock=newMutex();newThread(()->{lock.lock();try{System.err.println("获得锁线程名:"+Thread.currentThread().getName());TimeUnit.SECONDS.sleep(3);System.err.println("3秒过去....");}catch(InterruptedExceptione){e.printStackTrace();}finally{lock.unlock();System.err.println(Thread.currentThread().getName()+"释放锁");}}).start();newThread(()->{lock.lock();try{System.err.println("获得锁线程名:"+Thread.currentThread().getName());TimeUnit.SECONDS.sleep(3);System.err.println("3秒过去....");}catch(InterruptedExceptione){e.printStackTrace();}finally{lock.unlock();System.err.println(Thread.currentThread().getName()+"释放锁");}}).start();}最终的结果图如下
这样就实现了一个不可重入锁,是不是看起来很简单?
首先要先明白的是AQS分为两种模式——独占模式和共享模式。一般来说只会用到其中一种,两种模式的资源竞争都是在同步队列中发生的,不要跟等待队列混淆。
独占模式:每次只能允许一个节点获取到资源,每次释放资源之后也只会唤醒后驱节点。
共享模式:每次可以允许多个节点按照顺序获取资源,每次释放头节点资源后可能会唤醒后驱的后驱。(下方讲实现的时候有解释)
来看acquire方法(如果讲的不是容易让人理解,可以结合后方的流程图一起),ReentrantLock中lock就是这个方法,可以类比理解。
在看代码需要明确知道的是,tryAcquire和tryRelease这些操作才是对资源的获取和释放,AQS中的顶级方法如acquire的作用只是对资源获取操作之后的处理。
//代码逻辑不复杂,首先尝试获取资源,如果成功则直接返回,失败则加入同步队列争夺资源publicfinalvoidacquire(intarg){//尝试获得锁,如果失败了则增加节点放入等待队列中if(!tryAcquire(arg)&&acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}可以看到整体的方法十分简单,就在一个if条件中调用了3个方法,tryAcquire就不说了,先说下addWaiter做了什么,addWaiter方法将当前线程封装成一个节点放入同步队列的尾部,如果失败就不断的尝试直到成功为止,其方法代码如下。
privateNodeaddWaiter(Nodemode){//将当前线程封装入一个节点之中,mode代表共享模式还是独占模式Nodenode=newNode(Thread.currentThread(),mode);//首先尝试一次快速的尾随其后,如果失败的话则采用正常方式入队Nodepred=tail;if(pred!=null){node.prev=pred;if(compareAndSetTail(pred,node)){pred.next=node;returnnode;}}//入队操作enq(node);returnnode;}再看下正常的入队操作
privateNodeenq(finalNodenode){//自旋for(;;){Nodet=tail;//如果同步队列是空的话则进行队列的初始化if(t==null){//这里注意初始化的时候head是一个新增的Node,其waitStatus为0if(compareAndSetHead(newNode()))tail=head;}else{//否则的话尝试设置尾节点,失败的话重新循环node.prev=t;if(compareAndSetTail(t,node)){t.next=node;returnt;}}}}可以看出正常入队比快速入队也就多出来了自旋和初始化操作,其他的大致逻辑都是相似的。再看看acquire中的另一个方法acquireQueued。
首先明确这个方法是不断自旋不会退出的,除非成功拿到资源,如果拿不到资源就挂起等待。(不考虑特殊情况)
整个流程的逻辑:
再回来看下acquire方法
publicfinalvoidacquire(intarg){if(!tryAcquire(arg)&&//根据返回的中断标识决定是否执行下方的自我中断acquireQueued(addWaiter(Node.EXCLUSIVE),arg))selfInterrupt();}整个acquire的流程大致为
publicfinalvoidacquireInterruptibly(intarg)throwsInterruptedException{if(Thread.interrupted())//抛出异常处理thrownewInterruptedException();if(!tryAcquire(arg))doAcquireInterruptibly(arg);}3.2独占式释放资源——release了解完获取资源自然知道释放资源的过程,相对来说释放资源要相对容易一些,大致逻辑为尝试释放资源,如果成功了,则改变节点的状态并且唤醒下一个可用节点(一般是下一个,但是可能出现下一个节点已经被取消的情况)
publicfinalbooleanrelease(intarg){if(tryRelease(arg)){Nodeh=head;if(h!=null&&h.waitStatus!=0)//修改线程的状态,并且唤醒下一个节点进行资源竞争unparkSuccessor(h);returntrue;}returnfalse;}privatevoidunparkSuccessor(Nodenode){//改变节点状态intws=node.waitStatus;if(ws<0)compareAndSetWaitStatus(node,ws,0);/**唤醒下一个可用节点,一般来说是下一个节点,但是可能出现下个节点被取消*或者为空的情况,这个时候就要从尾结点向前遍历直到找到有效的节点(从尾节点向前遍历*是因为无论下个节点是空还是取消的节点,正向遍历都不可能走得通了,取消的节点的next*就是其本身,所以只能从后面开始往前遍历)*/Nodes=node.next;if(s==null||s.waitStatus>0){s=null;for(Nodet=tail;t!=null&&t!=node;t=t.prev)if(t.waitStatus<=0)s=t;}//找到下个节点之后将其唤醒if(s!=null)LockSupport.unpark(s.thread);}release的流程图如下:
在上面我们讲的都是独占模式的获取和释放处理,那接下来看看共享模式是怎么实现的。首先理解AQS中共享模式的概念,其代表资源可以被队列中的多个节点按照顺序获得,什么意思呢?
举个例子,我们设置资源变量为3(state=3),首先头结点使用tryAcquireShared(1)获取到了一个资源,那么还剩下2个,这两个可以给头结点的后驱节点使用,如果后驱节点的需求是2那么获取成功并将自己设置为头结点同时断开跟原头结点的连接,但是如果需求是3的话则进入等待状态直到可获取的资源量达到其要求为止,这时就算后续的需求量是1也不会给后续节点,这就是按照顺序获得的意思。例子图如下:
okay,那来看下共享模式下的实现,先看acquireShared方法:判断资源是否获取成功,是的话直接结束,不是的话进入队列进行资源竞争。需要注意的是tryAcquireShared返回值的语义:负值代表失败,其他代表成功并且当前还可获取的资源量。
publicfinalvoidacquireShared(intarg){if(tryAcquireShared(arg)<0)doAcquireShared(arg);}看看doAcquireShared做了什么
//还是强调一次,这些方法只是善后处理,资源的获取还是在tryAcquireShared方法privatevoiddoAcquireShared(intarg){/**整个流程跟acquire方法有些类似,不同点是其获取到资源后*会唤醒后驱线程*///加入队列尾,不再赘述finalNodenode=addWaiter(Node.SHARED);booleanfailed=true;try{//同样记录一个打断标识booleaninterrupted=false;for(;;){//前驱节点finalNodep=node.predecessor();if(p==head){//如果前驱节点是头结点,那么尝试一次获取资源,根据其返回的值判断执行不同操作intr=tryAcquireShared(arg);if(r>=0){//非负值代表资源获取成功,将自己设为头结点后唤醒后驱节点争取资源setHeadAndPropagate(node,r);p.next=null;//helpGC//跟acquire不同的是,其补打断的地方在方法内层,不再放外面if(interrupted)selfInterrupt();failed=false;//处理结束后就退出了return;}}//这里跟acquire一样,判断是否可以休息,休息后被唤醒后补充interrupted标识if(shouldParkAfterFailedAcquire(p,node)&&parkAndCheckInterrupt())interrupted=true;}}finally{if(failed)cancelAcquire(node);}}看看获取资源成功后对后续节点的操作
/***@paramnode当前节点*@parampropagate当前剩余的资源量*/privatevoidsetHeadAndPropagate(Nodenode,intpropagate){//记录原头结点Nodeh=head;//注意这里设置头结点的变化,这里要结合3.3一开始的例子图来理解/**setHead方法体:*head=node;*node.thread=null;*node.prev=null;*/setHead(node);//此时头结点已经变为当前节点了/**存在以下三种情况时唤醒当前节点后驱节点*1.剩余资源量>0*2.node的原前驱节点(即原头节点)释放了资源,==null表示释放完被回收了,<0则表示PROPAGATION*状态,释放之后会将节点状态设置为PROPAGATION*3.头结点可能再次发生了改变并且也释放了资源(竞争激烈的时候发生)*/if(propagate>0||h==null||h.waitStatus<0||(h=head)==null||h.waitStatus<0){Nodes=node.next;if(s==null||s.isShared())//叫醒后续节点争夺资源,这个方法是释放方法的主要方法,放在下节讲doReleaseShared();}}okay,到这里就是共享模式的acquireShared方法,总结一下逻辑:
线程被唤醒后重复2操作,以下是流程图:
直接上代码吧
publicfinalbooleanreleaseShared(intarg){if(tryReleaseShared(arg)){//这个方法理解为唤醒,不要理解为释放资源doReleaseShared();returntrue;}returnfalse;}看看唤醒方法做了啥子
privatevoiddoReleaseShared(){for(;;){Nodeh=head;if(h!=null&&h!=tail){intws=h.waitStatus;//根据节点状态判断执行什么操作if(ws==Node.SIGNAL){/**如果是SIGNAL那么表示其后驱节点处于挂起的状态*使用CAS改变状态后唤醒后驱节点,失败则再次循环(说明被其他线程先执行了该方法)*/if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))continue;//唤醒线程,前面已经说过,不再赘述unparkSuccessor(h);}//将当前节点设置为PROPAGATE,失败则再次循环elseif(ws==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))continue;}//如果头节点改变了,说明唤醒操作是其他线程做的,此时要再次循环if(h==head)break;}}共享模式的release方法在我们看过之前的方法后就简单得多了,这里就不再画流程图了,到此AQS的两个模式和实现暂时告一段落。
两个模式的实现思路大致是相同的,但是方式不同,独占模式每次只允许一个节点获取到资源,而共享模式则允许多个节点按照顺序获取;双方释放后的善后操作也不同,独占模式只唤醒后驱节点,而共享模式则可能唤醒后驱的后驱(资源充足的情况)。