同样是修改数据,一个采用加锁的方式保证原子性,一个采用CAS的方式保证原子性。
都是能够达到目的的,但是常用的锁(例如显式的Lock和隐式的synchonized),都会把获取不到锁的线程挂起,相对于CAS的不挂起,多了挂起和唤醒的开销。
题外话:CAS与锁的关系
CAS只是在这个场景下,比使用锁来得更纯粹,因为只做数据更新,所以开销更少。但是业务上为了保证一系列操作的原子性,还是要使用锁的。而且锁的底层实现,也依赖于类似于CAS这样的原子性操作。
别的帖子都说RingBuffer中不维护尾指针,尾指针由消费者维护(所谓维护指针,就是修改、移动指针)其实这一句话有点误导性,如果RingBuffer不知道尾部在哪里,那它的数据存储肯定就会出问题,例如把还没消费过的数据给覆盖了。
确实,消费者会自行维护自己的消费指针(消费者指针是消费者消费过的最后一条数据的序号,下一篇中会详细讲到),RingBuffer也不会去干涉消费者指针的维护,但是它会引用所有消费者的指针,读取他们的值,以此作为“尾部”的判断依据。实际上就是最慢的那个消费者作为边界
我们直接来看代码,这个是RingBuffer的publishEvent方法,我们看到,它首先取得一个可用的序列号,然后再将数据放入该序列号的对应位置中。
@OverridepublicvoidpublishEvent(EventTranslator
这里附上几个图可能更好理解:(右边是后续补充的用“画图”画的,对单元格添加一些颜色进行区分)
情况1:队列已满,生产者尝试使用新序号14,但由于(14-8=6),由于最慢的消费者目前消费的最后一条数据的序号是5,5号之后的数据还没被消费,6>5,所以序号14还不能用。生产者线程挂起,下次再次尝试。
情况2:消费者1消费了序号6的数据。(14-8=6)不大于6,这时序号14可用,生产者得到可用的序号。
RingBuffer如何知道有哪些消费者?哪些gatingSequense是从哪里来的?
在构建RingBuffer注册处理类的时候,就将消费者Sequense注册到RingBuffer中了。
看代码的话,定位到gatingSequences在AbastractSequencer,对应的有个addGatingSequenses方法用于注入gatingSequence
publicabstractclassAbstractSequencerimplementsSequencer{//...protectedvolatileSequence[]gatingSequences=newSequence[0];@OverridepublicfinalvoidaddGatingSequences(Sequence...gatingSequences){SequenceGroups.addSequences(this,SEQUENCE_UPDATER,this,gatingSequences);}//...}再查看addGatingSequences被调用的地方,即通过RingBuffer的方法,设置到Sequencer中,这个Sequencer是生产者使用的序号管理器
publicfinalclassRingBuffer
publicclassDisruptor
我们看到在SiingleProducerSequencer的next方法中,会缓存上一次的消费者最小序列号,这有什么用呢?
用途就是不需要每次都读取各消费者的序号,只要没超过上一次的最小值的地方都可以直接分配,如果超过了,则进行再次判断
为啥读取最小值不需要保证原子性?
看了这个获取最小消费序号的,可能会奇怪,为啥这个操作不需要上锁,这个不是会获取到旧值吗?
确实,这个最小值获取到的时候,实际上数值已经变更。但是由于我们的目的是为了防止指针越位,所以用旧值是没有问题的。(旧值<=实际上的最小值)
publicstaticlonggetMinimumSequence(finalSequence[]sequences,longminimum){for(inti=0,n=sequences.length;i