LinkBlockingQueue改進
問題背景
https://github.com/apache/dubbo/pull/9722/files
使用線程池的同學對于标題中的隊列想必都有過使用,但上述隊列使用不當時則會造成程式OOM,那怎麼來控制呢?
使用ArrayBlockingQueue?如何來評估長度?
是否有一個完美的解決方案呢,MemorySafeLinkedBlockingQueue則通過對記憶體的限制判斷盡面控制隊列的容量,完成解決了可能存在的OOM問題。
擷取記憶體大小(注:機關大B;支援準實時更新):
Runtime.getRuntime().freeMemory()//JVM中已經申請到的堆記憶體中還未使用的大小
Runtime.getRuntime().maxMemory()// JVM可從作業系統申請到的最大記憶體值 -Xxm
Runtime.getRuntime().totalMemory()// JVM已從作業系統申請到的記憶體大小 —Xxs可設定該值大小-初始堆的大小
線程池在excute任務時,放隊列,放不進去,使用新線程運作任務。這個放不進行,是使用的offer??非阻塞方法嗎?
參考:https://blog.csdn.net/weixin_43108539/article/details/125190023
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//拿到32位的int
int c = ctl.get();
//工作線程數<核心線程數
if (workerCountOf(c) < corePoolSize) {
//進入if,代表可以建立 核心 線程數
if (addWorker(command, true))
return;
//如果沒進入if,代表建立核心線程數失敗,重新擷取 ctl
c = ctl.get();
}
//判斷線程池為Running狀态,将任務添加入阻塞隊列,使用offer
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次判斷是否為Running狀态,若不是Running狀态,remove任務
if (! isRunning(recheck) && remove(command))
reject(command);
//如果線程池在Running狀态,線程池數量為0
else if (workerCountOf(recheck) == 0)
//阻塞隊列有任務,但是沒有工作線程,添加一個任務為空的工作線程處理阻塞隊列中的任務
addWorker(null, false);
}
//阻塞隊列已滿,建立非核心線程,拒絕政策-addWorker中有判斷核心線程數是否超過最大線程數
else if (!addWorker(command, false))
reject(command);
}
空閑記憶體計算
package com.zte.sdn.oscp.queue;
import cn.hutool.core.thread.NamedThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class MemoryLimitCalculator {
private static volatile long maxAvailable;
private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);
private static void refresh() {
maxAvailable = Runtime.getRuntime().freeMemory();
}
private static void checkAndScheduleRefresh() {
if (!refreshStarted.get()) {
// immediately refresh when first call to prevent maxAvailable from being 0
// to ensure that being refreshed before refreshStarted being set as true
// notice: refresh may be called for more than once because there is no lock
refresh();
if (refreshStarted.compareAndSet(false, true)) {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator"));
// check every 50 ms to improve performance
scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
refreshStarted.set(false);
scheduledExecutorService.shutdown();
}));
}
}
}
/**
* Get the maximum available memory of the current JVM.
*
* @return maximum available memory
*/
public static long maxAvailable() {
checkAndScheduleRefresh();
return maxAvailable;
}
/**
* Take the current JVM's maximum available memory
* as a percentage of the result as the limit.
*
* @param percentage percentage
* @return available memory
*/
public static long calculate(final float percentage) {
if (percentage <= 0 || percentage > 1) {
throw new IllegalArgumentException();
}
checkAndScheduleRefresh();
return (long) (maxAvailable() * percentage);
}
/**
* By default, it takes 80% of the maximum available memory of the current JVM.
*
* @return available memory
*/
public static long defaultLimit() {
checkAndScheduleRefresh();
return (long) (maxAvailable() * 0.8);
}
}
記憶體安全隊列
package com.zte.sdn.oscp.queue;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 8032578371739960142L;
public static int THE_256_MB = 256 * 1024 * 1024;
private int maxFreeMemory;
private Rejector<E> rejector;
public MemorySafeLinkedBlockingQueue() {
this(THE_256_MB);
}
public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
final int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}
/**
* get the max free memory.
*
* @return the max free memory limit
*/
public int getMaxFreeMemory() {
return maxFreeMemory;
}
/**
* set the rejector.
*
* @param rejector the rejector
*/
public void setRejector(final Rejector<E> rejector) {
this.rejector = rejector;
}
/**
* determine if there is any remaining free memory.
*
* @return true if has free memory
*/
public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
}
@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
} else {
rejector.reject(e, this);
}
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e);
}
}
拒絕政策
注意其中的rejector是拒絕政策,預設的DiscardPolicy什麼也不處理;
而DiscardOldPolicy的處理邏輯很簡單
public class DiscardOldestPolicy<E> implements Rejector<E> {
@Override
public void reject(final E e, final Queue<E> queue) {
queue.poll();
queue.offer(e);
}
}
AbortPolicy則直接抛出異常
public class AbortPolicy<E> implements Rejector<E> {
@Override
public void reject(final E e, final Queue<E> queue) {
throw new RejectException("no more memory can be used !");
}
}
個人建議增加日志列印即可。