天天看點

造輪子之MemorySafeLinkedBlockingQueue-LinkBlockingQueue改進

作者:權重不平均

LinkBlockingQueue改進

問題背景

https://github.com/apache/dubbo/pull/9722/files

使用線程池的同學對于标題中的隊列想必都有過使用,但上述隊列使用不當時則會造成程式OOM,那怎麼來控制呢?

使用ArrayBlockingQueue?如何來評估長度?

是否有一個完美的解決方案呢,MemorySafeLinkedBlockingQueue則通過對記憶體的限制判斷盡面控制隊列的容量,完成解決了可能存在的OOM問題。

擷取記憶體大小(注:機關大B;支援準實時更新):

Runtime.getRuntime().freeMemory()//JVM中已經申請到的堆記憶體中還未使用的大小
Runtime.getRuntime().maxMemory()// JVM可從作業系統申請到的最大記憶體值 -Xxm
Runtime.getRuntime().totalMemory()// JVM已從作業系統申請到的記憶體大小 —Xxs可設定該值大小-初始堆的大小
           
線程池在excute任務時,放隊列,放不進去,使用新線程運作任務。這個放不進行,是使用的offer??非阻塞方法嗎?

參考:https://blog.csdn.net/weixin_43108539/article/details/125190023

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     	//拿到32位的int
        int c = ctl.get();
     	//工作線程數<核心線程數
        if (workerCountOf(c) < corePoolSize) {
            //進入if,代表可以建立 核心 線程數
            if (addWorker(command, true))
                return;
            //如果沒進入if,代表建立核心線程數失敗,重新擷取 ctl
            c = ctl.get();
        }
        //判斷線程池為Running狀态,将任務添加入阻塞隊列,使用offer
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次判斷是否為Running狀态,若不是Running狀态,remove任務
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果線程池在Running狀态,線程池數量為0
            else if (workerCountOf(recheck) == 0)
                //阻塞隊列有任務,但是沒有工作線程,添加一個任務為空的工作線程處理阻塞隊列中的任務
                addWorker(null, false);
        }
        //阻塞隊列已滿,建立非核心線程,拒絕政策-addWorker中有判斷核心線程數是否超過最大線程數
        else if (!addWorker(command, false))
            reject(command);
    }
           

空閑記憶體計算

package com.zte.sdn.oscp.queue;

import cn.hutool.core.thread.NamedThreadFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class MemoryLimitCalculator {

    private static volatile long maxAvailable;

    private static final AtomicBoolean refreshStarted = new AtomicBoolean(false);

    private static void refresh() {
        maxAvailable = Runtime.getRuntime().freeMemory();
    }

    private static void checkAndScheduleRefresh() {
        if (!refreshStarted.get()) {
            // immediately refresh when first call to prevent maxAvailable from being 0
            // to ensure that being refreshed before refreshStarted being set as true
            // notice: refresh may be called for more than once because there is no lock
            refresh();
            if (refreshStarted.compareAndSet(false, true)) {
                ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-Memory-Calculator"));
                // check every 50 ms to improve performance
                scheduledExecutorService.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    refreshStarted.set(false);
                    scheduledExecutorService.shutdown();
                }));
            }
        }
    }

    /**
     * Get the maximum available memory of the current JVM.
     *
     * @return maximum available memory
     */
    public static long maxAvailable() {
        checkAndScheduleRefresh();
        return maxAvailable;
    }

    /**
     * Take the current JVM's maximum available memory
     * as a percentage of the result as the limit.
     *
     * @param percentage percentage
     * @return available memory
     */
    public static long calculate(final float percentage) {
        if (percentage <= 0 || percentage > 1) {
            throw new IllegalArgumentException();
        }
        checkAndScheduleRefresh();
        return (long) (maxAvailable() * percentage);
    }

    /**
     * By default, it takes 80% of the maximum available memory of the current JVM.
     *
     * @return available memory
     */
    public static long defaultLimit() {
        checkAndScheduleRefresh();
        return (long) (maxAvailable() * 0.8);
    }
}
           

記憶體安全隊列

package com.zte.sdn.oscp.queue;

import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MemorySafeLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {

    private static final long serialVersionUID = 8032578371739960142L;

    public static int THE_256_MB = 256 * 1024 * 1024;

    private int maxFreeMemory;

    private Rejector<E> rejector;

    public MemorySafeLinkedBlockingQueue() {
        this(THE_256_MB);
    }

    public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
        super(Integer.MAX_VALUE);
        this.maxFreeMemory = maxFreeMemory;
        //default as DiscardPolicy to ensure compatibility with the old version
        this.rejector = new DiscardPolicy<>();
    }

    public MemorySafeLinkedBlockingQueue(final Collection<? extends E> c,
                                         final int maxFreeMemory) {
        super(c);
        this.maxFreeMemory = maxFreeMemory;
        //default as DiscardPolicy to ensure compatibility with the old version
        this.rejector = new DiscardPolicy<>();
    }

    /**
     * set the max free memory.
     *
     * @param maxFreeMemory the max free memory
     */
    public void setMaxFreeMemory(final int maxFreeMemory) {
        this.maxFreeMemory = maxFreeMemory;
    }

    /**
     * get the max free memory.
     *
     * @return the max free memory limit
     */
    public int getMaxFreeMemory() {
        return maxFreeMemory;
    }

    /**
     * set the rejector.
     *
     * @param rejector the rejector
     */
    public void setRejector(final Rejector<E> rejector) {
        this.rejector = rejector;
    }

    /**
     * determine if there is any remaining free memory.
     *
     * @return true if has free memory
     */
    public boolean hasRemainedMemory() {
        return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
    }

    @Override
    public void put(final E e) throws InterruptedException {
        if (hasRemainedMemory()) {
            super.put(e);
        } else {
            rejector.reject(e, this);
        }
    }

    @Override
    public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
        if (!hasRemainedMemory()) {
            rejector.reject(e, this);
            return false;
        }
        return super.offer(e, timeout, unit);
    }

    @Override
    public boolean offer(final E e) {
        if (!hasRemainedMemory()) {
            rejector.reject(e, this);
            return false;
        }
        return super.offer(e);
    }
}
           

拒絕政策

注意其中的rejector是拒絕政策,預設的DiscardPolicy什麼也不處理;

而DiscardOldPolicy的處理邏輯很簡單

public class DiscardOldestPolicy<E> implements Rejector<E> {

    @Override
    public void reject(final E e, final Queue<E> queue) {
        queue.poll();
        queue.offer(e);
    }
}
           

AbortPolicy則直接抛出異常

public class AbortPolicy<E> implements Rejector<E> {

    @Override
    public void reject(final E e, final Queue<E> queue) {
        throw new RejectException("no more memory can be used !");
    }
}
           

個人建議增加日志列印即可。

繼續閱讀