這兩天同時在幾個地方被安利了這兩個BlockingQueue的實作類,号稱能很好的解決OOM問題,本着好奇的目光點進去看了後,感覺甚是有趣
BlockingQueue介紹
BlockingQueue是jdk中自帶的一個接口,它繼承了Queue接口,并新增了put(e),take()兩個方法
他是一個帶阻塞功能的隊列,當入隊列時,若隊列已滿,則阻塞調 用者;當出隊列時,若隊列為空,則阻塞調用者。
主要方法有
//入隊,如果隊列滿,等待直到隊列有空間
void put(E e) throws InterruptedException;
//出隊,如果隊列空,等待直到隊列不為空,傳回頭部元素
E take() throws InterruptedException;
//入隊,如果隊列滿,最多等待指定的時間,如果逾時還是滿,傳回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出隊,如果隊列空,最多等待指定的時間,如果逾時還是空,傳回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
在java的Concurrent包中,有衆多的實作類
- ArrayBlockingQueue、
- LinkedBlockingQueue、
- DelayQueue。
- PriorityBlockingQueue
- SynchronousQueue
等,大家或多或少地見過用過。
主角登場
這次講的主角是MemoryLimitedLinkedBlockingQueue和MemorySafeLinkedBlockingQueue,也是BlockingQueue的實作類,據說最初出自開源Apache ShenYu項目。
隊列實作類的名字很長,但是也讓人很好了解,最大特點都是跟記憶體的使用有關,一個是記憶體的使用限制,一個是記憶體的使用安全,感覺起來是不是很相似?
那為什麼要這兩個實作呢,都可以在哪些場景下使用呢
我們可能都看過阿裡巴巴的java開發規範,其中有一項就是線程池不能通過Executors來建立,有一個理由是像FixedThreadPool和SingleThreadPool這種方式建立的線程池允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,進而導緻OOM異常。
我們檢視Executors的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
用的是LinkedBlockingQueue,并沒有指定長度,也就是說可以向隊列裡無限添加資料了。怎麼一想,是不是真的挺危險。
那怎麼辦呢,一般來說,需要我們在初始化的時候指定一個合理的值,但是這個值到底是多少,可能我們需要根據使用的場景和過往的經驗來給值了。
這樣一琢磨,這裡面就确實值得玩味,或許換個思路用上面兩主角來嘗試解決這個問題
MemoryLimitedLinkedBlockingQueue
根據送出者的描述,它可以用來Executors中代替LinkedBlockingQueue隊列,可避免OOM的問題,比如,限制這個隊列可以使用的最大記憶體為 100M,超過使用記憶體限制就不允許繼續往裡添加。這樣是不是就解決問題了
說起來容易,那就把家夥亮出來讓我們開開眼吧
以下是完整類代碼
package org.apache.shenyu.common.concurrent;
import java.lang.instrument.Instrumentation;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}.
*
* @see org.apache.shenyu.common.concurrent.MemoryLimiter
* @see org.apache.shenyu.common.concurrent.MemoryLimitCalculator
*/
public class MemoryLimitedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = -6106022470621447542L;
private final MemoryLimiter memoryLimiter;
public MemoryLimitedLinkedBlockingQueue(final Instrumentation inst) {
this(Integer.MAX_VALUE, inst);
}
public MemoryLimitedLinkedBlockingQueue(final long memoryLimit,
final Instrumentation inst) {
super(Integer.MAX_VALUE);
this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
}
public MemoryLimitedLinkedBlockingQueue(final Collection<? extends E> c,
final long memoryLimit,
final Instrumentation inst) {
super(c);
this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
}
/**
* set the memory limit.
*
* @param memoryLimit the memory limit
*/
public void setMemoryLimit(final long memoryLimit) {
memoryLimiter.setMemoryLimit(memoryLimit);
}
/**
* get the memory limit.
*
* @return the memory limit
*/
public long getMemoryLimit() {
return memoryLimiter.getMemoryLimit();
}
/**
* get the current memory.
*
* @return the current memory
*/
public long getCurrentMemory() {
return memoryLimiter.getCurrentMemory();
}
/**
* get the current remain memory.
*
* @return the current remain memory
*/
public long getCurrentRemainMemory() {
return memoryLimiter.getCurrentRemainMemory();
}
@Override
public void put(final E e) throws InterruptedException {
memoryLimiter.acquireInterruptibly(e);
super.put(e);
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
return memoryLimiter.acquire(e, timeout, unit) && super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
return memoryLimiter.acquire(e) && super.offer(e);
}
@Override
public E take() throws InterruptedException {
final E e = super.take();
memoryLimiter.releaseInterruptibly(e);
return e;
}
@Override
public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
final E e = super.poll(timeout, unit);
memoryLimiter.releaseInterruptibly(e, timeout, unit);
return e;
}
@Override
public E poll() {
final E e = super.poll();
memoryLimiter.release(e);
return e;
}
@Override
public boolean remove(final Object o) {
final boolean success = super.remove(o);
if (success) {
memoryLimiter.release(o);
}
return success;
}
@Override
public void clear() {
super.clear();
memoryLimiter.reset();
}
}
MemoryLimitedLinkedBlockingQueue繼承自 LinkedBlockingQueue,然後重寫了它的幾個核心方法。内部定義了一個MemoryLimiter對象,在每一次對隊列的操作都會使用到這個對象。
MemoryLimiter源碼截圖:
裡面有兩個成員變量類型比較陌生
LongAdder:作用和AtomicLong是一樣,都是一個實作了原子操作的累加器。在阿裡巴巴的java開發手冊也推薦用LongAdder來替代AtomicLong,可以減少樂觀鎖的重試次數
Instrumentation: 查詢百度介紹是這樣的,
開發者可以建構一個獨立于應用程式的代理程式(Agent),用來監測和協助運作在 JVM 上的程式,甚至能夠替換和修改某些類的定義。有了這樣的功能,開發者就可以實作更為靈活的運作時虛拟機監控和 Java 類操作了,這樣的特性實際上提供了一種虛拟機級别支援的 AOP 實作方式,使得開發者無需對 JDK 做任何更新和改動,就可以實作某些 AOP 的功能了。
在 Java SE 6 裡面,instrumentation 包被賦予了更強大的功能:啟動後的 instrument、本地代碼(native code)instrument,以及動态改變 classpath 等等。這些改變,意味着 Java 具有了更強的動态控制、解釋能力,它使得 Java 語言變得更加靈活多變。
看起來感覺很牛逼也比較難懂,我們接着看在put操作隊列時都具體做了些什麼
getObjectSize看起來是擷取對象e的大小,有個這個神器,再結合這個隊列的描述,就比較容易了解作者的意圖了。
往隊列裡put時,先檢查目前隊列已使用記憶體和要put的對象記憶體大小,如果兩者的和超過記憶體限制大小,就一直阻塞等待,否則就放行并更新隊列已使用記憶體大小
從隊列中take時,從已使用記憶體大小中減去對象e的記憶體大小值
需要注意的是,上述代碼其實是有一個bug的,此處我們略過不談
MemoryLimitedLinkedBlockingQueue整個思路就是這麼簡單粗暴,接下來我們看下MemorySafeLinkedBlockingQueue是什麼鬼
MemorySafeLinkedBlockingQueue
從作者的送出說明來看,它也能解決OOM的問題,并且不依賴Instrumentation,還比MemoryLimitedLinkedBlockingQueue要更簡單好用。
以下是完整類代碼
/**
* Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
* does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
* {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}.
*/
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
private static final long serialVersionUID = 8032578371749960142L;
private int maxFreeMemory;
private Rejector<E> rejector;
public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
super(Integer.MAX_VALUE);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
final int maxFreeMemory) {
super(c);
this.maxFreeMemory = maxFreeMemory;
//default as DiscardPolicy to ensure compatibility with the old version
this.rejector = new DiscardPolicy<>();
}
/**
* set the max free memory.
*
* @param maxFreeMemory the max free memory
*/
public void setMaxFreeMemory(final int maxFreeMemory) {
this.maxFreeMemory = maxFreeMemory;
}
/**
* get the max free memory.
*
* @return the max free memory limit
*/
public int getMaxFreeMemory() {
return maxFreeMemory;
}
/**
* set the rejector.
*
* @param rejector the rejector
*/
public void setRejector(final Rejector<E> rejector) {
this.rejector = rejector;
}
/**
* determine if there is any remaining free memory.
*
* @return true if has free memory
*/
public boolean hasRemainedMemory() {
return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
}
@Override
public void put(final E e) throws InterruptedException {
if (hasRemainedMemory()) {
super.put(e);
}
rejector.reject(e, this);
}
@Override
public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e, timeout, unit);
}
@Override
public boolean offer(final E e) {
if (!hasRemainedMemory()) {
rejector.reject(e, this);
return false;
}
return super.offer(e);
}
}
MemorySafeLinkedBlockingQueue也是繼承自 LinkedBlockingQueue,有一成員變量maxFreeMemory,預設256M。
琢磨一下,莫非它會擷取整個JVM 裡面的剩餘空間,當put操作計算剩餘空間不足maxFreeMemory就會阻塞?
繼續看,計算記憶體使用大小時,依賴了一個MemoryMXBean 對象。好像這個類也比較陌生,繼續百度:提供管理接口,用于監視和管理 Java 虛拟機以及 Java 虛拟機在其上運作的作業系統。
MemorySafeLinkedBlockingQueue的核心方法是:hasRemainedMemory() ,即時判斷JVM可用空間是否超過maxFreeMemory的限制。
繼續看MemoryLimitCalculator源碼:
核心方法是refresh,更新剩餘可用空間;
另有一定時器,以50ms每次的頻率調用refresh()方法更新剩餘可用空間值。
再來梳理一下作者的思路:
put和offer時,檢查是否有剩餘足夠的可用空間,是則允許添加
從作者的提供的示例來看,還支援動态設定maxFreeMemory的值
這樣一比較,好家夥,看起來确實比MemoryLimitedLinkedBlockingQueue從設計上來說要好那麼一些
總結
上面的這兩個實作類都不依賴第三方架構,其代碼直接拿來就用,而且代碼也沒幾行。其實不管是用 Instrumentation 還是 ManagementFactory,本質上都是要限制記憶體
MemoryLimitedLinkedBlockingQueue算是正向思維,我需要使用隊列時不報OOM,就解決隊列自身的使用記憶體問題
MemorySafeLinkedBlockingQueue算是逆向思維,通過感覺外界的記憶體使用情況,始終JVM預留一定大小的可用記憶體空間,來避免OOM問題。
其實從這兩個BlockingQueue的實作類的設計思路,也可以用在我們的項目中呢,比方說使用的各種容器,也可能存在OOM 的風險,我們也可以借鑒上面的思路來解決呀。這次的寫文章主要目的不是為了推廣這兩個類的使用,更重要的是學會拓寬自己的思維,有時換個方向也能取得更好的結果
最後,關于這兩個類,其實在實作也有不少的缺陷。比如有大佬提出了自己的看法,比如 Instrumetation.getObjectSize()可能并不能準确的擷取對象的實際記憶體大小,MemoryMXBean 擷取的可用記憶體沒有考慮到GC的情況