天天看點

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

作者:夢想躺平的菠菜

這兩天同時在幾個地方被安利了這兩個BlockingQueue的實作類,号稱能很好的解決OOM問題,本着好奇的目光點進去看了後,感覺甚是有趣

BlockingQueue介紹

BlockingQueue是jdk中自帶的一個接口,它繼承了Queue接口,并新增了put(e),take()兩個方法

他是一個帶阻塞功能的隊列,當入隊列時,若隊列已滿,則阻塞調 用者;當出隊列時,若隊列為空,則阻塞調用者。

主要方法有

//入隊,如果隊列滿,等待直到隊列有空間
void put(E e) throws InterruptedException;
//出隊,如果隊列空,等待直到隊列不為空,傳回頭部元素
E take() throws InterruptedException;
//入隊,如果隊列滿,最多等待指定的時間,如果逾時還是滿,傳回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//出隊,如果隊列空,最多等待指定的時間,如果逾時還是空,傳回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;           

在java的Concurrent包中,有衆多的實作類

  • ArrayBlockingQueue、
  • LinkedBlockingQueue、
  • DelayQueue。
  • PriorityBlockingQueue
  • SynchronousQueue

等,大家或多或少地見過用過。

主角登場

這次講的主角是MemoryLimitedLinkedBlockingQueue和MemorySafeLinkedBlockingQueue,也是BlockingQueue的實作類,據說最初出自開源Apache ShenYu項目。

隊列實作類的名字很長,但是也讓人很好了解,最大特點都是跟記憶體的使用有關,一個是記憶體的使用限制,一個是記憶體的使用安全,感覺起來是不是很相似?

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

那為什麼要這兩個實作呢,都可以在哪些場景下使用呢

我們可能都看過阿裡巴巴的java開發規範,其中有一項就是線程池不能通過Executors來建立,有一個理由是像FixedThreadPool和SingleThreadPool這種方式建立的線程池允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,進而導緻OOM異常。

我們檢視Executors的源碼:

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
							0L, TimeUnit.MILLISECONDS,
							new LinkedBlockingQueue<Runnable>());           

用的是LinkedBlockingQueue,并沒有指定長度,也就是說可以向隊列裡無限添加資料了。怎麼一想,是不是真的挺危險。

那怎麼辦呢,一般來說,需要我們在初始化的時候指定一個合理的值,但是這個值到底是多少,可能我們需要根據使用的場景和過往的經驗來給值了。

這樣一琢磨,這裡面就确實值得玩味,或許換個思路用上面兩主角來嘗試解決這個問題

MemoryLimitedLinkedBlockingQueue

根據送出者的描述,它可以用來Executors中代替LinkedBlockingQueue隊列,可避免OOM的問題,比如,限制這個隊列可以使用的最大記憶體為 100M,超過使用記憶體限制就不允許繼續往裡添加。這樣是不是就解決問題了

說起來容易,那就把家夥亮出來讓我們開開眼吧

以下是完整類代碼

package org.apache.shenyu.common.concurrent;

import java.lang.instrument.Instrumentation;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}.
 *
 * @see org.apache.shenyu.common.concurrent.MemoryLimiter
 * @see org.apache.shenyu.common.concurrent.MemoryLimitCalculator
 */
public class MemoryLimitedLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = -6106022470621447542L;

    private final MemoryLimiter memoryLimiter;

    public MemoryLimitedLinkedBlockingQueue(final Instrumentation inst) {
        this(Integer.MAX_VALUE, inst);
    }

    public MemoryLimitedLinkedBlockingQueue(final long memoryLimit,
                                            final Instrumentation inst) {
        super(Integer.MAX_VALUE);
        this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
    }

    public MemoryLimitedLinkedBlockingQueue(final Collection<? extends E> c,
                                            final long memoryLimit,
                                            final Instrumentation inst) {
        super(c);
        this.memoryLimiter = new MemoryLimiter(memoryLimit, inst);
    }

    /**
     * set the memory limit.
     *
     * @param memoryLimit the memory limit
     */
    public void setMemoryLimit(final long memoryLimit) {
        memoryLimiter.setMemoryLimit(memoryLimit);
    }

    /**
     * get the memory limit.
     *
     * @return the memory limit
     */
    public long getMemoryLimit() {
        return memoryLimiter.getMemoryLimit();
    }

    /**
     * get the current memory.
     *
     * @return the current memory
     */
    public long getCurrentMemory() {
        return memoryLimiter.getCurrentMemory();
    }

    /**
     * get the current remain memory.
     *
     * @return the current remain memory
     */
    public long getCurrentRemainMemory() {
        return memoryLimiter.getCurrentRemainMemory();
    }

    @Override
    public void put(final E e) throws InterruptedException {
        memoryLimiter.acquireInterruptibly(e);
        super.put(e);
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
        return memoryLimiter.acquire(e, timeout, unit) && super.offer(e, timeout, unit);
    }

    @Override
    public boolean offer(final E e) {
        return memoryLimiter.acquire(e) && super.offer(e);
    }

    @Override
    public E take() throws InterruptedException {
        final E e = super.take();
        memoryLimiter.releaseInterruptibly(e);
        return e;
    }

    @Override
    public E poll(final long timeout, final TimeUnit unit) throws InterruptedException {
        final E e = super.poll(timeout, unit);
        memoryLimiter.releaseInterruptibly(e, timeout, unit);
        return e;
    }

    @Override
    public E poll() {
        final E e = super.poll();
        memoryLimiter.release(e);
        return e;
    }

    @Override
    public boolean remove(final Object o) {
        final boolean success = super.remove(o);
        if (success) {
            memoryLimiter.release(o);
        }
        return success;
    }

    @Override
    public void clear() {
        super.clear();
        memoryLimiter.reset();
    }
}           

MemoryLimitedLinkedBlockingQueue繼承自 LinkedBlockingQueue,然後重寫了它的幾個核心方法。内部定義了一個MemoryLimiter對象,在每一次對隊列的操作都會使用到這個對象。

MemoryLimiter源碼截圖:

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

裡面有兩個成員變量類型比較陌生

LongAdder:作用和AtomicLong是一樣,都是一個實作了原子操作的累加器。在阿裡巴巴的java開發手冊也推薦用LongAdder來替代AtomicLong,可以減少樂觀鎖的重試次數

Instrumentation: 查詢百度介紹是這樣的,

開發者可以建構一個獨立于應用程式的代理程式(Agent),用來監測和協助運作在 JVM 上的程式,甚至能夠替換和修改某些類的定義。有了這樣的功能,開發者就可以實作更為靈活的運作時虛拟機監控和 Java 類操作了,這樣的特性實際上提供了一種虛拟機級别支援的 AOP 實作方式,使得開發者無需對 JDK 做任何更新和改動,就可以實作某些 AOP 的功能了。

在 Java SE 6 裡面,instrumentation 包被賦予了更強大的功能:啟動後的 instrument、本地代碼(native code)instrument,以及動态改變 classpath 等等。這些改變,意味着 Java 具有了更強的動态控制、解釋能力,它使得 Java 語言變得更加靈活多變。

看起來感覺很牛逼也比較難懂,我們接着看在put操作隊列時都具體做了些什麼

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

getObjectSize看起來是擷取對象e的大小,有個這個神器,再結合這個隊列的描述,就比較容易了解作者的意圖了。

往隊列裡put時,先檢查目前隊列已使用記憶體和要put的對象記憶體大小,如果兩者的和超過記憶體限制大小,就一直阻塞等待,否則就放行并更新隊列已使用記憶體大小

從隊列中take時,從已使用記憶體大小中減去對象e的記憶體大小值

需要注意的是,上述代碼其實是有一個bug的,此處我們略過不談

MemoryLimitedLinkedBlockingQueue整個思路就是這麼簡單粗暴,接下來我們看下MemorySafeLinkedBlockingQueue是什麼鬼

MemorySafeLinkedBlockingQueue

從作者的送出說明來看,它也能解決OOM的問題,并且不依賴Instrumentation,還比MemoryLimitedLinkedBlockingQueue要更簡單好用。

以下是完整類代碼

/**
 * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
 * does not depend on {@link java.lang.instrument.Instrumentation} and is easier to use than
 * {@link org.apache.shenyu.common.concurrent.MemoryLimitedLinkedBlockingQueue}.
 */
public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 8032578371749960142L;

    private int maxFreeMemory;

    private Rejector<E> rejector;

    public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
        super(Integer.MAX_VALUE);
        this.maxFreeMemory = maxFreeMemory;
        //default as DiscardPolicy to ensure compatibility with the old version
        this.rejector = new DiscardPolicy<>();
    }

    public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
                                         final int maxFreeMemory) {
        super(c);
        this.maxFreeMemory = maxFreeMemory;
        //default as DiscardPolicy to ensure compatibility with the old version
        this.rejector = new DiscardPolicy<>();
    }

    /**
     * set the max free memory.
     *
     * @param maxFreeMemory the max free memory
     */
    public void setMaxFreeMemory(final int maxFreeMemory) {
        this.maxFreeMemory = maxFreeMemory;
    }

    /**
     * get the max free memory.
     *
     * @return the max free memory limit
     */
    public int getMaxFreeMemory() {
        return maxFreeMemory;
    }

    /**
     * set the rejector.
     *
     * @param rejector the rejector
     */
    public void setRejector(final Rejector<E> rejector) {
        this.rejector = rejector;
    }

    /**
     * determine if there is any remaining free memory.
     *
     * @return true if has free memory
     */
    public boolean hasRemainedMemory() {
        return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
    }

    @Override
    public void put(final E e) throws InterruptedException {
        if (hasRemainedMemory()) {
            super.put(e);
        }
        rejector.reject(e, this);
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
        if (!hasRemainedMemory()) {
            rejector.reject(e, this);
            return false;
        }
        return super.offer(e, timeout, unit);
    }

    @Override
    public boolean offer(final E e) {
        if (!hasRemainedMemory()) {
            rejector.reject(e, this);
            return false;
        }
        return super.offer(e);
    }
}           

MemorySafeLinkedBlockingQueue也是繼承自 LinkedBlockingQueue,有一成員變量maxFreeMemory,預設256M。

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

琢磨一下,莫非它會擷取整個JVM 裡面的剩餘空間,當put操作計算剩餘空間不足maxFreeMemory就會阻塞?

繼續看,計算記憶體使用大小時,依賴了一個MemoryMXBean 對象。好像這個類也比較陌生,繼續百度:提供管理接口,用于監視和管理 Java 虛拟機以及 Java 虛拟機在其上運作的作業系統。

MemorySafeLinkedBlockingQueue的核心方法是:hasRemainedMemory() ,即時判斷JVM可用空間是否超過maxFreeMemory的限制。

繼續看MemoryLimitCalculator源碼:

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

核心方法是refresh,更新剩餘可用空間;

另有一定時器,以50ms每次的頻率調用refresh()方法更新剩餘可用空間值。

再來梳理一下作者的思路:

put和offer時,檢查是否有剩餘足夠的可用空間,是則允許添加

從作者的提供的示例來看,還支援動态設定maxFreeMemory的值

這樣一比較,好家夥,看起來确實比MemoryLimitedLinkedBlockingQueue從設計上來說要好那麼一些

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

總結

上面的這兩個實作類都不依賴第三方架構,其代碼直接拿來就用,而且代碼也沒幾行。其實不管是用 Instrumentation 還是 ManagementFactory,本質上都是要限制記憶體

MemoryLimitedLinkedBlockingQueue算是正向思維,我需要使用隊列時不報OOM,就解決隊列自身的使用記憶體問題

MemorySafeLinkedBlockingQueue算是逆向思維,通過感覺外界的記憶體使用情況,始終JVM預留一定大小的可用記憶體空間,來避免OOM問題。

其實從這兩個BlockingQueue的實作類的設計思路,也可以用在我們的項目中呢,比方說使用的各種容器,也可能存在OOM 的風險,我們也可以借鑒上面的思路來解決呀。這次的寫文章主要目的不是為了推廣這兩個類的使用,更重要的是學會拓寬自己的思維,有時換個方向也能取得更好的結果

最後,關于這兩個類,其實在實作也有不少的缺陷。比如有大佬提出了自己的看法,比如 Instrumetation.getObjectSize()可能并不能準确的擷取對象的實際記憶體大小,MemoryMXBean 擷取的可用記憶體沒有考慮到GC的情況

這兩個BlockingQueue,一下子就打開了我防範OOM異常的思路

繼續閱讀