天天看點

Apache commons-pool對象池原理分析

 Apache commons-pool本質上是"對象池",即通過一定的規則來維護對象集合的容器;commos-pool在很多場景中,用來實作"連接配接池"/"任務worker池"等,大家常用的dbcp資料庫連接配接池,也是基于commons-pool實作.

    commons-pool實作思想非常簡單,它主要的作用就是将"對象集合"池化,任何通過pool進行對象存取的操作,都會嚴格按照"pool配置"(比如池的大小)實時的建立對象/阻塞控制/銷毀對象等.它在一定程度上,實作了對象集合的管理以及對象的分發.

    1) 将建立對象的方式,使用工廠模式;

    2) 通過"pool配置"來限制對象存取的時機

    3) 将對象清單儲存在隊列中(LinkedList)

    首選需要聲明,不同的"對象池"(或者連接配接池)在設計上可能存在很大的差別,但是在思想上大同小異,本文主要講解commons-pool,它和其他"連接配接池"的差別在此不多讨論.

一.對象生命周期:

Apache commons-pool對象池原理分析

二.Config詳解:

  1. maxActive: 連結池中最大連接配接數,預設為8.
  2. maxIdle: 連結池中最大空閑的連接配接數,預設為8.
  3. minIdle: 連接配接池中最少空閑的連接配接數,預設為0.
  4. maxWait: 當連接配接池資源耗盡時,調用者最大阻塞的時間,逾時将跑出異常。機關,毫秒數;預設為-1.表示永不逾時.
  5. minEvictableIdleTimeMillis: 連接配接空閑的最小時間,達到此值後空閑連接配接将可能會被移除。負值(-1)表示不移除。
  6. softMinEvictableIdleTimeMillis: 連接配接空閑的最小時間,達到此值後空閑連結将會被移除,且保留“minIdle”個空閑連接配接數。預設為-1.
  7. numTestsPerEvictionRun: 對于“空閑連結”檢測線程而言,每次檢測的連結資源的個數。預設為3.
  8. testOnBorrow: 向調用者輸出“連結”資源時,是否檢測是有有效,如果無效則從連接配接池中移除,并嘗試擷取繼續擷取。預設為false。建議保持預設值.
  9. testOnReturn:  向連接配接池“歸還”連結時,是否檢測“連結”對象的有效性。預設為false。建議保持預設值.
  10. testWhileIdle:  向調用者輸出“連結”對象時,是否檢測它的空閑逾時;預設為false。如果“連結”空閑逾時,将會被移除。建議保持預設值.
  11. timeBetweenEvictionRunsMillis:  “空閑連結”檢測線程,檢測的周期,毫秒數。如果為負值,表示不運作“檢測線程”。預設為-1.
  12. whenExhaustedAction: 當“連接配接池”中active數量達到閥值時,即“連結”資源耗盡時,連接配接池需要采取的手段, 預設為1:

    -> 0 : 抛出異常,

    -> 1 : 阻塞,直到有可用連結資源

    -> 2 : 強制建立新的連結資源

    這些屬性均可以在org.apache.commons.pool.impl.GenericObjectPool.Config中進行設定。

三.原了解析

    1) 對象池建立(參考GenericObjectPool):

  • public GenericObjectPool(PoolableObjectFactory factory, GenericObjectPool.Config config):此方法建立一個GenericObjectPool執行個體,GenericObjectPool類已經實作了和對象池有關的所有核心操作,開發者可以通過繼承或者封裝的方式來使用它.通過此構造函數,我們能夠清晰的看到,一個Pool中需要指定PoolableObjectFactory 執行個體,以及此對象池的Config資訊.PoolableObjectFactory主要用來"建立新對象",比如當對象池中的對象不足時,可以使用PoolableObjectFactory.makeObject()方法來建立對象,并傳遞給Pool管理.

    此構造函數執行個體化了一個LinkedList作為"對象池"容器,用來存取"對象".此外還會根據timeBetweenEvictionRunsMillis的值來決定是否啟動一個背景線程,此線程用來周期性掃描pool中的對象清單,已檢測"對象池中的對象"空閑(idle)的時間是否達到了閥值,如果是,則移除此對象.

if ((getMinEvictableIdleTimeMillis() > 0) &&  
        (idleTimeMilis > getMinEvictableIdleTimeMillis())) {  
    removeObject = true;  
}  
...  
if (removeObject) {  
    try {  
        _factory.destroyObject(pair.value);  
    } catch(Exception e) {  
        // ignored  
    }  
}      

 2) 對象工廠PoolableObjectFactory接口:

    commons-pool通過使用ObjectFactory(工廠模式)的方式将"對象池中的對象"的建立/檢測/銷毀等特性解耦出來,這是一個非常良好的設計思想.此接口有一個抽象類BasePoolableObjectFactory,可供開發者繼承和實作.

  • Object makeObject(): 建立一個新對象;當對象池中的對象個數不足時,将會使用此方法來"輸出"一個新的"對象",并傳遞給對象池管理.
  • void destroyObject(Object obj): 銷毀對象,如果對象池中檢測到某個"對象"idle的時間逾時,或者操作者向對象池"歸還對象"時檢測到"對象"已經無效,那麼此時将會導緻"對象銷毀";"銷毀對象"的操作設計相差甚遠,但是必須明确:當調用此方法時,"對象"的生命周期必須結束.如果object是線程,那麼此時線程必須退出;如果object是socket操作,那麼此時socket必須關閉;如果object是檔案流操作,那麼此時"資料flush"且正常關閉.
  • boolean validateObject(Object obj): 檢測對象是否"有效";Pool中不能儲存無效的"對象",是以"背景檢測線程"會周期性的檢測Pool中"對象"的有效性,如果對象無效則會導緻此對象從Pool中移除,并destroy;此外在調用者從Pool擷取一個"對象"時,也會檢測"對象"的有效性,確定不能講"無效"的對象輸出給調用者;當調用者使用完畢将"對象歸還"到Pool時,仍然會檢測對象的有效性.所謂有效性,就是此"對象"的狀态是否符合預期,是否可以對調用者直接使用;如果對象是Socket,那麼它的有效性就是socket的通道是否暢通/阻塞是否逾時等.
  • void activateObject(Object obj): "激活"對象,當Pool中決定移除一個對象傳遞給調用者時額外的"激活"操作,比如可以在activateObject方法中"重置"參數清單讓調用者使用時感覺像一個"新建立"的對象一樣;如果object是一個線程,可以在"激活"操作中重置"線程中斷标記",或者讓線程從阻塞中喚醒等;如果object是一個socket,那麼可以在"激活操作"中重新整理通道,或者對socket進行連結重建(假如socket意外關閉)等.
  • void void passivateObject(Object obj): "鈍化"對象,當調用者"歸還對象"時,Pool将會"鈍化對象";鈍化的言外之意,就是此"對象"暫且需要"休息"一下.如果object是一個socket,那麼可以passivateObject中清除buffer,将socket阻塞;如果object是一個線程,可以在"鈍化"操作中将線程sleep或者将線程中的某個對象wait.需要注意的時,activateObject和passivateObject兩個方法需要對應,避免死鎖或者"對象"狀态的混亂.

    3) ObjectPool接口與實作:

    對象池是commons-pool的核心接口,用來維護"對象清單"的存取;其中GenericObjectPool是其實作類,它已經實作了相關的功能.

  • Object borrowObject(): 從Pool擷取一個對象,此操作将導緻一個"對象"從Pool移除(脫離Pool管理),調用者可以在獲得"對象"引用後即可使用,且需要在使用結束後"歸還".如下為僞代碼:
public Object borrowObject() throws Exception {  
    Object value = null;  
    synchronized (this) {  
        if(!_pool.isEmpty()){  
            value = _pool.remove();  
        }  
          
    }         
    for(;;) {       
        //如果Pool中沒有"對象",則根據相應的"耗盡"政策  
        if(value == null) {  
            switch(whenExhaustedAction) {  
                //如果耗盡,仍繼續建立新"對象"  
                case WHEN_EXHAUSTED_GROW:  
                    value = _factory.makeObject();  
                    break;  
                //如果耗盡,則終止,此時以異常的方式退出.  
                case WHEN_EXHAUSTED_FAIL:  
                    throw new NoSuchElementException("Pool exhausted");  
                //如果耗盡,則阻塞,直到有"對象"歸還  
                case WHEN_EXHAUSTED_BLOCK:  
                    try {  
                        synchronized (value) {  
                            if (value == null) {  
                                //maxWait為Config中指定的"最大等待時間"  
                                if(maxWait <= 0) {  
                                    latch.wait();  
                                } else {  
                                        latch.wait(waitTime);  
                                }  
                            } else {  
                                break;  
                            }  
                        }  
                    } catch(InterruptedException e) {         
                        //  
                        break;  
                    }  
                      
                default://  
            }  
        }  
  
        try {  
            _factory.activateObject(latch.getPair().value);  
            if(_testOnBorrow &&  
                    !_factory.validateObject(latch.getPair().value)) {  
                throw new Exception("ValidateObject failed");  
            }  
           return value;  
        }  
        catch (Throwable e) {  
            try {  
                _factory.destroyObject(latch.getPair().value);  
            } catch (Throwable e2) {  
                //  
            }  
        }  
    }  
}      
  • void returnObject(Object obj) : "歸還"對象,當"對象"使用結束後,需要歸還到Pool中,才能維持Pool中對象的數量可控,如果不歸還到Pool,那麼将意味着在Pool之外,将有大量的"對象"存在,那麼就使用了"對象池"的意義.如下為僞代碼:
public void returnObject(Object obj) throws Exception {  
    try {  
        boolean success = true;//  
        if(_testOnReturn && !(_factory.validateObject(obj))) {  
            success = false;  
        } else {  
            _factory.passivateObject(obj);  
        }  
  
        synchronized (this) {  
            //檢測pool中已經空閑的對象個數是否達到閥值.  
            if((_maxIdle >= 0) && (_pool.size() >= _maxIdle)) {  
                success = false;  
            } else if(success) {  
                _pool.addFirst(new ObjectTimestampPair(obj));  
            }  
        }  
  
        // Destroy the instance if necessary  
        if(!success) {  
            try {  
                _factory.destroyObject(obj);  
            } catch(Exception e) {  
                // ignored  
            }  
        }  
    } catch (Exception e) {  
        //  
    }  
}      
  • void invalidateObject(Object obj): 銷毀對象,直接調用ObjectFactory.destroyObject(obj);.
  • void addObject(): 開發者可以直接調用addObject方法用于直接建立一個"對象"并添加到Pool中.

四.代碼執行個體.

    本執行個體主要用來示範一個"TCP連接配接池".

    1) ConnectionPoolFactory.java:

import org.apache.commons.pool.BasePoolableObjectFactory;  
import org.apache.commons.pool.impl.GenericObjectPool;  
import org.apache.commons.pool.impl.GenericObjectPool.Config;  
  
public class ConnectionPoolFactory {  
  
    private GenericObjectPool pool;  
  
    public ConnectionPoolFactory(Config config,String ip,int port){  
        ConnectionFactory factory = new ConnectionFactory(ip, port);  
        pool = new GenericObjectPool(factory, config);  
    }  
      
    public Socket getConnection() throws Exception{  
        return (Socket)pool.borrowObject();  
    }  
      
    public void releaseConnection(Socket socket){  
        try{  
            pool.returnObject(socket);  
        }catch(Exception e){  
            if(socket != null){  
                try{  
                    socket.close();  
                }catch(Exception ex){  
                    //  
                }  
            }  
        }  
    }  
      
    /** 
     * inner  
     * @author qing 
     * 
     */  
    class ConnectionFactory extends BasePoolableObjectFactory {  
  
        private InetSocketAddress address;  
          
        public ConnectionFactory(String ip,int port){  
            address = new InetSocketAddress(ip, port);  
        }  
          
        @Override  
        public Object makeObject() throws Exception {  
            Socket socket = new Socket();  
            socket.connect(address);  
            return socket;  
        }  
          
        public void destroyObject(Object obj) throws Exception  {  
            if(obj instanceof Socket){  
                ((Socket)obj).close();  
            }  
        }  
  
        public boolean validateObject(Object obj) {  
            if(obj instanceof Socket){  
                Socket socket = ((Socket)obj);  
                if(!socket.isConnected()){  
                    return false;  
                }  
                if(socket.isClosed()){  
                    return false;  
                }  
                return true;  
            }  
            return false;  
        }  
  
  
    }  
      
}      

2) TestMain.java(測試類):

public class TestMain {  
  
    /** 
     * @param args 
     */  
    public static void main(String[] args) {  
        Config config = new Config();  
        config.maxActive = 16;  
        config.maxWait = 30000;  
        ConnectionPoolFactory poolFactory = new ConnectionPoolFactory(config, "127.0.0.1", 8011);  
        Socket socket = null ;  
        try{  
            socket = poolFactory.getConnection();  
              
        }catch(Exception e){  
            e.printStackTrace();  
        }finally{  
            if(socket != null){  
                poolFactory.releaseConnection(socket);  
            }  
        }  
  
    }  
  
}