更新于 

监测服务

之前已经完成了Curl监测与FGDC检查两大工具类的构建,接下来将构建监测服务类,分为监测服务接口类、监测服务实现类与异步监测服务类

整个监测过程可以描述为:

获取待检测对象 → 多线程并行化监测 → 监测结果入库

线程池配置

详细代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
@EnableAsync
public class AsyncConfiguration {

@Bean("async")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数:线程池创建时候初始化的线程数
executor.setCorePoolSize(32);
// 最大线程数:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setMaxPoolSize(64);
// 缓冲队列:用来缓冲执行任务的队列
executor.setQueueCapacity(512);
// 允许线程的空闲时间60秒:当超过了核心线程之外的线程在空闲时间到达之后会被销毁
executor.setKeepAliveSeconds(60);
// 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
executor.setThreadNamePrefix("asyncExecutor-");
// 缓冲队列满了之后的拒绝策略:由调用线程处理(一般是主线程)
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}

}

SpringBoot使用的多线程方式非常简单,在需要异步的方法上加@Async("线程池名称")注解

ThreadPoolExecutor的拒绝策略

AbortPolicy

ThreadPoolExecutor中默认的拒绝策略为AbortPolicy:

1
2
3
4
5
6
7
8
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

即直接抛出异常,并不再执行任务

CallerRunsPolicy

该策略会在任务被拒绝添加后,调用当前线程池的所在的线程去执行被拒绝的任务:

1
2
3
4
5
6
7
8
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}

该策略会导致主线程堵塞

DiscardPolicy

该策略会将拒绝的任务直接丢弃,同时不抛出异常:

1
2
3
4
5
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy

该策略会在出现拒绝添加的任务时,从任务队列总弹出最先加入的任务(即Oldest),空出一个位置,然后再次执行execute方法把任务加入队列:

1
2
3
4
5
6
7
8
9
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

监测服务接口类

1
2
3
4
public interface MonitorService {
public List<FunctionBean> GetFunctionList(int index);
public void FunctionMonitor(int index) throws InterruptedException;
}

监测服务实现类

详细代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Service
public class MonitorServiceImpl implements MonitorService {
@Autowired
private MySQLMapper mySQLMapper;

@Autowired
private MonitorAsyncServer monitorAsyncServer;

@Override
public List<FunctionBean> GetFunctionList(int index) {
try {
return mySQLMapper.GetFunctionList(index);
}catch (Exception e) {
e.printStackTrace();
return null;
}
}

@Override
public void FunctionMonitor(int index) throws InterruptedException {

String monitorIP = new GetMonitorIP().GetIP();//获取监测点IP
String osName = System.getProperties().getProperty("os.name");//获取监测点操作系统名

SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
Calendar cal = Calendar.getInstance();

List<FunctionBean> functionList = GetFunctionList(index);//获取待检测对象列表

CountDownLatch countDownLatch = new CountDownLatch(functionList.size());

System.out.println("\n-----------------------------------------------------------\n" +
"FunctionMonitor StartTime : " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime())+
"\n ( "+monitorIP+" - "+osName+" )");
System.out.println("Monitor index-Size : " +index+" - "+functionList.size());

for (FunctionBean function : functionList) {
monitorAsyncServer.FunctionMonitor(countDownLatch, monitorIP, osName,function);
}

try {
countDownLatch.await(settingProperties.getMonitor().getCountDownLatchTimeout(), TimeUnit.MINUTES);
} catch (InterruptedException e) {
System.out.println("Exception: await interrupted exception");
} finally {
System.out.println("countDownLatch: " + countDownLatch.getCount()+" (should be 0)");
}
System.out.println("Monitor Finish at "+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()));
}
}

在监测服务实现类中调用异步检测服务类MonitorAsyncServer实现对象的多线程异步监测。

为保证所有线程运行完后再执行下一步,加入CountDownLatch使代码修改为阻塞式:

  • countDownLatch这个类使一个线程等待其他线程各自执行完毕后再执行。
  • 是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。

异步检测服务类

在方法前加上注释 @Async("async") 声明此方法可使用多线程

详细代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Service
public class MonitorAsyncServer {

@Autowired
MySQLMapper mySQLMapper;
@Autowired
private SettingProperties settingProperties;

@Async("async")
public void FunctionMonitor(CountDownLatch countDownLatch, String ip, String osName, FunctionBean function) {
//FGDC监测
FGDCPerformance fp = new FGDCChecker(function.getFunction_URL()).GetFGDCPerformance();

//Curl监测
CurlCmd cc = new CurlCmd(function.getFunction_URL(), function.getFunction_type(),function.getObject_type(),
settingProperties.getCurl().getConnectionTimeOut(),settingProperties.getCurl().getOperationTimeOut(),osName);
try {
CurlResponse cr = cc.doCurlCmd();

//处理并入库
UpdateFunction(function);
AddFunctionPerformanceHistory(ip,function,fp,cr);
//System.out.println("线程池" + Thread.currentThread().getName()+" - "+countDownLatch.getCount());
countDownLatch.countDown();

} catch (InterruptedException ex) {

ex.printStackTrace();
countDownLatch.countDown();
}
}
}

多线程监测测试结果: