Java并发编程详解

线程的基础知识

关于线程的基础知识,比如线程的创建(Thread,Runnable),进程和线程的区别,以及线程的sleep、synchronized、wait、interrupt、join、yield等方法就不详细讲解了。有需要的可以参考海子大神的文章。

线程池

创建线程池

在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:

123
Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUEExecutors.newSingleThreadExecutor();   //创建容量为1的缓冲池Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

线程池的使用

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
package map;

import java.util.Date;import java.util.concurrent.*;

/** * Created by benjamin on 12/24/15. */public class ExecutorsUse {

public static void main(String[] args) throws InterruptedException {        fixedThreadPool();        singleThreadPool();        newCachedThreadPool();        scheduledThreadPool();        singleThreadScheduledPool();        customThreadPool();    }

/**     * 固定大小的线程池     */    private static void fixedThreadPool() throws InterruptedException {        // 获取线程池最优大小        int fixNum = Runtime.getRuntime().availableProcessors() + 1;        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(fixNum);        Thread thread1 = new FixedThread();        Thread thread2 = new FixedThread();        Thread thread3 = new FixedThread();        Thread thread4 = new FixedThread();        Thread thread5 = new FixedThread();        fixedThreadPool.execute(thread1);        fixedThreadPool.execute(thread2);        fixedThreadPool.execute(thread3);        fixedThreadPool.execute(thread4);        fixedThreadPool.execute(thread5);

// 关闭线程池,不让其他线程加入,但是不终止线程的运行        fixedThreadPool.shutdown();        // 会等待线程池的线程都执行结束,才执行下面的语句        fixedThreadPool.awaitTermination(5, TimeUnit.MINUTES);        System.out.println("线程执行结束...");

/**         * 执行结果:         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-4正在执行第: 0次         * pool-1-thread-3正在执行第: 0次         * pool-1-thread-5正在执行第: 0次         * pool-1-thread-2正在执行第: 0次         * pool-1-thread-5正在执行第: 1次         * pool-1-thread-3正在执行第: 1次         * pool-1-thread-3正在执行第: 2次         * pool-1-thread-4正在执行第: 1次         * pool-1-thread-4正在执行第: 2次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-5正在执行第: 2次         * pool-1-thread-2正在执行第: 1次         * pool-1-thread-2正在执行第: 2次         * pool-1-thread-1正在执行第: 2次         * 线程执行结束...         */    }

/**     * 单任务线程池     */    private static void singleThreadPool() throws InterruptedException {        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();        Thread thread1 = new FixedThread();        Thread thread2 = new FixedThread();        Thread thread3 = new FixedThread();        Thread thread4 = new FixedThread();        Thread thread5 = new FixedThread();        singleThreadExecutor.execute(thread1);        singleThreadExecutor.execute(thread2);        singleThreadExecutor.execute(thread3);        singleThreadExecutor.execute(thread4);        singleThreadExecutor.execute(thread5);

singleThreadExecutor.shutdown();        // 关闭线程池,不让其他线程加入,但是不终止线程的运行        singleThreadExecutor.shutdown();        // 会等待线程池的线程都执行结束,才执行下面的语句        singleThreadExecutor.awaitTermination(5, TimeUnit.MINUTES);        System.out.println("线程执行结束...");

/**         * 执行结果:         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-1正在执行第: 2次         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-1正在执行第: 2次         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-1正在执行第: 2次         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-1正在执行第: 2次         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-1正在执行第: 2次         * 线程执行结束...         */    }

/**     * 可变尺寸的线程池     */    private static void newCachedThreadPool() throws InterruptedException {        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();        Thread thread1 = new FixedThread();        Thread thread2 = new FixedThread();        Thread thread3 = new FixedThread();        Thread thread4 = new FixedThread();        Thread thread5 = new FixedThread();        newCachedThreadPool.execute(thread1);        newCachedThreadPool.execute(thread2);        newCachedThreadPool.execute(thread3);        newCachedThreadPool.execute(thread4);        newCachedThreadPool.execute(thread5);

newCachedThreadPool.shutdown();        // 关闭线程池,不让其他线程加入,但是不终止线程的运行        newCachedThreadPool.shutdown();        // 会等待线程池的线程都执行结束,才执行下面的语句        newCachedThreadPool.awaitTermination(5, TimeUnit.MINUTES);        System.out.println("线程执行结束...");

/**         * pool-1-thread-1正在执行第: 0次         * pool-1-thread-5正在执行第: 0次         * pool-1-thread-4正在执行第: 0次         * pool-1-thread-3正在执行第: 0次         * pool-1-thread-2正在执行第: 0次         * pool-1-thread-2正在执行第: 1次         * pool-1-thread-3正在执行第: 1次         * pool-1-thread-4正在执行第: 1次         * pool-1-thread-4正在执行第: 2次         * pool-1-thread-5正在执行第: 1次         * pool-1-thread-5正在执行第: 2次         * pool-1-thread-1正在执行第: 1次         * pool-1-thread-3正在执行第: 2次         * pool-1-thread-2正在执行第: 2次         * pool-1-thread-1正在执行第: 2次         * 线程执行结束...         */    }

/**     * 延迟连接池     * @throws InterruptedException     */    private static void scheduledThreadPool() throws InterruptedException {        //创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5);        Thread thread1 = new FixedThread();        Thread thread2 = new FixedThread();        Thread thread3 = new FixedThread();        Thread thread4 = new FixedThread();        Thread thread5 = new FixedThread();        scheduledExecutorService.schedule(thread1, 5, TimeUnit.SECONDS);        scheduledExecutorService.schedule(thread2, 5, TimeUnit.SECONDS);        scheduledExecutorService.schedule(thread3, 5, TimeUnit.SECONDS);        scheduledExecutorService.schedule(thread4, 5, TimeUnit.SECONDS);        scheduledExecutorService.schedule(thread5, 5, TimeUnit.SECONDS);        System.out.println("现在开始的时间是: " + new Date());        // 关闭线程池,不让其他线程加入,但是不终止线程的运行        scheduledExecutorService.shutdown();        // 会等待线程池的线程都执行结束,才执行下面的语句        scheduledExecutorService.awaitTermination(5, TimeUnit.MINUTES);        System.out.println("线程执行结束...");

/**         * 现在开始的时间是: Thu Dec 24 18:05:44 CST 2015         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-4正在执行第: 0次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-1正在执行第: 0次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-2正在执行第: 0次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-2正在执行第: 1次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-1正在执行第: 1次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-3正在执行第: 0次         * pool-1-thread-4正在执行第: 1次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-1正在执行第: 2次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-2正在执行第: 2次         * pool-1-thread-5正在执行第: 0次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-4正在执行第: 2次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-5正在执行第: 1次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-5正在执行第: 2次         * pool-1-thread-3正在执行第: 1次         * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____         * pool-1-thread-3正在执行第: 2次         * 线程执行结束...         */    }

/**     * 单任务延迟连接池     */    private static void singleThreadScheduledPool() {        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();        /**         * 其他都一样,这里就不写了         */    }

/**     * 自定义连接池     * 自定义连接池稍微麻烦些,不过通过创建的ThreadPoolExecutor线程池对象,     * 可以获取到当前线程池的尺寸、正在执行任务的线程数、工作队列等等。     */    private static void customThreadPool() {        // 创建等待队列        BlockingQueue bqueue = new ArrayBlockingQueue(20);        // 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 2, TimeUnit.MILLISECONDS, bqueue);        // ThreadPoolExecutor参数解释        // corePoolSize - 池中所保存的线程数,包括空闲线程。        // maximumPoolSize - 池中允许的最大线程数。        // keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。        // unit - keepAliveTime 参数的时间单位。        // workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。        /**         * 其他都一样,这里就不写了         */    }

}

class FixedThread extends Thread {    @Override    public void run() {        for (int i = 0; i < 3; i ++) {            System.out.println("现在运行的时间是: " + new Date() + "_____");            System.out.println(Thread.currentThread().getName() + "正在执行第: " + i + "次");        }    }}

在工作中如何设置线程池的大小呢?

设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。
幸运的是,要设置线程池的大小也并不困难,只需要避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空间的处理器无法执行工作,从而降低吞吐率。
要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU?多大的内存?任务是计算密集型、I/O密集型还是二者皆可?它们是否需要像JDBC连接这样的稀缺资源?如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为 Ncpu+1 时,通常能实现最优的利用率。
线程池的最优大小等于:

Nthreads = Ncpu * Ucpu * (1+W/C)

给定下列定义:

Ncpu是CPU的数目,一般可以通过这个公式获取
int N_CPUS = Runtime.getRuntime().availableProcessors()

Ucpu:CPU的利用率,范围为 0<=Ucpu<=1
W/C:是等待时间和计算时间的比值

一般需要根据任务的类型来配置线程池大小:

  如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

  如果是IO密集型任务,参考值可以设置为2*NCPU

  当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

Callable和Future以及FutureTask的实例应用

  
    有的时候我们的应用需要拿到线程执行完毕后的返回值,这个时候就需要用到Callable和Future以及FutureTask了。下面是一个实例,copy就可以运行,也可以看实例有详细注释说明。
  

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
package map;

import java.util.Random;import java.util.concurrent.*;

/** * Created by benjamin on 12/25/15. */public class CallableAndFuture {

public static void main(String[] args) throws ExecutionException, InterruptedException {        testCallableAndFuture();        testCallableAndFuture2();        testCallableAndFuture3();        testCallableAndFutureTask();    }

/**     * 一个简单地Callable 和 future的使用简介     * @throws ExecutionException     * @throws InterruptedException     */    private static void testCallableAndFuture() throws ExecutionException, InterruptedException {        Callable<Integer> call = new Callable<Integer>() {            public Integer call() throws Exception {                Thread.sleep(1000);                return new Random().nextInt(100);            }        };

FutureTask<Integer> future = new FutureTask<Integer>(call);        new Thread(future).start();        System.out.println("执行开始");        // 一直等到拿到值才继续往下走        System.out.println(future.get());        System.out.println("执行结束");

/**         * result         * 执行开始           24           执行结束         */    }

/**     * 在线程池中使用Callable来执行对应的任务.     * @throws ExecutionException     * @throws InterruptedException     */    private static void testCallableAndFuture2() throws ExecutionException, InterruptedException {        ExecutorService executorService = Executors.newCachedThreadPool();        Future<Integer> future = executorService.submit(new Callable<Integer>() {            public Integer call() throws Exception {                Thread.sleep(1000);                return new Random().nextInt(100);            }        });        System.out.println("线程开始");        System.out.println(future.get());        System.out.println("线程结束");

/**         * 执行结果和上面一样         */    }

/**     * 执行多个callable 和 future任务,并且得到返回值     * @throws InterruptedException     * @throws ExecutionException     */    private static void testCallableAndFuture3() throws InterruptedException, ExecutionException {        ExecutorService threadPool = Executors.newCachedThreadPool();        CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool);        for (int i = 1; i < 5; i++) {            final int taskID = i;            cs.submit(new Callable<Integer>() {                public Integer call() throws Exception {                    System.out.println("子线程在进行计算");                    Thread.sleep(taskID * 1000);                    return taskID;                }            });        }        System.out.println("线程开始");        for (int i = 1; i < 5; i++) {            System.out.println(cs.take().get());        }        System.out.println("线程结束");

/**         * 执行的结果         * 子线程在进行计算           子线程在进行计算           线程开始           子线程在进行计算           子线程在进行计算           1 (每隔一秒输出下一个数,这表明get()是有一个值输出就立刻返回再继续等待下面的输出,最后全部输出完成向下执行)           2           3           4           线程结束         */    }

/**     * futureTask的使用     * FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口     * 所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。     */    private static void testCallableAndFutureTask() {        ExecutorService threadPool = Executors.newCachedThreadPool();        Task task = new Task();        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);        threadPool.submit(futureTask);        threadPool.shutdown();

try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }

System.out.println("主线程在执行任务");

try {            System.out.println("task执行结果: " + futureTask.get());        } catch (InterruptedException e) {            e.printStackTrace();        } catch (ExecutionException e) {            e.printStackTrace();        }

System.out.println("所有任务执行完毕");    }}

/** * 任务task */class Task implements Callable<Integer> {

public Integer call() throws Exception {        System.out.println("子线程在进行计算");        Thread.sleep(3000);        int sum = 0;        for (int i = 0; i < 100; i++) sum += i;        return sum;    }}

参考文章: 海子大神

时间: 2024-09-12 13:18:20

Java并发编程详解的相关文章

Java多线程编程详解

编程|多线程|详解 一:理解多线程多线程是这样一种机制,它允许在程序中并发执行多个指令流,每个指令流都称为一个线程,彼此间互相独立. 线程又称为轻量级进程,它和进程一样拥有独立的执行控制,由操作系统负责调度,区别在于线程没有独立的存储空间,而是和所属进程中的其它线程共享一个存储空间,这使得线程间的通信远较进程简单.多个线程的执行是并发的,也就是在逻辑上"同时",而不管是否是物理上的"同时".如果系统只有一个CPU,那么真正的"同时"是不可

php的socket编程详解_php技巧

php的socket编程算是比较难以理解的东西吧,不过,我们只要理解socket几个函数之间的关系,以及它们所扮演的角色,那么理解起来应该不是很难了,在笔者看来,socket编程,其实就是建立一个网络服务的客户端和服务端,这和mysql的客户端和服务端是一样的,你只要理解mysql的客户端和服务端是怎么一回事,你就应该能够理解下面我要讲的东西吧. 关于socket编程所涉及到的网络协议,什么TCP啊,UDP啊,什么socket三次握手等等,这些网络协议网上有很详细的解释,这里不讲,只截个sock

PHP SOCKET编程详解_php技巧

1. 预备知识 一直以来很少看到有多少人使用php的socket模块来做一些事情,大概大家都把它定位在脚本语言的范畴内吧,但是其实php的socket模块可以做很多事情,包括做ftplist,http post提交,smtp提交,组包并进行特殊报文的交互(如smpp协议),whois查询.这些都是比较常见的查询. 特别是php的socket扩展库可以做的事情简直不会比c差多少. php的socket连接函数 1.集成于内核的socket 这个系列的函数仅仅只能做主动连接无法实现端口监听相关的功能

python之Socket网络编程详解_python

什么是网络? 网络是由节点和连线构成,表示诸多对象及其相互联系.在数学上,网络是一种图,一般认为专指加权图.网络除了数学定义外,还有具体的物理含义,即网络是从某种相同类型的实际问题中抽象出来的模型.在计算机领域中,网络是信息传输.接收.共享的虚拟平台,通过它把各个点.面.体的信息联系到一起,从而实现这些资源的共享.网络是人类发展史来最重要的发明,提高了科技和人类社会的发展. 网络通信的三要素 IP地址 用来表示一台独立的主机 特殊的IP地址 127.0.0.1或称localhost(表示本地回环

java 多线程-锁详解及示例代码_java

自 Java 5 开始,java.util.concurrent.locks 包中包含了一些锁的实现,因此你不用去实现自己的锁了.但是你仍然需要去了解怎样使用这些锁. 一个简单的锁 让我们从 java 中的一个同步块开始: public class Counter{ private int count = 0; public int inc(){ synchronized(this){ return ++count; } } } 可以看到在 inc()方法中有一个 synchronized(th

Java并发编程总结——慎用CAS详解_java

一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

JAVA并发编程有界缓存的实现详解_java

JAVA并发编程有界缓存的实现 1.有界缓存的基类 package cn.xf.cp.ch14; /** * *功能:有界缓存实现基类 *时间:下午2:20:00 *文件:BaseBoundedBuffer.java *@author Administrator * * @param <V> */ public class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head

linux多线程编程详解教程

 这篇文章主要介绍了linux多线程编程详解教程,提供线程通过信号量实现通信的代码,大家参考使用吧 线程分类   线程按照其调度者可以分为用户级线程和核心级线程两种.   (1)用户级线程  用户级线程主要解决的是上下文切换的问题,它的调度算法和调度过程全部由用户自行选择决定,在运行时不需要特定的内核支持.在这里,操作系统往往会提供一个用户空间的线程库,该线程库提供了线程的创建.调度.撤销等功能,而内核仍然仅对进程进行管理.如果一个进程中的某一个线程调用了一个阻塞的系统调用,那么该进程包括该进程

Java并发控制机制详解_java

在一般性开发中,笔者经常看到很多同学在对待java并发开发模型中只会使用一些基础的方法.比如Volatile,synchronized.像Lock和atomic这类高级并发包很多人并不经常使用.我想大部分原因都是来之于对原理的不属性导致的.在繁忙的开发工作中,又有谁会很准确的把握和使用正确的并发模型呢? 所以最近基于这个思想,本人打算把并发控制机制这部分整理成一篇文章.既是对自己掌握知识的一个回忆,也是希望这篇讲到的类容能帮助到大部分开发者.  并行程序开发不可避免地要涉及多线程.多任务的协作和