java并发

1: 并发编程基础

  • 如何减少上下文切换:: 使用无锁并发编程 CAS算法 使用最少线程 使用协程
    使用无锁并发编程: 如将数据的ID按照HASH算法取摸分段,不同的线程处理不同的数据.(分段锁)
    CAS算法: Java的Atomic包使用的CAS算法来更新数据,而不需要加锁.
    使用最少线程: 避免创建不需要的线程.比如任务少的时候,但是线程数太多.
    使用协程 在单线程里实现多任务调度,并在单线程里维持多个任务的切换.
  • 避免死锁的几个方法:(1)避免一个线程同时获取多个锁.(2)避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源.(3)尝试使用定时锁,使用lock.tryLock(timeout)来代替内部锁.(4)对于数据库锁,加锁和解锁必须在一个数据库链接里.否则会出现解锁失败的情况.
  • volatile比synchronize轻量级.但只保证了可见性,没有保证原子性, 并且volatile不会引起线程的上下文切换和调度.
术语 英文单词 术语描述
内存屏障 memory barriers 是一组处理器指令,用于实现对内存操作的顺序限制
缓冲行 cache line 缓存中可以分配的最小存储单位.处理器填写缓存线时会加载整个缓存线,需要使用多个主内存读周期
原子操作 atomic operations 不可中断的一个或一系列操作
缓存行填充 cache line fill 当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个缓存行到合适的缓存(L1 L2 L3的或所有)
缓存命中 cache hit 如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理从缓存中读取而不是从内存读取
写命中 write hit 当处理器将操作数写回到一个内存缓存区时,他首先会检查这个缓存的内存地址是否在缓存中,如果存在一个高效的缓存行,则处理器将这个操作数写回到缓存而不是写回到内存
写缺失 write misses the cache 一个有效的缓存行被写入到不存在的内存区域
  • Java中每个对象都可以作为锁, 具体表现为三种形式 (1)对于普通方法,锁是当前的实例(2)对于静态方法,锁是当前类的Class对象(3)对于同步方法块,锁是Synchronize括号里配置的对象.
  • JVM基于进入和退出Monitor对象来实现方法同步和代码同步, 代码同步: 使用monitorenter 和 monitorexit指令实现.而方法同步使用另外一种方式.细节在JVM规范里面没讲,但是,方法同步同样可以使用这两个指令来实现. monitorenter指令在编译后插入到同步代码块开始的位置,而monitorexit是插入到方法结束处和异常处
    • java1.6中 ,锁一共有四中状态: 无锁状态,偏向锁状态,轻量级锁状态和重量级状态 (依次从低到高)
  • 偏向锁:,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁. 当一个线程访问同步代码块并获取锁时,会在对象头和帧栈中记录里存储锁偏向的线程ID,以后该线程在进入和退出同步快时不需要进行CAS操作来加锁和解锁,只需要简单的测试一下对象头的MarkWord里是否存储着指向当前线程的偏向锁.如果测试成功.表示该线程已经获取锁.
    偏向锁使用了一种等到竞争出现才释放锁的机制,所以其他线程尝试竞争偏向锁时,持有偏向锁的线程才回释放,
  • 轻量级锁: 线程在执行同步代码块之前,JVM会先在当前线程的帧栈中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中. 称为: Displaced Mark Word,然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针.如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁.
优点 缺点 使用场景
偏向锁 加锁和解锁不需要额外消耗,和执行非同步方法相比存在纳秒级差别 如果线程间存在锁竞争,会带来额外的锁撤销消耗 适用于只有一个线程访问同步块的场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程,会使用自旋消耗CPU 追求响应时间,同步快执行速度快
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间慢 追求吞吐量,同步快执行速度长
  • CAS实现原子操作的三大问题:(1)ABA问题(2)循环时间长开销大(3)只能保证一个贡献变量的原子操作. ABA问题在java1.5之后提供了一个AtomicStampedReference来解决ABA问题.这个类的compareAndSet首先检查当前引用是否等于预期引用,并检查当前标志是否等于预期标志.如果全部相等,则以原子方式更新.
  • 同步: 指程序中用于控制不同线程间操作发生相对顺序的机制.
  • volatile:(1) 可见性, 对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入(2)原子性: 对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性.
  • 公平锁和非公平锁:(1)公平锁和非公平锁释放时最后都要写一个volatile变量state (2)公平锁获取时,首先去读volatile变量(state) (3) 非公平锁获取时,首先会用CAS更新volatile变量,这个操作同时具有volatile读和volatile写的内存语义
  • concurrent包的实现示意图:
    ---------------------------------------------------------------
    Lock | 同步器 | 阻塞队列 | Executor | 并发容器|
    ---------------------------------------------------------------
    AQS | 非阻塞数据结构 | 原子变量类 |
    ---------------------------------------------------------------
    \ volatile变量的读/写 CAS
    ---------------------------------------------------------------
  • final域的内存语义: 对final域的读和写更像是普通变量的访问.
  • 抛出InterruptedExeception的线程SleepThread. 其中断标识位被清除,而一直忙碌的线程BusyThread,中断标志位没有清除
public class Demo {

    public static void main(String[] args) {

        Thread sleepThread = new Thread(new SleepRunner(),"SleepThread");
        sleepThread.setDaemon(true);

        Thread busyThread = new Thread(new BusyRunner(),"BusyThread");
        busyThread.setDaemon(true);

        sleepThread.start();
        busyThread.start();

        sleep(5);
        sleepThread.interrupt();
        busyThread.interrupt();

        System.out.println("Sleep: "+sleepThread.isInterrupted());
        System.out.println("Busy: "+busyThread.isInterrupted());

    }
    static class SleepRunner implements Runnable{
        @Override
        public void run() {
            while (true){
                sleep(10);
            }
        }
    }
    static class BusyRunner implements Runnable{
        @Override
        public void run() {
            while (true);
        }
    }
}

Exception in thread "SleepThread" java.lang.IllegalStateException: java.lang.InterruptedException: sleep interrupted
    at java9demo/com.java9.utils.ConcurrentUtils.sleep(ConcurrentUtils.java:34)
    at java9demo/com.java9.artcp.Demo$SleepRunner.run(Demo.java:30)
    at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.InterruptedException: sleep interrupted
    at java.base/java.lang.Thread.sleep(Native Method)
    at java.base/java.lang.Thread.sleep(Thread.java:340)
    at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:401)
    at java9demo/com.java9.utils.ConcurrentUtils.sleep(ConcurrentUtils.java:32)
    ... 2 more
Sleep: false
Busy: true

  • 过期的suspend resume stop: suspend: 在调用后,线程不会释放已经占有的资源(比如锁),而是占着资源进入睡眠状态,这样容易引发死锁问题, stop方法会在终结一个线程时不会保证线程的资源正确释放,通常是没有给予线程完成资源释放工作的机会.因此会导致程序可能工作在不确定的状态下.
    static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
    public static void main(String[] args) {

        Thread printThread = new Thread(new Runner(),"PrintThread");
        printThread.setDaemon(true);
        printThread.start();
        sleep(3);

        printThread.suspend();
        System.out.println("main suspend at: "+LocalTime.now().format(formatter));
        sleep(3);
        printThread.resume();
        System.out.println("main resume at: "+LocalTime.now().format(formatter));
        sleep(3);
        printThread.stop();
        System.out.println("main stop at: "+LocalTime.now().format(formatter));
        sleep(3);
    }
    static class Runner implements Runnable{
        @Override
        public void run() {
            while (true){
                System.out.println(Thread.currentThread().getName()+" Run at: "+ LocalTime.now().format(formatter));
                sleep(1);
            }
        }
    }
  • 终止线程: (1)中断方式,(2)利用一个boolean变量来控制是否需要停止任务并终止该线程.

    public static void main(String[] args) {
        Runner one = new Runner();
        Thread countThread = new Thread(one,"CountThread");
        countThread.start();
        sleep(1);
        countThread.interrupt();
        Runner two = new Runner();
        countThread = new Thread(two,"CountThread");
        countThread.start();
        sleep(1);
        two.cancel();
    }
    static class Runner implements Runnable{
        private long i;
        private volatile boolean on = true;
        public void cancel(){on = false;}
        @Override
        public void run() {
            while (on && !Thread.currentThread().isInterrupted()){
                i++;
            }
            System.out.println("count i= "+i);
        }
    }
    
  • 对同步快的实现使用了monitorenter 和moniterexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZE来完成的. 无论采用哪种方式.其本质是对一个对象的监视器(moniter)进行获取,而这个获取是排他的.也就是同一时刻只能有一个线程获取到synchronize所保护的监视器
public static void main(String[] args) {
        synchronized (Demo.class){

        }
        m();
    }
    public static synchronized void m(){

    }
/------------------------------------------------------------------------------------------
public com.java9.artcp.Demo();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=1, locals=1, args_size=1
         0: aload_0
         1: invokespecial #1                  // Method java/lang/Object."<init>":()V
         4: return
      LineNumberTable:
        line 3: 0
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0       5     0  this   Lcom/java9/artcp/Demo;

  public static void main(java.lang.String[]);
    descriptor: ([Ljava/lang/String;)V
    flags: ACC_PUBLIC, ACC_STATIC
    Code:
      stack=2, locals=3, args_size=1
         0: ldc           #2                  // class com/java9/artcp/Demo
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_1
         6: monitorexit
         7: goto          15
        10: astore_2
        11: aload_1
        12: monitorexit
        13: aload_2
        14: athrow
        15: invokestatic  #3                  // Method m:()V
        18: return
      Exception table:
         from    to  target type
             5     7    10   any
            10    13    10   any
      LineNumberTable:
        line 6: 0
        line 8: 5
        line 9: 15
        line 10: 18
      LocalVariableTable:
        Start  Length  Slot  Name   Signature
            0      19     0  args   [Ljava/lang/String;
      StackMapTable: number_of_entries = 2
        frame_type = 255 / full_frame /
          offset_delta = 10
          locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
          stack = [ class java/lang/Throwable ]
        frame_type = 250 / chop /
          offset_delta = 4

  public static synchronized void m();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
    Code:
      stack=0, locals=0, args_size=0
         0: return
      LineNumberTable:
        line 13: 0
}
SourceFile: "Demo.java"
  • wait and notify的两种方式: 分为 等待方(消费者)通知方(生产者)
    等待方的原则:(1) 获取对象锁 (2)如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件(3)条件满足则执行对应的逻辑.
    通知方(生产者): (1)获得对象锁(2)改变条件(3)通知所有等待在对象上的线程
`等待方(消费者)`
synchronized (对象){
    while(条件不满足){
        对象.wait();
    }
    对应的处理逻辑.
}
`通知方(生产者)`
synchronized (对象){
    改变条件
    对象.notifyAll();
}
  • Thread.join():当前线程A等待thread线程终止之后才从thread.join返回.例子是: 创建了10个线程,编号0-9,每个线程调用前一个线程的join方法,也就是线程0结束了,线程1才能从join方法中返回,而线程0需要等待main线程结束.
public static void main(String[] args) {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; ++i) {
            Thread thread = new Thread(new Domion(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
    }
    static class Domion implements Runnable{
        private Thread thread;

        public Domion(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+" terminate.");
        }
    }
/----------------------------------------------------------------------------------
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.
  • ThreadLocal的用法:例如在AOP中,可以在方法调用前的切入点执行begin而在方法调用之后切入点执行end()方法.
public class Profiler {
    private static final ThreadLocal<Long> TIME_THREADLOCAL = ThreadLocal.withInitial(System::currentTimeMillis);
    public static final void begin(){
        TIME_THREADLOCAL.set(System.currentTimeMillis());
    }
    public static final long end(){
        return System.currentTimeMillis() - TIME_THREADLOCAL.get();
    }

    public static void main(String[] args) {
        Profiler.begin();
        sleep(1);
        System.out.println("Cost: "+Profiler.end()+" mills");
    }
}

一个简单的线程池实例

public interface ThreadPool<Job extends Runnable> {

    void execute(Job job);

    void shutdown();

    void addWorkers(int num);

    void removeWorker(int num);

    int getJobSize();
}
package com.java9.artcp;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
    //线程池最大限制数
    private static final int MAX_WORKER_NUMBERS = 10;
    //线程池默认数量
    private static final int DEFAULT_WORKER_NUMBERS = 5;
    //线程池最小数量
    private static final int MIN_WORKER_NUMBERS = 5;
    //这是一个工作列表,回想里面插入工作.
    private final LinkedList<Job> jobs = new LinkedList<>();
    //工作者列表
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
    //工作者线程数量
    private int workerNum = DEFAULT_WORKER_NUMBERS;
    //线程编号生成
    private AtomicLong threadNum = new AtomicLong();

    public DefaultThreadPool(){
        initWorkers(DEFAULT_WORKER_NUMBERS);
    }
    public DefaultThreadPool(int num){
        workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
        initWorkers(num);
    }
    @Override
    public void execute(Job job) {
        if(job != null){
            //添加一个工作,然后进行通知
            synchronized (jobs){
                jobs.add(job);
                jobs.notify();
            }
        }
    }

    @Override
    public void shutdown() {
        workers.forEach(Worker::shutdown);
    }

    @Override
    public void addWorkers(int num) {
        synchronized (jobs){
            //限制新增的worker数量不能超过最大值
            if(num + this.workerNum > MAX_WORKER_NUMBERS){
                num = MAX_WORKER_NUMBERS - this.workerNum;
            }
            initWorkers(num);
            this.workerNum += num;
        }
    }

    @Override
    public void removeWorker(int num) {
        synchronized (jobs){
            if(num >= this.workerNum){
                throw new IllegalArgumentException("beyond workNum");
            }
            //按照给定的数量停止worker
            int count =0;
            while (count < num){
                Worker worker = workers.get(count);
                if(workers.remove(worker)){
                    worker.shutdown();
                    count++;
                }
            }
            this.workerNum -= count;
        }
    }

    @Override
    public int getJobSize() {
        return jobs.size();
    }

    //初始化线程工作者
    private void initWorkers(int num){
        for (int i = 0; i < num; ++i) {
            Worker worker = new Worker();
            workers.add(worker);
            Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet());
            thread.start();
        }
    }

    //工作者,负责消费任务
    class Worker implements Runnable{
        //是否工作
        private volatile boolean running = true;

        public void shutdown(){running = false;}

        @Override
        public void run() {
            while (running){
                Job job = null;
                synchronized (jobs){
                    //如果工作者列表为空,那么久wait
                    while (jobs.isEmpty()){
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            //感知到外部对workerThread的中断操作,返回
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    //取出一个job
                    job = jobs.removeFirst();
                }
                if(job != null){
                    try {
                        job.run();
                    } catch (Exception e) {
                        //忽略运行中的异常
                    }
                }
            }
        }
    }
}

  • 同步队列器(AbstractQueuedSynchronizer) [AQS]: 是用来固件锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同状态,通过内置的FIFO队列来完成资源获取线程的排队工作. 主要使用的方式是继承 ,子类通过继承同步器来实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态的更改.这时就需要使用同步器提供的三个方法: (getState, setState(int newState)和 compareAndSetState(int expect,int update))来进行操作,因为他们能够保证状态的改变是安全的,子类推荐被定义为自定义组件的静态内部类,同步器自身没有实现任何同步接口,他仅仅定义了若干同步状态获取和释放的方法来供自定义同步组件使用 同步器既可以支持独占式获取同步状态,也可以支持共享式的获取同步状态 --> 实现的不同形式: ReentrantLock ReentrantReadWriteLock和CountDownLatch等.
  • 同步器是实现锁的关键, 锁是面向使用者的,它自定义了使用者与锁交互的接口(比如可以允许两个线程并行访问).隐藏了实现细节 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待和唤醒等底层操作
方法名称 描述
boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后进行CAS设置同步状态
boolean tryRelase(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
boolean int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功反之获取失败
boolean tryRelaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
package com.java9.artcp;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    //静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer{
        //是否处于独占状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        //当状态为0的时候获取锁
        @Override
        protected boolean tryAcquire(int acquires) {
            if(compareAndSetState(0,1)){
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //释放锁,将状态设置为0
        @Override
        protected boolean tryRelease(int release) {
            if(getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        Condition newCondition(){return new ConditionObject();}
    }
    //仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
/---------------------------------------------------------------------------------------------------------
private static Mutex mutex = new Mutex();
    private static Condition condi1 = mutex.newCondition();
    private static Condition condi2 = mutex.newCondition();
    private static Condition condi3 = mutex.newCondition();
    private static volatile long count = 1;
    public static void main(String[] args) {
        Print1 print1 = new Print1();
        Print2 print2 = new Print2();
        Print3 print3 = new Print3();
        print3.start();
        print2.start();
        print1.start();

    }
    private static class Print1 extends Thread{
        @Override
        public void run() {
            while (true){
                mutex.lock();
                try {
                    while (count != 1){
                        condi1.await();
                    }
                    System.out.println("111");
                    sleep(1000);
                    count=2;
                    condi2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.unlock();
                }
            }
        }
    }
    private static class Print2 extends Thread{
        @Override
        public void run() {
            while (true){
                mutex.lock();
                try {
                    while (count != 2){
                        condi2.await();
                    }
                    System.out.println("222");
                    sleep(1000);
                    count=3;
                    condi3.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.unlock();
                }
            }
        }
    }
    private static class Print3 extends Thread{
        @Override
        public void run() {
            while (true){
                mutex.lock();
                try {
                    while (count != 3){
                        condi3.await();
                    }
                    System.out.println("333");
                    sleep(1000);
                    count=1;
                    condi1.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    mutex.unlock();
                }
            }
        }
    }

在获取同步状态时,同步器维护了一个同步队列(FIFP),获取状态失败的线程都会被加入到队列中并在队列中进行自旋,移出队列(或停止自旋)的条件是前驱节点为头结点且成功获取了同步状态.在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态.然后唤醒头结点 的后继节点

  • 设计一个同步工具,该工具在同一时刻,只允许至多两个线程同时访问,超过2个线程的访问将被阻塞
public class TwinsLock implements Lock {
    private static class Sync extends AbstractQueuedSynchronizer{
        public Sync(int count){
            if(count <= 0) throw new IllegalArgumentException("count must large than zero");
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for(;;){
                int current = getState();
                int newCount = current-reduceCount;
                if(newCount<0 || compareAndSetState(current,newCount)){
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for(;;){
                int current = getState();
                int newCount = current+returnCount;
                if(compareAndSetState(current,newCount)){
                    return true;
                }
            }
        }
        Condition getCondition(){return new ConditionObject();}
    }
    private final Sync sync = new Sync(2);
    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) > 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.getCondition();
    }
}
/-------------------------------------------------------------------------------------------
private static TwinsLock lock = new TwinsLock();

    public static void main(String[] args) {
        //开启10个线程
        for (int i = 10; i < 100; ++i) {
            Worker worker=new Worker();
            worker.setDaemon(true);
            worker.setName("thread-"+i);
            worker.start();
        }
        //每隔一秒换行
        for (int i = 0; i < 10; ++i) {
            sleep(1);
            System.out.println();
        }
    }
    static class Worker extends Thread{

        @Override
        public void run() {
            while (true){
                lock.lock();
                try {
                    sleep(1000);
                    System.out.println("--> "+Thread.currentThread().getName());
                    sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            }
        }
    }
  • 阻塞队列(BlockingQueu):
方法/处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(2) put(e) offer(e,time,unit)
移除方法 remove(e) poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
  • ArrayBlockingQueue : 一个由数组组成的有界阻塞队列
  • LinkedBlockingQueue :一个由链表组成的有界阻塞队列,最大长度Integer.MAX_VALUE
  • PriorityBlockingQueue : 支持优先级排序的无界阻塞队列
  • DelayQueue : 一个一个使用优先级队列实现的无界阻塞队列
  • SynchronousQueue : 一个不存储元素的阻塞队列
  • LinkedTransferQueue: 一个由链表构成的无界阻塞队列
  • LinkedBlockingDeque 一个由链表组成的双向阻塞队列
    • ForkJoin 框架
public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(1,10000);
        ForkJoinTask<Integer> res = forkJoinPool.submit(task);
        try {
            System.out.println(res.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
    static class CountTask extends RecursiveTask<Integer>{
        private static final int THRESHOLD = 2000;//阈值
        private int start,end;

        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            //如果任务足够小就计算任务
            boolean canCompute = (end-start) <= THRESHOLD;
            if(canCompute){
                for (int i=start;i<=end;i++){
                    sum += i;
                }
            }else {
                //如果任务大于阈值,就分裂成2个子任务
                int middle = (start+end)/2;
                CountTask left = new CountTask(start,middle);
                CountTask right = new CountTask(middle+1,end);
                //执行子任务
                left.fork();right.fork();
                //等待子任务执行完,并得到其结果
                int leftResult = left.join();
                int rightResult = right.join();
                //合并子任务
                sum = leftResult + rightResult;
            }
            return sum;
        }
    }
  • 原子类 :AtomicIntegerArray : 原子更新数组 , AtomicLongArray ,AtomicReferenceArray:原子更新引用类型数组里的元素.
    需要注意的是: 数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicInterArra对内部的数组元素进行修改时,不会影响传入的数组.

    static int[] value = new int[]{1,2};
    static AtomicIntegerArray ai = new AtomicIntegerArray(value);
    public static void main(String[] args) {
        ai.getAndSet(0,3);
        System.out.println(ai.get(0)); //3
        System.out.println(value[0]); //1
    }
  • CyclicBarrier 的例子:
public class CyclicBarrierDemo implements Runnable{
    //创建4个屏障,处理完之后执行当前类的run方法
    private CyclicBarrier c = new CyclicBarrier(4,this);
    private Executor executor = Executors.newFixedThreadPool(4);
    private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
    public  void count(){
        for (int i = 0; i < 4; ++i) {
            executor.execute(() -> {
                System.out.println("-> ");
                //计算当前的sheet的流水数据.
                sheetBankWaterCount.put(Thread.currentThread().getName(),1);
                //计算完插入一个屏障
                try {
                    c.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }
    @Override
    public void run() {
        int result = sheetBankWaterCount.values().stream().reduce(0,Integer::sum);
        //输出结果
        sheetBankWaterCount.put("result",result);
        System.out.println("result: "+result);
    }

    public static void main(String[] args) {

        CyclicBarrierDemo demo = new CyclicBarrierDemo();
        demo.count();

    }
}

时间: 2024-11-03 07:16:46

java并发的相关文章

Java并发集合的实现原理

本文简要介绍Java并发编程方面常用的类和集合,并介绍下其实现原理. AtomicInteger 可以用原子方式更新int值.类 AtomicBoolean.AtomicInteger.AtomicLong 和 AtomicReference 的实例各自提供对相应类型单个变量的访问和更新.基本的原理都是使用CAS操作: boolean compareAndSet(expectedValue, updateValue); 如果此方法(在不同的类间参数类型也不同)当前保持expectedValue,

如何使用Contemplate ThreadSafe发现并判断Java并发问题

事实证明,要发挥多核硬件所带来的收益是很困难和有风险的.当使用并发正确和安全地编写Java软件时,我们需要很仔细地进行思考.因为错误使用并发会导致偶尔才出现的缺陷,这些缺陷甚至能够躲过最严格的测试环境. 静态分析工具提供了一种方式,可以在代码执行之前探查并修正并发错误.它能够在代码执行之前分析程序的源码或编译形成的字节码,进而发现隐藏在代码之中的缺陷. Contemplate的ThreadSafe Solo是一个商用的Eclipse静态分析插件,其目的就是专门用来发现并诊断隐藏在Java程序之中

Java并发基础实践:退出任务II

在本系列的上一篇中所述的退出并发任务的方式都是基于JDK 5之前的API,本文将介绍使用由JDK 5引入的并发工具包中的API来退出任务.(2013.10.08最后更新) 在本系列的前一篇中讲述了三种退出并发任务的方式--停止线程:可取消的任务:中断,但都是基于JDK 5之前的API.本篇将介绍由JDK 5引入的java.concurrent包中的Future来取消任务的执行. 1. Future模式 Future是并发编程中的一种常见设计模式,它相当于是Proxy模式与Thread-Per-M

Java并发基础实践:退出任务I

计划写一个"Java并发基础实践"系列,算作本人对Java并发学习与实践的简单总结.本文是该系列的第一篇,介绍了退出并发任务的最简单方法. 在一个并发任务被启动之后,不要期望它总是会执行完成.由于时间限制,资源限制,用户操作,甚至是任务中的异常(尤其是运行时异常),...都可能造成任务不能执行完成.如何恰当地退出任务是一个很常见的问题,而且实现方法也不一而足. 1. 任务 创建一个并发任务,递归地获取指定目录下的所有子目录与文件的绝对路径,最后再将这些路径信息保存到一个文件中,如代码清

Java并发编程相关面试问题

基础概念 1.什么是原子操作?在Java Concurrency API中有哪些原子类(atomic classes)? 原子操作(atomic operation)意为"不可被中断的一个或一系列操作" .处理器使用基于对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作. 在Java中可以通过锁和循环CAS的方式来实现原子操作. CAS操作--Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作. 原子操作是

推荐阅读Java并发性领域编程最值得一读的力作《JAVA并发编程实践》

我的第一次之给<JAVA并发编程实践>写推荐序英文书名:Java Concurrency in Practice 中文书名:JAVA并发编程实践 这是一本入围17届Jolt大奖的书,虽然最终他没有获奖,但是这只是与政治有关的.:) 推荐序原文如下: http://book.csdn.net/bookfiles/398/10039814644.shtml 在汗牛充栋的 Java 图书堆中,关于并发性的书籍却相当稀少,然而这本书的出现,将极大地弥补了这一方面的空缺.即使并发性编程还没进入到您的 J

Java并发编程:从根源上解析volatile关键字的实现

Java并发编程:volatile关键字解析 1.解析概览 内存模型的相关概念 并发编程中的三个概念 Java内存模型 深入剖析volatile关键字 使用volatile关键字的场景 2.内存模型的相关概念 缓存一致性问题.通常称这种被多个线程访问的变量为共享变量. 也就是说,如果一个变量在多个CPU中都存在缓存(一般在多线程编程时才会出现),那么就可能存在缓存不一致的问题. 为了解决缓存不一致性问题,通常来说有以下2种解决方法: 通过在总线加LOCK#锁的方式 通过缓存一致性协议 这2种方式

Java并发编程中的生产者与消费者模型简述_java

概述对于多线程程序来说,生产者和消费者模型是非常经典的模型.更加准确的说,应该叫"生产者-消费者-仓库模型".离开了仓库,生产者.消费者就缺少了共用的存储空间,也就不存在并非协作的问题了. 示例定义一个场景.一个仓库只允许存放10件商品,生产者每次可以向其中放入一个商品,消费者可以每次从其中取出一个商品.同时,需要注意以下4点: 1.  同一时间内只能有一个生产者生产,生产方法需要加锁synchronized. 2.  同一时间内只能有一个消费者消费,消费方法需要加锁synchroni

Java 并发/多线程教程(五)-相同线程

       本系列译自jakob jenkov的Java并发多线程教程,个人觉得很有收获.由于个人水平有限,不对之处还望矫正!        相同线程是一并发框架模型,是一个单线程系统向外扩展成多个单线程的系统.这样的结果就是多个单线程并行运行. 为什么是单线程系统?         你也许会感到好奇,为什么当今还有人设计单线程系统.单线程系统之所以这么普及,是因为单线程系统相对于多线程并发系统更为简单.单线程系统不需要与其他线程共享任何数据.这就使得单线程系统可以使用非并发的数据结构,可以更

Java并发(三)使用显式的Lock和Condition对象

    在之前的Java并发(一)wait()与notifyAll()一文中的例子中,我们使用了wait()和notifyAll()来模拟了给汽车打蜡和抛光的情景.在JavaSE5中,还提供了java.util.concurrent.locks.Condition对象供我们使用.你可以在Condition上调用await()来挂起一个任务.当外部条件发生变化,意味着某个任务应该继续执行时,你可以通过调用signal()来通知这个任务,或者调用signalAll()来唤醒所有在这个Conditio