在JAVA的世界里,如果想并行的執(zhí)行一些任務(wù),可以使用ThreadPoolExecutor。
大部分情況下直接使用ThreadPoolExecutor就可以滿足要求了,但是在某些場景下,比如瞬時大流量的,為了提高響應(yīng)和吞吐量,最好還是擴展一下ThreadPoolExecutor。
全宇宙的JAVA IT人士應(yīng)該都知道ThreadPoolExecutor的執(zhí)行流程:
core線程還能應(yīng)付的,則不斷的創(chuàng)建新的線程;
core線程無法應(yīng)付,則將任務(wù)扔到隊列里面;
隊列滿了(意味著插入任務(wù)失敗),則開始創(chuàng)建MAX線程,線程數(shù)達到MAX后,隊列還一直是滿的,則拋出RejectedExecutionException.
這個執(zhí)行流程有個小問題,就是當core線程無法應(yīng)付請求的時候,會立刻將任務(wù)添加到隊列中,如果隊列非常長,而任務(wù)又非常多,那么將會有頻繁的任務(wù)入隊列和任務(wù)出隊列的操作。
根據(jù)實際的壓測發(fā)現(xiàn),這種操作也是有一定消耗的。其實JAVA提供的SynchronousQueue隊列是一個零長度的隊列,任務(wù)都是直接由生產(chǎn)者遞交給消費者,中間沒有入隊列的過程,可見JAVA API的設(shè)計者也是有考慮過入隊列這種操作的開銷。
另外,任務(wù)一多,立刻扔到隊列里,而MAX線程又不干活,如果隊列里面太多任務(wù)了,只有可憐的core線程在忙,也是會影響性能的。
當core線程無法應(yīng)付請求的時候,能不能延后入隊列這個操作呢? 讓MAX線程盡快啟動起來,幫忙處理任務(wù)。
也即是說,當core線程無法應(yīng)付請求的時候,如果當前線程池中的線程數(shù)量還小于MAX線程數(shù)的時候,繼續(xù)創(chuàng)建新的線程處理任務(wù),一直到線程數(shù)量到達MAX后,才將任務(wù)插入到隊列里
我們通過覆蓋隊列的offer方法來實現(xiàn)這個目標。
@Override
public boolean offer(Runnable o) {
int currentPoolThreadSize = executor.getPoolSize();
//如果線程池里的線程數(shù)量已經(jīng)到達最大,將任務(wù)添加到隊列中
if (currentPoolThreadSize == executor.getMaximumPoolSize()) {
return super.offer(o);
}
//說明有空閑的線程,這個時候無需創(chuàng)建core線程之外的線程,而是把任務(wù)直接丟到隊列里即可
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
//如果線程池里的線程數(shù)量還沒有到達最大,直接創(chuàng)建線程,而不是把任務(wù)丟到隊列里面
if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
return false;
}
return super.offer(o);
}
注意其中的
if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
return super.offer(o);
}
是表示core線程仍然能處理的來,同時又有空閑線程的情況,將任務(wù)插入到隊列中。 如何判斷線程池中有空閑線程呢? 可以使用一個計數(shù)器來實現(xiàn),每當execute方法被執(zhí)行的時候,計算器加1,當afterExecute被執(zhí)行后,計數(shù)器減1.
@Override
public void execute(Runnable command) {
submittedTaskCount.incrementAndGet();
//代碼未完整,待補充。。。。。
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedTaskCount.decrementAndGet();
}
這樣,當
executor.getSubmittedTaskCount() < currentPoolThreadSize
的時候,說明有空閑線程。