Marble原理之线程中断

本章节依赖于【Marble使用】,阅读本章节前请保证已经充分了解Marble。
中断特性从Marble-Agent 2.0.5开始支持。

线程中断使用

  1. 引入marble-agent jar包

    <dependency>
    <groupId>com.github.jeff-dong</groupId>
    <artifactId>marble-agent</artifactId>
    <version>最新版</version>
    </dependency>
  2. JOB执行代码适当位置添加中断标志, 下面给出示例代码
    @Component("job1")
    public class Job1 extends MarbleJob {
    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(Job1.class);
    
    @Override
    public void execute(String param) throws Exception {
        logger.info("JOB1开始执行 ...");
        int i = 0;
        while (true) {
            i++;
            //1、用中断状态码进行判断
            if (Thread.interrupted()) {
                logger.info("JOB1-[{}]-[{}]被打断啦", param, Thread.currentThread().getName());
                return;
            }
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                //2、捕获终端异常后return结束
                return;
            }
            logger.info("JOB1-[{}]-[{}]-{}-------", param, Thread.currentThread().getName(), i);
        }
    }
    }
    
  3. Marble OFFLINE进行线程中断

3.1 手动调度线程中断

3.2 选择要中断的服务器进行终端尝试

3.3 查看中断日志(同步JOB)

中断实现及原理

Java的线程中断

Java的线程中断机制是一种协作机制,线程中断并不能立即停掉线程执行,相反,可能线程永远都不会响应。
java的线程中断模型只是通过修改线程的中断标志(interrupt)进行中断通知,不会有其它额外操作,因此线程是否最终中断取决于线程的执行逻辑。因此,如果想让线程按照自己的想法中断,要代码中事先进行中断的“埋点”处理。

有人可能会想到Thread的stop方法进行中断,由于此方法可能造成不可预知的结果,已经被抛弃

Marble进行线程中断实现

需求收集
  1. 以JOB为维度进行线程中断;
  2. 尽量做到实时响应;
  3. 存在集群中多台机器,要支持指定某台机器中的线程中断;
  4. 允许多次中断尝试;
  5. 中断请求不能依赖于JOB当前状态。可能已经停止调度的JOB也要手动中断执行中的线程;
  6. 透明和扩展不同JOB的中断(提供用户中断的"后处理"扩展);
需求分析及实现

【以JOB为维度进行线程中断】

Marble的JOB标志为 schedulerName-appId-jobName组成,目前Marble每个JOB调度时间和频率都是个性化,目前调度完成就销毁。但要做到任何时间进行执行中的线程中断就要:
1.1 存储JOB的运行线程,随时准备中断;
1.2 在缓存的JOB数量/时间和性能间做权衡,不能过多也不能过少;
1.3 制定缓存已满时的抛弃策略,避免缓存被占满新的线程永远无法中断;
1.4 要同步JOB和异步JOB透明处理(感觉不出差异);

实现:
Marble的线程池中定义支持并发的MAP进行JOB维度的线程缓存,此外指定每个JOB下缓存的线程数量。如下:

public class ThreadPool {
    ...
    private Multimap<String, Object> threadMultimap = Multimaps.synchronizedMultimap(HashMultimap.<String, Object>create());
    //multimap的单个key的最大容量
    private static final int THREADMULTIMAP_SIZE = 50;
    ...
}

Marble-Agent在同步/异步JOB生成新的线程对象时进行放入MAP缓存,如果缓存(50个)已满采用如下策略进行处理:
1. 尝试清理当前map中的非活跃线程;
2. 尝试清理当前map中已经完成的线程(同步线程有效);
3. 如果还未清理出空间,移除最久的线程;

public ThreadPool multimapPut(String key, Object value) {
        if (StringUtils.isNotBlank(key)) {
            Collection collection = threadMultimap.get(key);
            if (collection != null && collection.size() >= THREADMULTIMAP_SIZE) {
                //替换最久的
                Iterator<Object> it = collection.iterator();
                //首先进行 非活跃线程清理
                while (it.hasNext()) {
                    Object tempObj = it.next();
                    if(tempObj instanceof MarbleThread){
                        MarbleThread mt = (MarbleThread)tempObj;
                        //不活跃删除
                        if(!mt.isThreadAlive()){
                            it.remove();
                        }
                    }else if(tempObj instanceof MarbleThreadFeature){
                        MarbleThreadFeature mf = (MarbleThreadFeature) tempObj;
                        //完成的线程删除
                        if(mf.isDone()){
                            it.remove();
                        }
                    }
                }
                //仍然>最大值,删除最久未使用
                if(collection.size() >= THREADMULTIMAP_SIZE){
                    while (it.hasNext()) {
                        it.next();
                        it.remove();
                        break;
                    }
                }
                threadMultimap.put(key, value);
                return this;
            }
        }
        threadMultimap.put(key, value);
        return this;
    }

此外,为了能在JVM关闭时进行线程中断,添加JVM hook进行中断调用处理(包括线程池的销毁)。
除此之外,还有个小问题,由于线程池使用的是有界的阻塞队列,此种情况下,线程中断时可能有的线程存在于阻塞队列中,单纯的中断无效,对于此类情况,要首先判断阻塞队列中是否存在要中断的线程,存在的话进行队列的移除操作。

【尽量做到实时响应】
只能通过用户在具体的线程逻辑中进行埋点处理,Marble在框架层面除了及时把用户的中断请求送达之外,没有其它措施。

【存在集群中多台机器,要支持指定某台机器中的线程中断】
Marble OFFLINE的中断页面支持机器的选择,用户进行选择后,Marble会有针对性的进行机器的中断RPC发送。

【允许多次中断尝试】
OFFLINE未对中断次数进行限制,目前支持多次中断请求发送。

【中断请求不能依赖于JOB当前状态】
考虑到用户对历史线程的中断请求,Marble未把中断操作绑定在JOB状态上,任何JOB都可以进行终端尝试。

【透明扩展不同JOB的中断】
Marble目前支持同步和异步JOB,两类JOB的中断处理并不一致,比如同步job的中断是通过FeatureTask的cancel实现,异步JOB是通过Thread的interrupt实现,此外线程被中断后Marble希望能更进一步提供一个统一的“后处理”操作给用户自己实现,比如用户可能需要在线程被中断后进行一些后续的log记录等。

为了代码层面一致透明,且友好的实现“后处理”的封装,Marble使用了代理模式,在Thread和FeatureTask上添加了一层“代理类”,由代理进行具体的中断操作。
同步JOB代理类:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThreadFeature<V> implements RunnableFuture<V> {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThreadFeature.class);
    private MarbleJob marbleJob;
    private String param;
    private FutureTask<Result> futureTask;

    public MarbleThreadFeature(final MarbleJob marbleJob, final String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
        futureTask = new FutureTask<>(new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                return marbleJob.executeSync(param);
            }
        });
    }

    @Override
    public void run() {
        futureTask.run();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return futureTask.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return futureTask.isCancelled();
    }

    @Override
    public boolean isDone() {
        return futureTask.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return (V) futureTask.get();
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return (V) futureTask.get(timeout, unit);
    }

    public void stop(String operator) {
        if (futureTask != null && !futureTask.isCancelled()) {
            logger.info("Thread-feature[{}] is interrupted", futureTask.getClass().getName());
            futureTask.cancel(true);
        }else if(marbleJob != null){
            boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(marbleJob);
            logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", marbleJob.getClass().getSimpleName(),removeResult);
        }
        //中断后处理
        if(marbleJob != null){
            marbleJob.afterInterruptTreatment();
        }
    }

}

异步JOB代理类:


/**
 * @author <a href="dongjianxing@aliyun.com">jeff</a>
 * @version 2017/4/19 16:31
 */
public class MarbleThread implements Runnable {

    private ClogWrapper logger = ClogWrapperFactory.getClogWrapper(MarbleThread.class);
    private MarbleJob marbleJob;
    private String param;
    private Thread runThread;

    public MarbleThread(MarbleJob marbleJob, String param) {
        super();
        this.marbleJob = marbleJob;
        this.param = param;
    }

    @Override
    public void run() {
        runThread = Thread.currentThread();
        try {
            marbleJob.execute(param);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public boolean isThreadAlive() {
        return (runThread != null && runThread.isAlive());
    }

    public String getThreadName() {
        return runThread != null ? runThread.getName() : "";
    }

    public void stop() {
        //首先尝试在阻塞队列中删除
        boolean removeResult = ((ThreadPoolExecutor) ThreadPool.getFixedInstance().getExecutorService()).getQueue().remove(this);
        logger.info("Hanging MarbleJob[{}] is removed from the queue success?{}", this.getClass().getSimpleName(), removeResult);
        if (runThread != null && !runThread.isInterrupted()) {
            logger.info("Thread[{}] is interrupted", runThread.getName());
            runThread.interrupt();
        }
        //中断后处理
        if (marbleJob != null) {
            marbleJob.afterInterruptTreatment();
        }
    }
}
时间: 2024-09-17 22:18:21

Marble原理之线程中断的相关文章

Marble原理之线程池

本章节依赖于[Marble使用],阅读本章节前请保证已经充分了解Marble 线程池概述 由于Marble属于框架性项目,用户接入Marble不关心Marble的实现机制.因此Marble在做相关处理时对资源的消耗要可控,不能因为Marble的原因导致接入的应用不可用(比如资源耗尽). 此外,Marble-Agent每次收到RPC调度为了不阻塞都会新开线程进行JOB执行,对线程的使用非常频繁,因此必须使用同一的线程池进行Marble的资源使用收口. 对于线程池 Java已经做了很好的封装,大部分

Java线程中断的本质深入理解

    Java的中断是一种协作机制.也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己. 一.Java中断的现象 首先,看看Thread类里的几个方法:  public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况

线程中断方法interrupt() 与 cancel()

(一).关于interrupt()     interrupt()并不直接中断线程,而是设定一个中断标识,然后由程序进行中断检查,确定是否中断.     1. sleep() & interrupt()    线程A正在使用sleep()暂停着: Thread.sleep(100000);    如果要取消他的等待状态,可以在正在执行的线程里(比如这里是B)调用a.interrupt();    令线程A放弃睡眠操作,这里a是线程A对应到的Thread实例执行interrupt()时,并不需要获

Java线程中断的本质深入理解(转)

  一.Java中断的现象 首先,看看Thread类里的几个方法: public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外). public boolean isInterrupted() 测试线程是否已经中断.线程的中断状态 不受该方法的影响. public void 

关于线程中断的总结

在Core Java中有这样一句话:"没有任何语言方面的需求要求一个被中断的程序应该终止.中断一个线程只是为了引起该线程的注意,被中断线程可以决定如何应对中断 " 中断是一种协作机制.当一个线程中断另一个线程时,被中断的线程不一定要立即停止正在做的事情.相反,中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情.有些方法,例如 Thread.sleep(),很认真地对待这样的请求,但每个方法不是一定要对中断作出响应.对于中断请求,不阻塞但是仍然要花较长时间执行的方法可以轮

《Java程序员面试秘笈》—— 1.5 线程中断的控制

1.5 线程中断的控制 通过上一节,你已经学会了如何去中断执行中的线程,也学会了如何在线程对象中去控制这个中断.上一个例子中使用的机制,可以使用在线程很容易被中断的情况下.但是,如果线程实现了复杂的算法并且分布在几个方法中,或者线程里有递归调用的方法,我们就得使用一个更好的机制来控制线程的中断.为了达到这个目的,Java提供了InterruptedException异常.当检查到线程中断的时候,就抛出这个异常,然后在run()中捕获并处理这个异常. 在本节中,我们将实现线程类来完成下面的内容,它

深入Java线程中断的本质与编程原则的概述_java

在历史上,Java试图提供过抢占式限制中断,但问题多多,例如前文介绍的已被废弃的Thread.stop.Thread.suspend和 Thread.resume等.另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机制的程序员无意破坏系统的概率.     如今,Java的线程调度不提供抢占式中断,而采用协作式的中断.其实,协作式的中断,原理很简单,就是轮询某个表示中断的标记,我们在任何普通代码的中都可以实现. 例如下面的代码:    volatile bool isI

Java线程中断的本质深入理解_java

一.Java中断的现象 首先,看看Thread类里的几个方法: public static boolean interrupted 测试当前线程是否已经中断.线程的中断状态 由该方法清除.换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外). public boolean isInterrupted() 测试线程是否已经中断.线程的中断状态 不受该方法的影响. public void in

jdbc pring-c3p0连接线程中断,求高手指教,在线等

问题描述 c3p0连接线程中断,求高手指教,在线等 [ts-merchant-bill-dbfix-job]2015-03-07 19:37:21,617 INFO [com.mchange.v2.log.MLog] MLog clients using log4j logging. [ts-merchant-bill-dbfix-job]2015-03-07 19:37:21,650 INFO [com.mchange.v2.c3p0.C3P0Registry] Initializing c3