Marble原理之线程池

本章节依赖于【Marble使用】,阅读本章节前请保证已经充分了解Marble

线程池概述

由于Marble属于框架性项目,用户接入Marble不关心Marble的实现机制。因此Marble在做相关处理时对资源的消耗要可控,不能因为Marble的原因导致接入的应用不可用(比如资源耗尽)。
此外,Marble-Agent每次收到RPC调度为了不阻塞都会新开线程进行JOB执行,对线程的使用非常频繁,因此必须使用同一的线程池进行Marble的资源使用收口。

对于线程池 Java已经做了很好的封装,大部分的使用场景都能覆盖,枚举如下:
1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;
2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待;
3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行;
4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行;

线程池new线程的流程(网络盗图):

Marble线程池

线程池定义

由于Marble线程池一个很大的作用是为了控制资源使用,给Marble资源占用设定上限,Java本身提供的线程池虽然有最大线程数设置,但阻塞队列用的都是无界的,不适合做资源限定使用。因此,Marble对java线程池做了定制化。

使用有界阻塞队列

 executor = new ThreadPoolExecutor(
                tpConfig.getMaxSize(),
                tpConfig.getCoreSize(), 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(tpConfig.getBlockQueueSize()),
                tpConfig.getRejectPolicy()
        );

线程池自配置支持

为了方便用户进行线程池自配置,Marble提供配置文件的方式支持用户自定义线程池配置,配置方式为:在项目根目录下建立文件marble-config.properties 。文件中进行参数赋值,如下:

#线程池最大线程数
tpool_max_size=5
#线程池核心线程数
tpool_core_size=5
#线程池阻塞有界队列长度
tpool_bq_size=3
#线程池满后的处理策略。1-AbortPolicy(抛出RejectedExecutionException异常); 2-CallerRunsPolicy; 3-DiscardOldestPolicy 4-DiscardPolicy(不抛出异常)
tpool_reject_policy=1

Marble会首先在根目录下查找此配置文件,找不到会用默认配置。
tpool_max_size=20
tpool_core_size=20
tpool_bq_size=5
tpool_reject_policy=1

Marble的配置解析类如下:


/**
 * Marble 配置解析
 *
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/3/31 20:15
 */
public class MarbleConfigParser {
    private static ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleConfigParser.class);
    private static final String CONFIG = "marble-config.properties";
    private static Properties prop = new Properties();
    //默认配置
    private static final int TPOOL_MAX_SIZE = 20;//线程池最大线程数
    private static final int TPOOL_CORE_SIZE = 20;//线程池核心线程数
    private static final int TPOOL_BQ_SIZE = 5;//线程池阻塞队列大小
    private static final int TPOOL_REJECT_POLICY = 1;//线程池满的处理策略. 1-AbortPolicy(抛出RejectedExecutionException异常); 2-CallerRunsPolicy; 3-DiscardOldestPolicy 4-DiscardPolicy

    private MarbleConfigParser() {
        try {
            InputStream stream = PropertyUtils.class.getClassLoader().getResourceAsStream(CONFIG);
            if (stream == null) {
                logger.MARK("PARSE_CONFIG").warn("no marbleConfig.properties.xml is exist in the root directory of classpath, so default the config will be used.");
                return;
            }
            prop.load(stream);
        } catch (Exception e) {
            logger.MARK("PARSE_CONFIG").error("parse the marbleConfig.properties.xml in the root directory exception, detail: {}", Throwables.getStackTraceAsString(e));
        }
    }

    //解析出thread pool配置
    ThreadPoolConfig parseTPConfig() {
        ThreadPoolConfig tpConfig = null;
        try {
            Integer tpms = getInteger(prop, "tpool_max_size");
            Integer tpcs = getInteger(prop, "tpool_core_size");
            Integer tpqs = getInteger(prop, "tpool_bq_size");
            Integer tprp = getInteger(prop, "tpool_reject_policy");

            //修正参数
            tpcs = (tpcs == null || tpcs < 0 || tpcs > 500) ? TPOOL_CORE_SIZE : tpcs;
            tpms = (tpms == null || tpms < tpqs) ? tpcs : tpms;
            tpqs = (tpqs == null || tpqs < 0 || tpqs > 100) ? TPOOL_BQ_SIZE : tpqs;
            tprp = (tprp == null || tprp > 4) ? 1 : tprp;

            RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
            switch (tprp) {
                case 1:
                    handler = new ThreadPoolExecutor.AbortPolicy();
                    break;
                case 2:
                    handler = new ThreadPoolExecutor.CallerRunsPolicy();
                    break;
                case 3:
                    handler = new ThreadPoolExecutor.DiscardOldestPolicy();
                    break;
                case 4:
                    handler = new ThreadPoolExecutor.DiscardPolicy();
                    break;
            }
            tpConfig = new ThreadPoolConfig(tpms,tpcs,tpqs,handler);
        } catch (Exception e) {
            logger.MARK("PARSE_CONFIG").error("parse the thread-pool config from marbleConfig.properties.xml exception, detail: {}", Throwables.getStackTraceAsString(e));
        }
        if (tpConfig == null) {
            tpConfig = new ThreadPoolConfig(TPOOL_MAX_SIZE,TPOOL_CORE_SIZE, TPOOL_BQ_SIZE, new ThreadPoolExecutor.DiscardPolicy());
        }
        return tpConfig;
    }

    private Integer getInteger(Properties prop, String key) {
        Integer result = null;
        try {
            String value = prop.getProperty(key);
            if (value != null && value.trim().length() > 0) {
                result = Integer.parseInt(value);
            }
        } catch (Exception e) {
        }
        return result;
    }

    //单例
    private static class SingletonHolder {
        private static final MarbleConfigParser CONFIG_HELPER = new MarbleConfigParser();
    }

    public static MarbleConfigParser getInstance() {
        return MarbleConfigParser.SingletonHolder.CONFIG_HELPER;
    }

    //线程池配置
    class ThreadPoolConfig {
        private int maxSize;//线程池最大线程数
        private int coreSize;//线程池核心线程数
        private int blockQueueSize;//线程池阻塞队列大小
        private RejectedExecutionHandler rejectPolicy;//线程池拒绝策略

        ThreadPoolConfig(int maxSize, int coreSize, int blockQueueSize, RejectedExecutionHandler rejectPolicy) {
            this.maxSize = maxSize;
            this.coreSize = coreSize;
            this.blockQueueSize = blockQueueSize;
            this.rejectPolicy = rejectPolicy;
        }

        int getCoreSize() {
            return coreSize;
        }

        int getBlockQueueSize() {
            return blockQueueSize;
        }

        public int getMaxSize() {
            return maxSize;
        }

        RejectedExecutionHandler getRejectPolicy() {
            return rejectPolicy;
        }

        @Override
        public String toString() {
            return "ThreadPoolConfig{" +
                    "maxSize=" + StringUtils.safeString(maxSize) +
                    ", coreSize=" + StringUtils.safeString(coreSize) +
                    ", blockQueueSize=" + StringUtils.safeString(blockQueueSize) +
                    ", rejectPolicy=" + StringUtils.safeString(rejectPolicy.getClass().getSimpleName()) +
                    '}';
        }
    }
}

线程池使用示例

以如下线程池配置为例:
tpool_max_size=5
tpool_core_size=5
tpool_bq_size=3
tpool_reject_policy=1

下图中同一台机器(10.2.37.137)连续收到11次Marble调度 >

  • 第1~5次Marble-Agent成功从线程池中启动了5个线程进行执行;
  • 第6~8次调用,核心线程数已满,有界阻塞队列开始进行填充;
  • 第9~10次调用有界阻塞队列已被填满,最大线程数也已满,由于采用了 拒绝策略Abort,直接拒绝了10~11次的调度请求;
  • 手动进行了“线程中断”调用;
  • 第11次又成功执行;

时间: 2025-01-30 07:12:03

Marble原理之线程池的相关文章

Marble原理之线程中断

本章节依赖于[Marble使用],阅读本章节前请保证已经充分了解Marble. 中断特性从Marble-Agent 2.0.5开始支持. 线程中断使用 引入marble-agent jar包 <dependency> <groupId>com.github.jeff-dong</groupId> <artifactId>marble-agent</artifactId> <version>最新版</version> <

线程池基础知识分享

线程池的好处: 降低资源消耗:避免了频繁创建和销毁线程的资源消耗: 提高相应速度:当有新的任务到达时,不必每次都新建线程就可以立即执行: 提高线程的可管理性:线程池对线程进行统一分配.调优和监控.不允许无限制的创建线程.   线程池源码分析其实现原理 当线程池接收到一个新的提交任务,线程池如何处理这个新任务,这部分主要学习线程池的针对新任务的处理流程. 当前运行的线程数小于corePoolSize,创建新线程来执行任务,该步骤需要获取全局锁: 运行的线程数等于或大于corePoolSize,则将

多线程之线程池概述(一)

java在JDK1.5之后引入了并发计算框架,java.util.concurrent.这个框架大大减轻了简化了多线程的开发工作.一个线程大概有五种状态:新建状态(New).可运行状态(Runnable,也叫做运行状态).阻塞状态(Blocked).等待状态(Waiting).结束状态(Terminated).线程的状态只能由新建转变为了运行状态后才能被阻塞或者等待状态.线程的状态流转如图所示: 注意:这里把等待状态给细分了一下.把等待状态分为了等待池和等锁池. 线程的运行时间可以分为三个部分:

Mysql线程池优化笔记

Mysql线程池系列一(Thread pool FAQ) 首先介绍什么是mysql thread pool,干什么用的? 使用线程池主要可以达到以下两个目的: 1.在大并发的时候,性能不会因为过载而迅速下降. 2.减少性能抖动 thread pool的工作原理? 线程池使用分而治之的方法来限制和平衡并发性.与默认的thread_handling不同,线程池将连接和线程划分开,所以连接数量和执行语句的线程数不再是固定的关系,线程池可以通过 配置线程组来管理连接,然后再根据每个语句的关键字来确定是优

Java线程池架构原理和源码解析(ThreadPoolExecutor)

在前面介绍JUC的文章中,提到了关于线程池Execotors的创建介绍,在文章:<java之JUC系列-外部Tools>中第一部分有详细的说明,请参阅: 文章中其实说明了外部的使用方式,但是没有说内部是如何实现的,为了加深对实现的理解,在使用中可以放心,我们这里将做源码解析以及反馈到原理上,Executors工具可以创建普通的线程池以及schedule调度任务的调度池,其实两者实现上还是有一些区别,但是理解了ThreadPoolExecutor,在看ScheduledThreadPoolExe

ava实现线程池原理:适用于电商网站之类的交互频繁的网站

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动. 组成部分 1.线程池

CLR线程池的作用与原理浅析

线程池是一个重要的概念.不过我发现,关于这个话题的讨论似乎还缺少了点什么.作为资料的补充,以及今后文章所需要的引用,我在这里再完整而又简单地谈一下有关线程池,还有.NET中各种线程池的基础.更详细的内容就不多作展开了,有机会我们再详细讨论这方面的细节.这次,还是一个"概述"性质的,希望可以说明白这方面问题的一些概念. 线程池的作用 其实"线程池"就是用来存放"线程"的对象池. 在程序中,如果某个创建某种对象所需要的代价太高,同时这个对象又可以反复

Java 线程池架构原理和源码解析(ThreadPoolExecutor)

在前面介绍JUC的文章中,提到了关于线程池Execotors的创建介绍,在文章:<java之JUC系列-外部Tools>中第一部分有详细的说明,请参阅: 文章中其实说明了外部的使用方式,但是没有说内部是如何实现的,为了加深对实现的理解,在使用中可以放心,我们这里将做源码解析以及反馈到原理 上,Executors工具可以创建普通的线程池以及schedule调度任务的调度池,其实两者实现上还是有一些区别,但是理解了 ThreadPoolExecutor,在看ScheduledThreadPoolE

Java线程池架构(一)原理和源码解析

在前面介绍JUC的文章中,提到了关于线程池Execotors的创建介绍,在文章:<java之JUC系列-外部Tools>中第一部分有详细的说明,请参阅: 文章中其实说明了外部的使用方式,但是没有说内部是如何实现的,为了加深对实现的理解,在使用中可以放心,我们这里将做源码解析以及反馈到原理 上,Executors工具可以创建普通的线程池以及schedule调度任务的调度池,其实两者实现上还是有一些区别,但是理解了ThreadPoolExecutor,在看ScheduledThreadPoolEx