监测服务
之前已经完成了Curl监测与FGDC检查两大工具类的构建,接下来将构建监测服务类,分为监测服务接口类、监测服务实现类与异步监测服务类
整个监测过程可以描述为:
获取待检测对象 → 多线程并行化监测 → 监测结果入库
线程池配置
详细代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration @EnableAsync public class AsyncConfiguration {
@Bean("async") public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(32); executor.setMaxPoolSize(64); executor.setQueueCapacity(512); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("asyncExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; }
}
|
SpringBoot使用的多线程方式非常简单,在需要异步的方法上加@Async("线程池名称")注解
ThreadPoolExecutor的拒绝策略
AbortPolicy
ThreadPoolExecutor中默认的拒绝策略为AbortPolicy:
1 2 3 4 5 6 7 8
| public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
|
即直接抛出异常,并不再执行任务
CallerRunsPolicy
该策略会在任务被拒绝添加后,调用当前线程池的所在的线程去执行被拒绝的任务:
1 2 3 4 5 6 7 8
| public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
|
该策略会导致主线程堵塞
DiscardPolicy
该策略会将拒绝的任务直接丢弃,同时不抛出异常:
1 2 3 4 5
| public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
|
DiscardOldestPolicy
该策略会在出现拒绝添加的任务时,从任务队列总弹出最先加入的任务(即Oldest),空出一个位置,然后再次执行execute方法把任务加入队列:
1 2 3 4 5 6 7 8 9
| public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
|
监测服务接口类
1 2 3 4
| public interface MonitorService { public List<FunctionBean> GetFunctionList(int index); public void FunctionMonitor(int index) throws InterruptedException; }
|
监测服务实现类
详细代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Service public class MonitorServiceImpl implements MonitorService { @Autowired private MySQLMapper mySQLMapper;
@Autowired private MonitorAsyncServer monitorAsyncServer;
@Override public List<FunctionBean> GetFunctionList(int index) { try { return mySQLMapper.GetFunctionList(index); }catch (Exception e) { e.printStackTrace(); return null; } }
@Override public void FunctionMonitor(int index) throws InterruptedException {
String monitorIP = new GetMonitorIP().GetIP(); String osName = System.getProperties().getProperty("os.name");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); Calendar cal = Calendar.getInstance();
List<FunctionBean> functionList = GetFunctionList(index);
CountDownLatch countDownLatch = new CountDownLatch(functionList.size());
System.out.println("\n-----------------------------------------------------------\n" + "FunctionMonitor StartTime : " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())+ "\n ( "+monitorIP+" - "+osName+" )"); System.out.println("Monitor index-Size : " +index+" - "+functionList.size());
for (FunctionBean function : functionList) { monitorAsyncServer.FunctionMonitor(countDownLatch, monitorIP, osName,function); }
try { countDownLatch.await(settingProperties.getMonitor().getCountDownLatchTimeout(), TimeUnit.MINUTES); } catch (InterruptedException e) { System.out.println("Exception: await interrupted exception"); } finally { System.out.println("countDownLatch: " + countDownLatch.getCount()+" (should be 0)"); } System.out.println("Monitor Finish at "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())); } }
|
在监测服务实现类中调用异步检测服务类MonitorAsyncServer实现对象的多线程异步监测。
为保证所有线程运行完后再执行下一步,加入CountDownLatch使代码修改为阻塞式:
- countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
- 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
异步检测服务类
在方法前加上注释 @Async("async") 声明此方法可使用多线程
详细代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Service public class MonitorAsyncServer {
@Autowired MySQLMapper mySQLMapper; @Autowired private SettingProperties settingProperties;
@Async("async") public void FunctionMonitor(CountDownLatch countDownLatch, String ip, String osName, FunctionBean function) { FGDCPerformance fp = new FGDCChecker(function.getFunction_URL()).GetFGDCPerformance();
CurlCmd cc = new CurlCmd(function.getFunction_URL(), function.getFunction_type(),function.getObject_type(), settingProperties.getCurl().getConnectionTimeOut(),settingProperties.getCurl().getOperationTimeOut(),osName); try { CurlResponse cr = cc.doCurlCmd(); UpdateFunction(function); AddFunctionPerformanceHistory(ip,function,fp,cr); countDownLatch.countDown();
} catch (InterruptedException ex) {
ex.printStackTrace(); countDownLatch.countDown(); } } }
|
多线程监测测试结果: