1、Dubbo已有線程池
dubbo在使用時,都是通過建立真實的業務線程池進行操作的。目前已知的線程池模型有兩個和java中的互相對應:
- fix: 表示建立固定大小的線程池。也是Dubbo預設的使用方式,預設建立的執行線程數為200,并且是沒有任何等待隊列的。是以在極端的情況下可能會存在問題,比如某個操作大量執行時,可能存在堵塞的情況。後面也會講相關的處理辦法。
- cache: 建立非固定大小的線程池,當線程不足時,會自動建立新的線程。但是使用這種的時候需要注意,如果突然有高TPS的請求過來,方法沒有及時完成,則會造成大量的線程建立,對系統的CPU和負載都是壓力,執行越多反而會拖慢整個系統。
2、自定義線程池
在真實的使用過程中可能會因為使用fix模式的線程池,導緻具體某些業務場景因為線程池中的線程數量不足而産生錯誤,而很多業務研發是對這些無感覺的,隻有當出現錯誤的時候才會去檢視告警或者通過客戶回報出現嚴重的問題才去檢視,結果發現是線程池滿了。是以可以在建立線程池的時,通過某些手段對這個線程池進行監控,這樣就可以進行及時的擴縮容機器或者告警。下面的這個程式就是這樣子的,會在建立線程池後進行對其監控,并且及時作出相應處理。
(1)線程池實作, 這裡主要是基于對FixedThreadPool 中的實作做擴充出線程監控的部分
package com.lagou.threadpool;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.*;
public class WachingThreadPool extends FixedThreadPool implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(WachingThreadPool.class);
// 定義線程池使用的閥值
private static final double ALARM_PERCENT = 0.90;
private final Map<URL, ThreadPoolExecutor> THREAD_POOLS = new ConcurrentHashMap<>();
public WachingThreadPool() {
// 每隔3秒列印線程使用情況
Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 3, TimeUnit.SECONDS);
}
// 通過父類建立線程池
@Override
public Executor getExecutor(URL url) {
final Executor executor = super.getExecutor(url);
if (executor instanceof ThreadPoolExecutor) {
THREAD_POOLS.put(url, (ThreadPoolExecutor) executor);
}
return executor;
}
@Override
public void run() {
// 周遊線程池
for (Map.Entry<URL, ThreadPoolExecutor> entry : THREAD_POOLS.entrySet()) {
final URL url = entry.getKey();
final ThreadPoolExecutor executor = entry.getValue();
// 計算相關名額
final int activeCount = executor.getActiveCount();
final int poolSize = executor.getCorePoolSize();
double usedPercent = activeCount / (poolSize * 1.0);
LOGGER.info("線程池執行狀态:[{}/{}:{}%]", activeCount, poolSize, usedPercent * 100);
if (usedPercent > ALARM_PERCENT) {
LOGGER.error("超出警戒線! host:{} 目前使用率是:{},URL:{}", url.getIp(), usedPercent * 100, url);
}
}
}
}
(2)SPI聲明,建立檔案(固定的)
META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool
watching=包名.線程池名
(3)在服務提供方項目引入該依賴
(4)在服務提供方項目中設定使用該線程池生成器
dubbo.provider.threadpool=watching
(5)接下來需要做的就是模拟整個流程,因為該線程目前是每1秒抓一次資料,是以我們需要對該方法的提供者超過1秒的時間(比如這裡用休眠Thread.sleep ),消費者則需要啟動多個線程來并行執行,來模拟整個并發情況。
(6)在調用方則嘗試簡單通過for循環啟動多個線程來執行 檢視服務提供方的監控情況
package com.lagou;
import com.lagou.bean.ConsumerComponent;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import java.io.IOException;
public class AnnotationConsumerMain {
public static void main(String[] args) throws IOException, InterruptedException {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ConsumerConfiguration.class);
context.start();
ConsumerComponent service = context.getBean(ConsumerComponent.class);
while (true) {
for (int i = 0; i < 1000; i++) {
Thread.sleep(5);
new Thread(new Runnable() {
@Override
public void run() {
String msg = service.sayHello("world", 0);
System.out.println(msg);
}
}).start();
}
}
}
@Configuration
@PropertySource("classpath:/dubbo-consumer.properties")
//@EnableDubbo(scanBasePackages = "com.lagou.bean")
@ComponentScan("com.lagou.bean")
@EnableDubbo
static class ConsumerConfiguration {
}
}