1. RxJava:使用 Schedulers.io() 调度器封装多线程并发执行。
// Run check(ip) for every element of ips concurrently on the RxJava IO scheduler and record each
// outcome in checkResult.
// subscribeOn(...) is applied to each inner Flowable on purpose: a single observeOn(Schedulers.io())
// placed ahead of flatMap only moves the whole chain onto one worker thread, so the checks would
// still execute one after another.
Flowable.fromArray(ips)
        .flatMap(ip -> Flowable.fromCallable(() -> check(ip))
                .subscribeOn(Schedulers.io())
                // Pair the result with its IP so the map write can happen downstream.
                .map(ok -> new java.util.AbstractMap.SimpleImmutableEntry<>(ip, ok)))
        // blockingForEach invokes the callback on the calling thread, so checkResult is written from
        // a single thread only, and an error raised by any check is rethrown to the caller.
        .blockingForEach(entry -> checkResult.put(entry.getKey(), entry.getValue()));
return checkResult;
2. 线程池:在线程池中同步调用 saltStackUtil.restartJavaSync 方法,并将处理后的结果传给 consumer 回调。
/** Fixed-size pool of 10 threads; every thread it creates is named "javaOperator". */
private static final ExecutorService executorService = Executors.newFixedThreadPool(10, task -> {
    Thread worker = new Thread(task);
    worker.setName("javaOperator");
    return worker;
});
/**
 * Submits a Java application restart to {@code executorService} and returns immediately.
 *
 * <p>The submitted task builds an {@link AppServerOperateResult} from the arguments (status
 * {@code Constant.JAVA_APP_OPERATE_PROCESS}, type {@code Constant.JAVA_APP_OPERATE_RESTART},
 * start time = now), calls {@code saltStackUtil.restartJavaSync(...)}, passes the response through
 * {@code processResult(...)}, and finally hands the result to the supplied consumer.
 *
 * <p>Any exception raised inside the task is forwarded to
 * {@code MyExceptionHandler.handlerException}; if it occurs before the consumer call, the consumer
 * is not invoked at all.
 *
 * @param ip          target host, stored on the result and passed to the restart call
 * @param projectName project name, stored on the result and passed to the restart call
 * @param commitId    commit identifier, stored on the result and passed to the restart call
 * @param jarName     jar name, stored on the result and passed to the restart call
 * @param jvmArgs     run parameters, stored on the result and passed to the restart call
 * @param appServerOperateResultConsumer callback that receives the processed result on a pool thread
 * @throws SaltException declared by the signature; NOTE(review): nothing in this body throws it
 *         synchronously — confirm whether callers still rely on it
 */
public void restartJava(String ip, String projectName, String commitId, String jarName, String jvmArgs,
        Consumer<AppServerOperateResult> appServerOperateResultConsumer) throws SaltException {
    executorService.execute(() -> {
        AppServerOperateResult operateResult = new AppServerOperateResult();
        operateResult.setIp(ip);
        operateResult.setProjectName(projectName);
        operateResult.setCommitId(commitId);
        operateResult.setJarName(jarName);
        operateResult.setStartTime(new Date());
        operateResult.setStatus(Constant.JAVA_APP_OPERATE_PROCESS);
        operateResult.setType(Constant.JAVA_APP_OPERATE_RESTART);
        operateResult.setRunParamater(jvmArgs);

        try {
            Map<String, Result<Map<String, State.ApplyResult>>> saltResponse =
                    saltStackUtil.restartJavaSync(ip, projectName, commitId, jarName, jvmArgs);
            processResult(saltResponse, operateResult);
            appServerOperateResultConsumer.accept(operateResult);
        } catch (Exception e) {
            // Failures are only reported to the shared handler; the consumer gets no callback here.
            MyExceptionHandler.handlerException(e, logger);
        }
    });
}
3. Stream:使用 parallel() 并行流执行 forEach,由 consumer 将返回数据写入 checkResult Map 中。
// Run check(ip, appId, appTypeCode) for every non-null IP on a parallel stream and store each
// outcome in checkResult, keyed by IP.
// NOTE(review): forEach on a parallel stream may call put from several threads at once;
// checkResult is declared outside this snippet — confirm it is a thread-safe map.
Stream.of(ips)
        .parallel()
        .filter(ip -> ip != null)
        .forEach(ip -> checkResult.put(ip, check(ip, appId, appTypeCode)));
return checkResult;
时间: 2024-09-17 04:53:08