线程的基础知识
关于线程的基础知识,比如线程的创建(Thread,Runnable),进程和线程的区别,以及线程的sleep、synchronized、wait、interrupt、join、yield等方法就不详细讲解了。有需要的可以参考海子大神的文章。
线程池
创建线程池
在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池:
123 |
Executors.newCachedThreadPool(); //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUEExecutors.newSingleThreadExecutor(); //创建容量为1的缓冲池Executors.newFixedThreadPool(int); //创建固定容量大小的缓冲池 |
线程池的使用
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
package map; import java.util.Date;import java.util.concurrent.*; /** * Created by benjamin on 12/24/15. */public class ExecutorsUse { public static void main(String[] args) throws InterruptedException { fixedThreadPool(); singleThreadPool(); newCachedThreadPool(); scheduledThreadPool(); singleThreadScheduledPool(); customThreadPool(); } /** * 固定大小的线程池 */ private static void fixedThreadPool() throws InterruptedException { // 获取线程池最优大小 int fixNum = Runtime.getRuntime().availableProcessors() + 1; ExecutorService fixedThreadPool = Executors.newFixedThreadPool(fixNum); Thread thread1 = new FixedThread(); Thread thread2 = new FixedThread(); Thread thread3 = new FixedThread(); Thread thread4 = new FixedThread(); Thread thread5 = new FixedThread(); fixedThreadPool.execute(thread1); fixedThreadPool.execute(thread2); fixedThreadPool.execute(thread3); fixedThreadPool.execute(thread4); fixedThreadPool.execute(thread5); // 关闭线程池,不让其他线程加入,但是不终止线程的运行 fixedThreadPool.shutdown(); // 会等待线程池的线程都执行结束,才执行下面的语句 fixedThreadPool.awaitTermination(5, TimeUnit.MINUTES); System.out.println("线程执行结束..."); /** * 执行结果: * pool-1-thread-1正在执行第: 0次 * pool-1-thread-4正在执行第: 0次 * pool-1-thread-3正在执行第: 0次 * pool-1-thread-5正在执行第: 0次 * pool-1-thread-2正在执行第: 0次 * pool-1-thread-5正在执行第: 1次 * pool-1-thread-3正在执行第: 1次 * pool-1-thread-3正在执行第: 2次 * pool-1-thread-4正在执行第: 1次 * pool-1-thread-4正在执行第: 2次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-5正在执行第: 2次 * pool-1-thread-2正在执行第: 1次 * pool-1-thread-2正在执行第: 2次 * pool-1-thread-1正在执行第: 2次 * 线程执行结束... */ } /** * 单任务线程池 */ private static void singleThreadPool() throws InterruptedException { ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); Thread thread1 = new FixedThread(); Thread thread2 = new FixedThread(); Thread thread3 = new FixedThread(); Thread thread4 = new FixedThread(); Thread thread5 = new FixedThread(); singleThreadExecutor.execute(thread1); singleThreadExecutor.execute(thread2); singleThreadExecutor.execute(thread3); singleThreadExecutor.execute(thread4); singleThreadExecutor.execute(thread5); singleThreadExecutor.shutdown(); // 关闭线程池,不让其他线程加入,但是不终止线程的运行 singleThreadExecutor.shutdown(); // 会等待线程池的线程都执行结束,才执行下面的语句 singleThreadExecutor.awaitTermination(5, TimeUnit.MINUTES); System.out.println("线程执行结束..."); /** * 执行结果: * pool-1-thread-1正在执行第: 0次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-1正在执行第: 2次 * pool-1-thread-1正在执行第: 0次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-1正在执行第: 2次 * pool-1-thread-1正在执行第: 0次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-1正在执行第: 2次 * pool-1-thread-1正在执行第: 0次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-1正在执行第: 2次 * pool-1-thread-1正在执行第: 0次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-1正在执行第: 2次 * 线程执行结束... */ } /** * 可变尺寸的线程池 */ private static void newCachedThreadPool() throws InterruptedException { ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); Thread thread1 = new FixedThread(); Thread thread2 = new FixedThread(); Thread thread3 = new FixedThread(); Thread thread4 = new FixedThread(); Thread thread5 = new FixedThread(); newCachedThreadPool.execute(thread1); newCachedThreadPool.execute(thread2); newCachedThreadPool.execute(thread3); newCachedThreadPool.execute(thread4); newCachedThreadPool.execute(thread5); newCachedThreadPool.shutdown(); // 关闭线程池,不让其他线程加入,但是不终止线程的运行 newCachedThreadPool.shutdown(); // 会等待线程池的线程都执行结束,才执行下面的语句 newCachedThreadPool.awaitTermination(5, TimeUnit.MINUTES); System.out.println("线程执行结束..."); /** * pool-1-thread-1正在执行第: 0次 * pool-1-thread-5正在执行第: 0次 * pool-1-thread-4正在执行第: 0次 * pool-1-thread-3正在执行第: 0次 * pool-1-thread-2正在执行第: 0次 * pool-1-thread-2正在执行第: 1次 * pool-1-thread-3正在执行第: 1次 * pool-1-thread-4正在执行第: 1次 * pool-1-thread-4正在执行第: 2次 * pool-1-thread-5正在执行第: 1次 * pool-1-thread-5正在执行第: 2次 * pool-1-thread-1正在执行第: 1次 * pool-1-thread-3正在执行第: 2次 * pool-1-thread-2正在执行第: 2次 * pool-1-thread-1正在执行第: 2次 * 线程执行结束... */ } /** * 延迟连接池 * @throws InterruptedException */ private static void scheduledThreadPool() throws InterruptedException { //创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); Thread thread1 = new FixedThread(); Thread thread2 = new FixedThread(); Thread thread3 = new FixedThread(); Thread thread4 = new FixedThread(); Thread thread5 = new FixedThread(); scheduledExecutorService.schedule(thread1, 5, TimeUnit.SECONDS); scheduledExecutorService.schedule(thread2, 5, TimeUnit.SECONDS); scheduledExecutorService.schedule(thread3, 5, TimeUnit.SECONDS); scheduledExecutorService.schedule(thread4, 5, TimeUnit.SECONDS); scheduledExecutorService.schedule(thread5, 5, TimeUnit.SECONDS); System.out.println("现在开始的时间是: " + new Date()); // 关闭线程池,不让其他线程加入,但是不终止线程的运行 scheduledExecutorService.shutdown(); // 会等待线程池的线程都执行结束,才执行下面的语句 scheduledExecutorService.awaitTermination(5, TimeUnit.MINUTES); System.out.println("线程执行结束..."); /** * 现在开始的时间是: Thu Dec 24 18:05:44 CST 2015 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-4正在执行第: 0次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-1正在执行第: 0次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-2正在执行第: 0次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-2正在执行第: 1次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-1正在执行第: 1次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-3正在执行第: 0次 * pool-1-thread-4正在执行第: 1次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-1正在执行第: 2次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-2正在执行第: 2次 * pool-1-thread-5正在执行第: 0次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-4正在执行第: 2次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-5正在执行第: 1次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-5正在执行第: 2次 * pool-1-thread-3正在执行第: 1次 * 现在运行的时间是: Thu Dec 24 18:05:49 CST 2015_____ * pool-1-thread-3正在执行第: 2次 * 线程执行结束... */ } /** * 单任务延迟连接池 */ private static void singleThreadScheduledPool() { ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(); /** * 其他都一样,这里就不写了 */ } /** * 自定义连接池 * 自定义连接池稍微麻烦些,不过通过创建的ThreadPoolExecutor线程池对象, * 可以获取到当前线程池的尺寸、正在执行任务的线程数、工作队列等等。 */ private static void customThreadPool() { // 创建等待队列 BlockingQueue bqueue = new ArrayBlockingQueue(20); // 创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。 ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 2, TimeUnit.MILLISECONDS, bqueue); // ThreadPoolExecutor参数解释 // corePoolSize - 池中所保存的线程数,包括空闲线程。 // maximumPoolSize - 池中允许的最大线程数。 // keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。 // unit - keepAliveTime 参数的时间单位。 // workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute 方法提交的 Runnable 任务。 /** * 其他都一样,这里就不写了 */ } } class FixedThread extends Thread { @Override public void run() { for (int i = 0; i < 3; i ++) { System.out.println("现在运行的时间是: " + new Date() + "_____"); System.out.println(Thread.currentThread().getName() + "正在执行第: " + i + "次"); } }} |
在工作中如何设置线程池的大小呢?
设置线程池的大小
线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中通常不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。
幸运的是,要设置线程池的大小也并不困难,只需要避免“过大”和“过小”这两种极端情况。如果线程池过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果线程池过小,那么将导致许多空间的处理器无法执行工作,从而降低吞吐率。
要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。在部署的系统中有多少个CPU?多大的内存?任务是计算密集型、I/O密集型还是二者皆可?它们是否需要像JDBC连接这样的稀缺资源?如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据各自的工作负载来调整。
对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为 Ncpu+1 时,通常能实现最优的利用率。
线程池的最优大小等于:
Nthreads = Ncpu * Ucpu * (1+W/C)
给定下列定义:
Ncpu是CPU的数目,一般可以通过这个公式获取
int N_CPUS = Runtime.getRuntime().availableProcessors()Ucpu:CPU的利用率,范围为 0<=Ucpu<=1
W/C:是等待时间和计算时间的比值
一般需要根据任务的类型来配置线程池大小:
如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1
如果是IO密集型任务,参考值可以设置为2*NCPU
当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。
Callable和Future以及FutureTask的实例应用
有的时候我们的应用需要拿到线程执行完毕后的返回值,这个时候就需要用到Callable和Future以及FutureTask了。下面是一个实例,copy就可以运行,也可以看实例有详细注释说明。
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
package map; import java.util.Random;import java.util.concurrent.*; /** * Created by benjamin on 12/25/15. */public class CallableAndFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { testCallableAndFuture(); testCallableAndFuture2(); testCallableAndFuture3(); testCallableAndFutureTask(); } /** * 一个简单地Callable 和 future的使用简介 * @throws ExecutionException * @throws InterruptedException */ private static void testCallableAndFuture() throws ExecutionException, InterruptedException { Callable<Integer> call = new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(1000); return new Random().nextInt(100); } }; FutureTask<Integer> future = new FutureTask<Integer>(call); new Thread(future).start(); System.out.println("执行开始"); // 一直等到拿到值才继续往下走 System.out.println(future.get()); System.out.println("执行结束"); /** * result * 执行开始 24 执行结束 */ } /** * 在线程池中使用Callable来执行对应的任务. * @throws ExecutionException * @throws InterruptedException */ private static void testCallableAndFuture2() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); Future<Integer> future = executorService.submit(new Callable<Integer>() { public Integer call() throws Exception { Thread.sleep(1000); return new Random().nextInt(100); } }); System.out.println("线程开始"); System.out.println(future.get()); System.out.println("线程结束"); /** * 执行结果和上面一样 */ } /** * 执行多个callable 和 future任务,并且得到返回值 * @throws InterruptedException * @throws ExecutionException */ private static void testCallableAndFuture3() throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newCachedThreadPool(); CompletionService<Integer> cs = new ExecutorCompletionService<Integer>(threadPool); for (int i = 1; i < 5; i++) { final int taskID = i; cs.submit(new Callable<Integer>() { public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(taskID * 1000); return taskID; } }); } System.out.println("线程开始"); for (int i = 1; i < 5; i++) { System.out.println(cs.take().get()); } System.out.println("线程结束"); /** * 执行的结果 * 子线程在进行计算 子线程在进行计算 线程开始 子线程在进行计算 子线程在进行计算 1 (每隔一秒输出下一个数,这表明get()是有一个值输出就立刻返回再继续等待下面的输出,最后全部输出完成向下执行) 2 3 4 线程结束 */ } /** * futureTask的使用 * FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口 * 所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。 */ private static void testCallableAndFutureTask() { ExecutorService threadPool = Executors.newCachedThreadPool(); Task task = new Task(); FutureTask<Integer> futureTask = new FutureTask<Integer>(task); threadPool.submit(futureTask); threadPool.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task执行结果: " + futureTask.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任务执行完毕"); }} /** * 任务task */class Task implements Callable<Integer> { public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for (int i = 0; i < 100; i++) sum += i; return sum; }} |
参考文章: 海子大神