天天看點

令仔學多線程系列(二)----自定義Queue隊列

    之前做了一個新的需求,需要從每一次search請求傳回中擷取相關的資料,然後把這些擷取到的資料做異步處理,寫入緩存并同步到資料庫中。如何做異步在這就想到了用隊列的方式來實作,一開始是用的BlockingQueue,一遍從隊尾push,另一邊從隊首取資料。

    但是在這個地方用BlockingQueue的時候就會有點問題,首先是如果不給這個隊列設定大小的話,時間長了很可能會吧記憶體給搞癱了,但是如果給BlockingQueue設定了大小的話(ps:當時設定的是2000),我們的主流程是search,其他的業務功能的開發不能夠影響到search的運作(PS:每個平台對search傳回的時間都有限制的),當隊列滿了之後,再多來的任務就會被挂起,一直等到隊列中有空餘位置才會被執行。這樣的話我們整個的流程就會Down掉。

    是以就自己封裝了一個Queue,當隊列滿了之後,多餘的資料就會被扔掉,當然不是所有的業務場景都适合使用。僅供參考借鑒。

    下面是具體的代碼:

package com.flight.inter.otaadapter.manage;

import java.util.LinkedList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by ling.zhang on 16/4/26.
 */
public class LocalCacheQueue<T> {
    private Lock lock=new ReentrantLock();
    private int maxSize;
    private int currentSize;
    private int size;
    private LinkedList<T> requestQueue;

    public LocalCacheQueue(int size){
        this.size=size;
        this.maxSize=2*size;
        currentSize=0;
        requestQueue=new LinkedList<>();
    }

    public T pop(){
        try{
            lock.lock();
            T getOne=requestQueue.pop();
            if(getOne!=null){
                currentSize--;
            }
            return getOne;
        }catch (Exception e){
            return null;
        }finally {
            lock.unlock();
        }
    }

    public void push(T one){
        try{
            lock.lock();
            if(currentSize>size)
                ltrim();
            requestQueue.push(one);
            currentSize++;
        }catch (Exception e){
        }finally {
            lock.unlock();
        }
    }

    private void ltrim(){
        int needClearSize=currentSize-size;
        for(int i=0;i<needClearSize;i++){
            try {
                T getOne = requestQueue.removeLast();
                currentSize--;
            }catch (Exception e){

            }
        }
    }

}