非阻塞同步算法实战(二)-BoundlessCyclicBarrier

感谢网友trytocatch的投稿

前言
相比上一 篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。

需求介绍
我需要编写一个同步工具,它需要提供这样几个方法:await、pass、cancel。某个线程调用await时,会被阻塞;当调用pass方法时,之前因为await而阻塞的线程将全部被解除阻塞,之后调用await的线程继续被阻塞,直到下一次调用pass。

该工具同时还维护一个版本号,await方法可以带一个目标版本号,如果当前的版本号比目标版本号新或相同,则直接通过,否则,阻塞本线程,直到到达或超过目标版本。调用pass的时候,更新版本号。

如果停止了版本更新,可使用cancel方法来解除所有因await而阻塞的线程,包括指定版本号的。此方法用于避免无谓地等待。若await发生在cancel之后,则仍将被阻塞。

因为CountDownLatch不允许重复使用,CyclicBarrier只支持固定个数的线程,并且都没有维护一个版本号,所以没有已有的类能实现上面的需求,需要自己实现。

问题分析
简单分析可知,应该维护一个队列,来保存当前被阻塞的线程,用于在pass时对它们一一解除阻塞,pass时应该使用一个新的队列,否则不方便正确处理pass前和pass后调用await的线程。

至此,问题的关键就明了了:如何将队列的替换和版本号的更新这两个操作做成原子的。

解决方案
以前在《JAVA并发编程实践》曾看到过这样一个小技巧,如果要原子地更新两个变量,那么可以创建一个新的类将它们封装起来,将这两个变量当定义成类成员变量,更新时,用CAS更新这个类的引用即可。

因为较为复杂,下面先给出完整的代码,再讲解其中的关键。

注意:上面所说pass,在代码中的具体实现为nextCycle,有两个版本,一个自动维护版本号,一个由调用者维护版本号。

/**

  • @author trytocatch@163.com
  • @time 2013-1-31
    */
    public class BoundlessCyclicBarrier {
    protected final AtomicReference waitQueueRef;

    public BoundlessCyclicBarrier() {
    this(0);
    }

    public BoundlessCyclicBarrier(int startVersion) {
    waitQueueRef = new AtomicReference(new VersionQueue(startVersion));
    }

    public final void awaitWithAssignedVersion(int myVersion)
    throws InterruptedException {
    awaitImpl(true, myVersion, 0);
    }

    /**
    *

    • @param myVersion
    • @param nanosTimeout
    • @return if timeout, or be canceled and doesn't reach myVersion, returns false
    • @throws InterruptedException
      */
      public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {
      return awaitImpl(true, myVersion, nanosTimeout);
      }

    public final void await() throws InterruptedException {
    awaitImpl(false, 0, 0);
    }

    /**
    *

    • @param nanosTimeout
    • @return if and only if timeout, returns false
    • @throws InterruptedException
      */
      public final boolean await(long nanosTimeout)
      throws InterruptedException {
      return awaitImpl(false, 0, nanosTimeout);
      }

    /**

    • pass and version++(some threads may not be unparked when awaitImpl is in process, but it's OK in this Barrier)
    • @return old queue version
      */
      public int nextCycle() {
      VersionQueue oldQueue = waitQueueRef.get();
      VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);
      for(;;){
      if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {
      for (Thread t : oldQueue.queue)
      LockSupport.unpark(t);
      break;
      }
      oldQueue = waitQueueRef.get();
      newQueue.version = oldQueue.version + 1;
      }
      return oldQueue.version;
      }

    /**

    • pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)
    • @param newAssignVersion
      */
      public void nextCycle(int newAssignVersion) {
      VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));
      for (Thread t : oldQueue.queue)
      LockSupport.unpark(t);
      }

    /**

    • if version update has stopped, invoke this to awake all threads
      */
      public void cancel(){
      VersionQueue oldQueue = waitQueueRef.get();
      if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {
      for (Thread t : oldQueue.queue)
      LockSupport.unpark(t);
      }

    public final int getVersion() {
    return waitQueueRef.get().version;
    }

    private static final class VersionQueue {
    final private ConcurrentLinkedQueue queue;
    int version;
    final boolean isCancelQueue;

    VersionQueue(int curVersion){
        this(curVersion, false);
    }
    
    VersionQueue(int curVersion, boolean isCancelQueue) {
        this.version = curVersion;
        this.isCancelQueue = isCancelQueue;
        queue = new ConcurrentLinkedQueue();
    }
    

    }

    /**
    *

    • @param assignVersion is myVersion available
    • @param myVersion wait for this version
    • @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid) * @return if timeout, or be canceled and doesn't reach myVersion, returns false * @throws InterruptedException */ protected boolean awaitImpl(boolean assignVersion, int myVersion, long nanosTimeout) throws InterruptedException { boolean timeOutEnable = nanosTimeout > 0;
      long lastTime = System.nanoTime();
      VersionQueue newQueue = waitQueueRef.get();//A
      if (assignVersion && newQueue.version - myVersion >= 0)
      return true;
      while (true) {
      VersionQueue submitQueue = newQueue;//B
      submitQueue.queue.add(Thread.currentThread());//C
      while (true) {
      newQueue = waitQueueRef.get();//D
      if (newQueue != submitQueue){//E: it's a new cycle
      if(assignVersion == false)
      return true;
      else if(newQueue.version - myVersion >= 0)
      return true;
      else if (newQueue.isCancelQueue)//F: be canceled
      return false;
      else//just like invoking awaitImpl again
      break;
      }
      if (timeOutEnable) {
      if (nanosTimeout <= 0)
      return false;
      LockSupport.parkNanos(this, nanosTimeout);
      long now = System.nanoTime();
      nanosTimeout -= now - lastTime;
      lastTime = now;
      } else
      LockSupport.park(this);
      if (Thread.interrupted())
      throw new InterruptedException();
      }
      }
      }
      }
      代码分析
      先分析一下awaitImpl方法,A和D是该方法的关键点,决定着它属于哪一个批次,对应哪一个版本。这里有个小细节,在nexeCycle,cancel解除阻塞时,该线程可能并不在队列中,因为插入队列发生在C处,这在A和D之后(虽然看起来C在D之前,但D取到的queue要在下一次循环时才被当作submitQueue),所以,在E处再进行了一次判断,开始解除阻塞时,旧队列肯定被新队列所替换,newQueue != submitQueue一定为真,就会不调用park进行阻塞了,也就不需要解除阻塞,所以即使解除阻塞时,该线程不在队列中也是没问题的。

再看E处,当进入一个新的cycle时(当前队列与提交的队列不同),a)如果没指定版本,或者到达或超过了指定版本,则返回true;b)如果当前调用了cancel,则当前队列的isCancelQueue将为true,则不继续傻等,返回false;c)或者还未到达指定版本,break,插入到当前队列中,继续等待指定版本的到达。

如果没有进入E处的IF内,则当前线程会被阻塞,直到超时,然后返回false;或被中断,然后抛出InterruptedException;或被解除阻塞,重新进行E处的判定。

这里还有个小细节,既然cancel时,把当前的队列设置了isCancelQueue,那么之后指定版本的await会不会也直接返回了呢?其实不会的,因为它若要执行F处的判断,则先必需通过E处的判定,这意味着,当前队列已经不是提交时的那个设置了isCancelQueue的队列了。

代码中对于cancel的处理,其实并不保证cancel后,之前的await都会被解除阻塞并返回,如果cancel后,紧接着又调用了nextCycle,那么可能某线程感知不到cancel的调用,唤醒后又继续等待指定的版本。cancel的目的是在于不让线程傻等,既然恢复版本更新了,那就继续等待吧。

如果自己维护版本号,则应该保证递增。另外,版本号的设计,考虑到了int溢出的情况,版本的前后判断,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,这样,版本号就相当于循环使用了,只要两个比较的版本号的差不超过int的最大值,那么都是正确的,int的最大值可是20多亿,几乎不可能出现跨度这么大的两个版本号的比较,所以,认为它是正确的。

小结
本文讲到了一个非阻塞同步算法设计时的小技巧,如果多个变量之间要维护某种特定关系,那么可以将它们封装到一个类中,再用CAS更新这个类的引用,这样就达到了:要么都被更新,要么都没被更新,保持了多个变量之间的一致性。同时需要注意的是,每次更新都必需创建新的包装对象,假如有其它更好的办法,应该避免使用该方法。

文章转自 并发编程网-ifeve.com

时间: 2024-11-16 21:58:44

非阻塞同步算法实战(二)-BoundlessCyclicBarrier的相关文章

非阻塞同步算法实战(一)

感谢trytocache投递本文. 前言 本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试.所走的弯路,经验和教训. 背景介绍 上个月,我被安排独自负责一个聊天系统的服务端,因为一些原因,我没使用现成的开源框架,网络那块直接使用AIO,收数据时,因为只会从channel里过来,所以不需要考虑同步问题:但是发送数据时,因为有聊天消息的转发,所以必需处理这个同步问题.AIO中,是处理完一个注册的操作后,再执行

非阻塞同步算法实战(三)-LatestResultsProvider

感谢trytocatch投递本文. 前言 阅读本文前,需要读者对happens-before比较熟悉,了解非阻塞同步的一些基本概念.本文主要为happens-before法则的灵活运用,和一些解决问题的小技巧,分析问题的方式. 背景介绍 原始需求为:本人当时在编写一个正则替换工具,里面会动态地显示所有的匹配结果(包括替换预览),文本.正则表达式.参数,这些数据的其中一项发生了变化,结果就应该被更新,为了提供友好的交互体验,数据变化时,应该是发起一个异步请求,由另一个独立的线程来完成运算,完成后通

【整理】Socket编程之非阻塞connect(二)

      socket api 存在一批核心接口,而这一批核心接口就是几个看似简单的函数,尽管实际上这些函数没有一个是简单.connect 函数就是这些核心接口中的一个函数,它完成主动连接的过程.  connect 函数的功能对于 TCP 来说就是完成面向连接的协议的连接过程,它的函数原型:  linux下 ? 1 2 3 #include<sys/socket.h> #include<sys/types.h> int connect(int sockfd, const stru

tornado的使用让你的异步请求非阻塞

也许有同学很迷惑:tornado不是标榜异步非阻塞解决10K问题的嘛?但是我却发现不是torando不好,而是你用错了.比如最近发现一个事情:某网站打开页面很慢,服务器cpu/内存都正常.网络状态也良好. 后来发现,打开页面会有很多请求后端数据库的访问,有一个mongodb的数据库业务api的rest服务.但是它的tornado却用错了,一步步的来研究问题: 说明 以下的例子都有2个url,一个是耗时的请求,一个是可以或者说需要立刻返回的请求,我想就算一个对技术不熟,从道理上来说的用户, 他希望

并发集合(二)使用非阻塞线程安全的列表

使用非阻塞线程安全的列表 列表(list)是最基本的集合.一个列表有不确定的元素数量,并且你可以添加.读取和删除任意位置上的元素.并发列表允许不同的线程在同一时刻对列表的元素进行添加或删除,而不会产生任何数据不一致(问题). 在这个指南中,你将学习如何在你的并发应用程序中使用非阻塞列表.非阻塞列表提供这些操作:如果操作不能立即完成(比如,你想要获取列表的元素而列表却是空的),它将根据这个操作抛出异常或返回null值.Java 7引进实现了非阻塞并发列表的ConcurrentLinkedDeque

同步与异步、阻塞与非阻塞

这是两对概念,用在不同的语境会有一些不同的含义,不能一概而论. 整体来说,同步就是两种东西通过一种机制实现步调一致,异步是两种东西不必步调一致.   一.同步调用与异步调用: 在用在调用场景中,无非是对调用结果的不同处理. 我理解同步调用就是调用一但返回,就能知道结果,而异步是返回时不一定知道结果,还得通过其他机制来获知结果,如: a.   状态b.   通知c.   回调函数 这里的同步调用不一定会阻塞,例如立即返回失败的结果. 而异步调用立即返回时,你还拿不到结果的.   二.同步线程与异步

并发、并行、同步、异步、阻塞、非阻塞

最近在写爬虫 ,对于这几个概念比较模糊,所以特意学习了一下. 进程(process):进程是具有一定独立功能的程序关于某个数据集合上的一次运行活动,进程是系统进 行资源分配和调度的一个独立单位. 线程(thread):线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的 基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存 器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源. 一个线程可以创建和撤销另一个线

Java网络编程从入门到精通(31):非阻塞I/O简介

在网络应用中,一般可以采用同步I/O(阻塞I/O)和非阻塞I/O两种方式进行数据通讯.这两种方式并非互相排斥和互相取代.我们可以在平时的应用中单独采用其中一种通讯方式,也可以混合使用这两种通讯方式.在本文中就什么是非阻塞I/O以及为什么要使用这种通讯方式进行了介绍,在下一篇文章中给出了一个简单的例子来演示在网络应用中如何使用非阻塞I/O进行通讯. 一.什么是非阻塞I/O 我们可以将同步I/O称为阻塞I/O,非阻塞I/O称为异步I/O.在本书中采用了比较常用的叫法:同步I/O和非阻塞I/O.虽然它

关于网络IO中的同步、异步、阻塞、非阻塞

在高并发编程当中,我们经常会遇到一些异步.非阻塞等一些概念,一些常用的技术比如异步的httpclient.netty nio.nginx.node.js等,它们的原理大都跟异步.非阻塞有关.特别是在服务器开发中,并发的请求处理是个大问题,阻塞式的函数会导致资源浪费和时间延迟.通过事件注册.异步函数,开发人员可以提高资源的利用率,性能也会改善.其nginx和node.js处理并发都是采用的事件驱动异步非阻塞模式.其中nginx中处理并发用的是epoll,poll,queue等方式,node.js使