1: 并发编程基础
如何减少上下文切换:
: 使用无锁并发编程
CAS算法
使用最少线程
使用协程
使用无锁并发编程:
如将数据的ID按照HASH算法取摸分段,不同的线程处理不同的数据.(分段锁
)
CAS算法:
Java的Atomic包使用的CAS算法来更新数据,而不需要加锁.
使用最少线程:
避免创建不需要的线程.比如任务少的时候,但是线程数太多.
使用协程
在单线程里实现多任务调度,并在单线程里维持多个任务的切换.
避免死锁的几个方法:
(1)避免一个线程同时获取多个锁.(2)避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源.(3)尝试使用定时锁,使用lock.tryLock(timeout)
来代替内部锁.(4)对于数据库锁,加锁和解锁必须在一个数据库链接里.否则会出现解锁失败的情况.
volatile比synchronize轻量级.但只保证了可见性,没有保证原子性, 并且volatile不会引起线程的上下文切换和调度.
术语 |
英文单词 |
术语描述 |
内存屏障 |
memory barriers |
是一组处理器指令,用于实现对内存操作的顺序限制 |
缓冲行 |
cache line |
缓存中可以分配的最小存储单位.处理器填写缓存线时会加载整个缓存线,需要使用多个主内存读周期 |
原子操作 |
atomic operations |
不可中断的一个或一系列操作 |
缓存行填充 |
cache line fill |
当处理器识别到从内存中读取操作数是可缓存的,处理器读取整个缓存行到合适的缓存(L1 L2 L3的或所有) |
缓存命中 |
cache hit |
如果进行高速缓存行填充操作的内存位置仍然是下次处理器访问的地址时,处理从缓存中读取而不是从内存读取 |
写命中 |
write hit |
当处理器将操作数写回 到一个内存缓存区时,他首先会检查这个缓存的内存地址是否在缓存中,如果存在一个高效的缓存行,则处理器将这个操作数写回到缓存而不是写回到内存
|
写缺失 |
write misses the cache |
一个有效的缓存行被写入到不存在的内存区域 |
Java中每个对象都可以作为锁,
具体表现为三种形式 (1)对于普通方法,锁是当前的实例(2)对于静态方法,锁是当前类的Class对象(3)对于同步方法块,锁是Synchronize括号里配置的对象
.
JVM基于进入和退出Monitor对象来实现方法同步和代码同步
, 代码同步: 使用monitorenter 和 monitorexit指令实现.而方法同步使用另外一种方式.细节在JVM规范里面没讲,但是,方法同步同样可以使用这两个指令来实现. monitorenter指令在编译后插入到同步代码块开始的位置,而monitorexit是插入到方法结束处和异常处
java1.6中 ,锁一共有四中状态: 无锁状态,偏向锁状态,轻量级锁状态和重量级状态
(依次从低到高)
偏向锁:
,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁. 当一个线程访问同步代码块并获取锁时,会在对象头和帧栈中记录里存储锁偏向的线程ID,以后该线程在进入和退出同步快时不需要进行CAS操作来加锁和解锁,只需要简单的测试一下对象头的MarkWord里是否存储着指向当前线程的偏向锁.如果测试成功.表示该线程已经获取锁.
偏向锁使用了一种等到竞争出现才释放锁的机制,所以其他线程尝试竞争偏向锁时,持有偏向锁的线程才回释放,
轻量级锁
: 线程在执行同步代码块之前,JVM会先在当前线程的帧栈中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中. 称为: Displaced Mark Word
,然后线程尝试使用CAS将对象头中的Mark Word替换为指向锁记录的指针.如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋
来获取锁.
锁 |
优点 |
缺点 |
使用场景 |
偏向锁 |
加锁和解锁不需要额外消耗,和执行非同步方法相比存在纳秒级差别 |
如果线程间存在锁竞争,会带来额外的锁撤销消耗 |
适用于只有一个线程访问同步块的场景 |
轻量级锁 |
竞争的线程不会阻塞,提高了程序的响应速度 |
如果始终得不到锁竞争的线程,会使用自旋消耗CPU |
追求响应时间,同步快执行速度快 |
重量级锁 |
线程竞争不使用自旋,不会消耗CPU |
线程阻塞,响应时间慢 |
追求吞吐量,同步快执行速度长 |
CAS实现原子操作的三大问题:
(1)ABA问题(2)循环时间长开销大(3)只能保证一个贡献变量的原子操作. ABA问题
在java1.5之后提供了一个AtomicStampedReference来解决ABA问题.这个类的compareAndSet首先检查当前引用是否等于预期引用,并检查当前标志是否等于预期标志.如果全部相等,则以原子方式更新.
同步
: 指程序中用于控制不同线程间操作发生相对顺序的机制.
volatile
:(1) 可见性, 对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入(2)原子性: 对任意单个volatile变量的读/写具有原子性,但类似于volatile++这种复合操作不具有原子性.
公平锁和非公平锁
:(1)公平锁和非公平锁释放时最后都要写一个volatile变量state (2)公平锁获取时,首先去读volatile变量(state) (3) 非公平锁获取时,首先会用CAS更新volatile变量,这个操作同时具有volatile读和volatile写的内存语义
concurrent包的实现示意图:
---------------------------------------------------------------
Lock | 同步器 | 阻塞队列 | Executor | 并发容器|
---------------------------------------------------------------
AQS | 非阻塞数据结构 | 原子变量类 |
---------------------------------------------------------------
\ volatile变量的读/写 CAS
---------------------------------------------------------------
final域的内存语义
: 对final域的读和写更像是普通变量的访问.
抛出InterruptedExeception的线程SleepThread. 其中断标识位被清除,而一直忙碌的线程BusyThread,中断标志位没有清除
public class Demo {
public static void main(String[] args) {
Thread sleepThread = new Thread(new SleepRunner(),"SleepThread");
sleepThread.setDaemon(true);
Thread busyThread = new Thread(new BusyRunner(),"BusyThread");
busyThread.setDaemon(true);
sleepThread.start();
busyThread.start();
sleep(5);
sleepThread.interrupt();
busyThread.interrupt();
System.out.println("Sleep: "+sleepThread.isInterrupted());
System.out.println("Busy: "+busyThread.isInterrupted());
}
static class SleepRunner implements Runnable{
@Override
public void run() {
while (true){
sleep(10);
}
}
}
static class BusyRunner implements Runnable{
@Override
public void run() {
while (true);
}
}
}
Exception in thread "SleepThread" java.lang.IllegalStateException: java.lang.InterruptedException: sleep interrupted
at java9demo/com.java9.utils.ConcurrentUtils.sleep(ConcurrentUtils.java:34)
at java9demo/com.java9.artcp.Demo$SleepRunner.run(Demo.java:30)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.InterruptedException: sleep interrupted
at java.base/java.lang.Thread.sleep(Native Method)
at java.base/java.lang.Thread.sleep(Thread.java:340)
at java.base/java.util.concurrent.TimeUnit.sleep(TimeUnit.java:401)
at java9demo/com.java9.utils.ConcurrentUtils.sleep(ConcurrentUtils.java:32)
... 2 more
Sleep: false
Busy: true
过期的suspend resume stop
: suspend
: 在调用后,线程不会释放已经占有的资源(比如锁),而是占着资源进入睡眠状态,这样容易引发死锁问题
, stop
方法会在终结一个线程时不会保证线程的资源正确释放,通常是没有给予线程完成资源释放工作的机会.因此会导致程序可能工作在不确定的状态下.
static DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss");
public static void main(String[] args) {
Thread printThread = new Thread(new Runner(),"PrintThread");
printThread.setDaemon(true);
printThread.start();
sleep(3);
printThread.suspend();
System.out.println("main suspend at: "+LocalTime.now().format(formatter));
sleep(3);
printThread.resume();
System.out.println("main resume at: "+LocalTime.now().format(formatter));
sleep(3);
printThread.stop();
System.out.println("main stop at: "+LocalTime.now().format(formatter));
sleep(3);
}
static class Runner implements Runnable{
@Override
public void run() {
while (true){
System.out.println(Thread.currentThread().getName()+" Run at: "+ LocalTime.now().format(formatter));
sleep(1);
}
}
}
终止线程
: (1)中断方式,(2)利用一个boolean变量来控制是否需要停止任务并终止该线程.
public static void main(String[] args) {
Runner one = new Runner();
Thread countThread = new Thread(one,"CountThread");
countThread.start();
sleep(1);
countThread.interrupt();
Runner two = new Runner();
countThread = new Thread(two,"CountThread");
countThread.start();
sleep(1);
two.cancel();
}
static class Runner implements Runnable{
private long i;
private volatile boolean on = true;
public void cancel(){on = false;}
@Override
public void run() {
while (on && !Thread.currentThread().isInterrupted()){
i++;
}
System.out.println("count i= "+i);
}
}
对同步快的实现使用了monitorenter 和moniterexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZE来完成的.
无论采用哪种方式.其本质是对一个对象的监视器(moniter)进行获取,而这个获取是排他的.也就是同一时刻只能有一个线程获取到synchronize所保护的监视器
public static void main(String[] args) {
synchronized (Demo.class){
}
m();
}
public static synchronized void m(){
}
/------------------------------------------------------------------------------------------
public com.java9.artcp.Demo();
descriptor: ()V
flags: ACC_PUBLIC
Code:
stack=1, locals=1, args_size=1
0: aload_0
1: invokespecial #1 // Method java/lang/Object."<init>":()V
4: return
LineNumberTable:
line 3: 0
LocalVariableTable:
Start Length Slot Name Signature
0 5 0 this Lcom/java9/artcp/Demo;
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class com/java9/artcp/Demo
2: dup
3: astore_1
4: monitorenter
5: aload_1
6: monitorexit
7: goto 15
10: astore_2
11: aload_1
12: monitorexit
13: aload_2
14: athrow
15: invokestatic #3 // Method m:()V
18: return
Exception table:
from to target type
5 7 10 any
10 13 10 any
LineNumberTable:
line 6: 0
line 8: 5
line 9: 15
line 10: 18
LocalVariableTable:
Start Length Slot Name Signature
0 19 0 args [Ljava/lang/String;
StackMapTable: number_of_entries = 2
frame_type = 255 / full_frame /
offset_delta = 10
locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 / chop /
offset_delta = 4
public static synchronized void m();
descriptor: ()V
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=0, args_size=0
0: return
LineNumberTable:
line 13: 0
}
SourceFile: "Demo.java"
wait and notify的两种方式
: 分为 等待方(消费者)
和通知方(生产者)
等待方的原则:
(1) 获取对象锁 (2)如果条件不满足,那么调用对象的wait方法,被通知后仍要检查条件(3)条件满足则执行对应的逻辑.
通知方(生产者)
: (1)获得对象锁(2)改变条件(3)通知所有等待在对象上的线程
`等待方(消费者)`
synchronized (对象){
while(条件不满足){
对象.wait();
}
对应的处理逻辑.
}
`通知方(生产者)`
synchronized (对象){
改变条件
对象.notifyAll();
}
Thread.join()
:当前线程A等待thread线程终止之后才从thread.join返回.例子是: 创建了10个线程,编号0-9,每个线程调用前一个线程的join方法,也就是线程0结束了,线程1才能从join方法中返回,而线程0需要等待main线程结束.
public static void main(String[] args) {
Thread previous = Thread.currentThread();
for (int i = 0; i < 10; ++i) {
Thread thread = new Thread(new Domion(previous), String.valueOf(i));
thread.start();
previous = thread;
}
}
static class Domion implements Runnable{
private Thread thread;
public Domion(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" terminate.");
}
}
/----------------------------------------------------------------------------------
0 terminate.
1 terminate.
2 terminate.
3 terminate.
4 terminate.
5 terminate.
6 terminate.
7 terminate.
8 terminate.
9 terminate.
ThreadLocal的用法:
例如在AOP中,可以在方法调用前
的切入点执行begin而在方法调用之后切入点执行end()方法.
public class Profiler {
private static final ThreadLocal<Long> TIME_THREADLOCAL = ThreadLocal.withInitial(System::currentTimeMillis);
public static final void begin(){
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end(){
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) {
Profiler.begin();
sleep(1);
System.out.println("Cost: "+Profiler.end()+" mills");
}
}
一个简单的线程池实例
public interface ThreadPool<Job extends Runnable> {
void execute(Job job);
void shutdown();
void addWorkers(int num);
void removeWorker(int num);
int getJobSize();
}
package com.java9.artcp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> {
//线程池最大限制数
private static final int MAX_WORKER_NUMBERS = 10;
//线程池默认数量
private static final int DEFAULT_WORKER_NUMBERS = 5;
//线程池最小数量
private static final int MIN_WORKER_NUMBERS = 5;
//这是一个工作列表,回想里面插入工作.
private final LinkedList<Job> jobs = new LinkedList<>();
//工作者列表
private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>());
//工作者线程数量
private int workerNum = DEFAULT_WORKER_NUMBERS;
//线程编号生成
private AtomicLong threadNum = new AtomicLong();
public DefaultThreadPool(){
initWorkers(DEFAULT_WORKER_NUMBERS);
}
public DefaultThreadPool(int num){
workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initWorkers(num);
}
@Override
public void execute(Job job) {
if(job != null){
//添加一个工作,然后进行通知
synchronized (jobs){
jobs.add(job);
jobs.notify();
}
}
}
@Override
public void shutdown() {
workers.forEach(Worker::shutdown);
}
@Override
public void addWorkers(int num) {
synchronized (jobs){
//限制新增的worker数量不能超过最大值
if(num + this.workerNum > MAX_WORKER_NUMBERS){
num = MAX_WORKER_NUMBERS - this.workerNum;
}
initWorkers(num);
this.workerNum += num;
}
}
@Override
public void removeWorker(int num) {
synchronized (jobs){
if(num >= this.workerNum){
throw new IllegalArgumentException("beyond workNum");
}
//按照给定的数量停止worker
int count =0;
while (count < num){
Worker worker = workers.get(count);
if(workers.remove(worker)){
worker.shutdown();
count++;
}
}
this.workerNum -= count;
}
}
@Override
public int getJobSize() {
return jobs.size();
}
//初始化线程工作者
private void initWorkers(int num){
for (int i = 0; i < num; ++i) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker,"ThreadPool-Worker-"+threadNum.incrementAndGet());
thread.start();
}
}
//工作者,负责消费任务
class Worker implements Runnable{
//是否工作
private volatile boolean running = true;
public void shutdown(){running = false;}
@Override
public void run() {
while (running){
Job job = null;
synchronized (jobs){
//如果工作者列表为空,那么久wait
while (jobs.isEmpty()){
try {
jobs.wait();
} catch (InterruptedException e) {
//感知到外部对workerThread的中断操作,返回
Thread.currentThread().interrupt();
return;
}
}
//取出一个job
job = jobs.removeFirst();
}
if(job != null){
try {
job.run();
} catch (Exception e) {
//忽略运行中的异常
}
}
}
}
}
}
同步队列器(AbstractQueuedSynchronizer) [AQS]
: 是用来固件锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同状态,通过内置的FIFO队列来完成资源获取线程的排队工作. 主要使用的方式是继承
,子类通过继承同步器来实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态的更改.这时就需要使用同步器提供的三个方法: (getState, setState(int newState)和 compareAndSetState(int expect,int update))来进行操作,因为他们能够保证状态的改变是安全的,子类推荐被定义为自定义组件的静态内部类,同步器自身没有实现任何同步接口,他仅仅定义了若干同步状态获取和释放的方法来供自定义同步组件使用
同步器既可以支持独占式获取同步状态,也可以支持共享式的获取同步状态
--> 实现的不同形式: ReentrantLock ReentrantReadWriteLock和CountDownLatch等.
同步器是实现锁的关键, 锁是面向使用者的,它自定义了使用者与锁交互的接口(比如可以允许两个线程并行访问).隐藏了实现细节
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理,线程的排队,等待和唤醒等底层操作
方法名称 |
描述 |
boolean tryAcquire(int arg) |
独占式 获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后进行CAS设置同步状态 |
boolean tryRelase(int arg) |
独占式 释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
boolean int tryAcquireShared(int arg) |
共享式 获取同步状态,返回大于等于0的值,表示获取成功反之获取失败 |
boolean tryRelaseShared(int arg) |
共享式 释放同步状态 |
boolean isHeldExclusively() |
当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
package com.java9.artcp;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
//静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer{
//是否处于独占状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
//当状态为0的时候获取锁
@Override
protected boolean tryAcquire(int acquires) {
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//释放锁,将状态设置为0
@Override
protected boolean tryRelease(int release) {
if(getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition(){return new ConditionObject();}
}
//仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
/---------------------------------------------------------------------------------------------------------
private static Mutex mutex = new Mutex();
private static Condition condi1 = mutex.newCondition();
private static Condition condi2 = mutex.newCondition();
private static Condition condi3 = mutex.newCondition();
private static volatile long count = 1;
public static void main(String[] args) {
Print1 print1 = new Print1();
Print2 print2 = new Print2();
Print3 print3 = new Print3();
print3.start();
print2.start();
print1.start();
}
private static class Print1 extends Thread{
@Override
public void run() {
while (true){
mutex.lock();
try {
while (count != 1){
condi1.await();
}
System.out.println("111");
sleep(1000);
count=2;
condi2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
mutex.unlock();
}
}
}
}
private static class Print2 extends Thread{
@Override
public void run() {
while (true){
mutex.lock();
try {
while (count != 2){
condi2.await();
}
System.out.println("222");
sleep(1000);
count=3;
condi3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
mutex.unlock();
}
}
}
}
private static class Print3 extends Thread{
@Override
public void run() {
while (true){
mutex.lock();
try {
while (count != 3){
condi3.await();
}
System.out.println("333");
sleep(1000);
count=1;
condi1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
mutex.unlock();
}
}
}
}
在获取同步状态时,同步器维护了一个同步队列(FIFP),获取状态失败的线程都会被加入到队列中并在队列中进行自旋,移出队列(或停止自旋)的条件是前驱节点为头结点且成功获取了同步状态.在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态.然后唤醒头结点 的后继节点
设计一个同步工具,该工具在同一时刻,只允许至多两个线程同时访问,超过2个线程的访问将被阻塞
public class TwinsLock implements Lock {
private static class Sync extends AbstractQueuedSynchronizer{
public Sync(int count){
if(count <= 0) throw new IllegalArgumentException("count must large than zero");
setState(count);
}
@Override
protected int tryAcquireShared(int reduceCount) {
for(;;){
int current = getState();
int newCount = current-reduceCount;
if(newCount<0 || compareAndSetState(current,newCount)){
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int returnCount) {
for(;;){
int current = getState();
int newCount = current+returnCount;
if(compareAndSetState(current,newCount)){
return true;
}
}
}
Condition getCondition(){return new ConditionObject();}
}
private final Sync sync = new Sync(2);
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) > 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.getCondition();
}
}
/-------------------------------------------------------------------------------------------
private static TwinsLock lock = new TwinsLock();
public static void main(String[] args) {
//开启10个线程
for (int i = 10; i < 100; ++i) {
Worker worker=new Worker();
worker.setDaemon(true);
worker.setName("thread-"+i);
worker.start();
}
//每隔一秒换行
for (int i = 0; i < 10; ++i) {
sleep(1);
System.out.println();
}
}
static class Worker extends Thread{
@Override
public void run() {
while (true){
lock.lock();
try {
sleep(1000);
System.out.println("--> "+Thread.currentThread().getName());
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
lock.unlock();
}
}
}
}
方法/处理方式 |
抛出异常 |
返回特殊值 |
一直阻塞 |
超时退出 |
插入方法 |
add(e) |
offer(2) |
put(e) |
offer(e,time,unit) |
移除方法 |
remove(e) |
poll() |
take() |
poll(time,unit) |
检查方法 |
element() |
peek() |
不可用 |
不可用 |
ArrayBlockingQueue
: 一个由数组组成的有界阻塞队列
LinkedBlockingQueue
:一个由链表组成的有界阻塞队列,最大长度Integer.MAX_VALUE
PriorityBlockingQueue
: 支持优先级排序的无界阻塞
队列
DelayQueue
: 一个一个使用优先级队列实现的无界阻塞队列
SynchronousQueue
: 一个不存储元素的阻塞队列
LinkedTransferQueue:
一个由链表构成的无界阻塞队列
LinkedBlockingDeque
一个由链表组成的双向阻塞队列
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
CountTask task = new CountTask(1,10000);
ForkJoinTask<Integer> res = forkJoinPool.submit(task);
try {
System.out.println(res.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
static class CountTask extends RecursiveTask<Integer>{
private static final int THRESHOLD = 2000;//阈值
private int start,end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end-start) <= THRESHOLD;
if(canCompute){
for (int i=start;i<=end;i++){
sum += i;
}
}else {
//如果任务大于阈值,就分裂成2个子任务
int middle = (start+end)/2;
CountTask left = new CountTask(start,middle);
CountTask right = new CountTask(middle+1,end);
//执行子任务
left.fork();right.fork();
//等待子任务执行完,并得到其结果
int leftResult = left.join();
int rightResult = right.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
}
原子类
:AtomicIntegerArray : 原子更新数组 , AtomicLongArray ,AtomicReferenceArray:原子更新引用类型数组里的元素.
需要注意的是: 数组value通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicInterArra对内部的数组元素进行修改时,不会影响传入的数组.
static int[] value = new int[]{1,2};
static AtomicIntegerArray ai = new AtomicIntegerArray(value);
public static void main(String[] args) {
ai.getAndSet(0,3);
System.out.println(ai.get(0)); //3
System.out.println(value[0]); //1
}
public class CyclicBarrierDemo implements Runnable{
//创建4个屏障,处理完之后执行当前类的run方法
private CyclicBarrier c = new CyclicBarrier(4,this);
private Executor executor = Executors.newFixedThreadPool(4);
private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
public void count(){
for (int i = 0; i < 4; ++i) {
executor.execute(() -> {
System.out.println("-> ");
//计算当前的sheet的流水数据.
sheetBankWaterCount.put(Thread.currentThread().getName(),1);
//计算完插入一个屏障
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
@Override
public void run() {
int result = sheetBankWaterCount.values().stream().reduce(0,Integer::sum);
//输出结果
sheetBankWaterCount.put("result",result);
System.out.println("result: "+result);
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.count();
}
}
时间: 2024-11-03 07:16:46