天天看點

通用池化架構實踐之GenericKeyedObjectPool

前兩天寫了一篇文章介紹

commons-pool2

這個通用池化架構通用池化架構commons-pool2實踐,其中提到了可以池化一個對象和一組對象,一個對象用到了

GenericObjectPool

這個類,一組對象用到了

GenericKeyedObjectPool

這個類。

一開始我以為後者比較複雜,是以放棄了嘗試,今天在寫gRPC接口測試Demo,根據分片不同進行負載均衡連接配接不同節點的過程中,遇到了一個障礙。就是在服務調用gRPC的時候已經完成了自動負載均衡,我調用的SDK就需要自己實作根據不同分片連接配接不同的節點,這就用到了

GenericKeyedObjectPool

顧名思義,鍵值對象池。就是通過一個key對應一個對象類型來組合對象池,其本質上就是一個

Map

key

是自定義,

value

就是

org.apache.commons.pool2.ObjectPool

,而但對象池化類

GenericObjectPool

也是實作了這個接口。

經過查詢源碼注釋有兩點需要注意:

  1. Map

    的用的

    ConcurrentHashMap

    ,是線程安全的。
  2. 擷取和回收太頻繁,會遇到性能問題。

關于第二點,我有機會在做一期兩者的性能測試。我現在用的是gRPC的連接配接對象

io.grpc.ManagedChannel

,而且每個類對象綁定的對象是

io.grpc.stub.AbstractBlockingStub

并不會場景去連接配接池中擷取新連接配接,一個gRPC連接配接可以支撐N(資料稱該值100左右,後續我計劃50個線程公用一個連接配接)個線程的并發,是以暫時不用擔心這個性能問題。

根據上次文章的記錄的順序分成了三部分。

可池化類

首先我們需要一個可以被池化的對象,代碼同上期文章。

池化工廠類

然後就是池化工廠類,這個類需要定義

key

value

的類型,然後就是照葫蘆畫瓢,跟上期文章一樣。

package com.funtester.funpool

import com.funtester.base.interfaces.IPooled
import org.apache.commons.pool2.BaseKeyedPooledObjectFactory
import org.apache.commons.pool2.PooledObject
/**
 * 可池化工廠類
 */
abstract class KeyPoolFactory<F> extends BaseKeyedPooledObjectFactory<F, IPooled> {

    abstract IPooled init()

    @Override
    IPooled create(F k) throws Exception {
        return init()
    }

    @Override
    PooledObject<IPooled> wrap(IPooled obj) {
        return obj.reInit()
    }

    @Override
    void destroyObject(F key, PooledObject<IPooled> p) throws Exception {
        p.getObject().destory()
        super.destroyObject(key, p)
    }
}

           

這裡提一嘴,

com.funtester.funpool.KeyPoolFactory#destroyObject

方法并不是必需的,如果池化的對象除了記憶體以外不需要額外的資源釋放,就不用重寫這個方法了。還有一種情況就是對象資訊需要清除,比如

org.apache.http.client.methods.HttpGet

,需要把請求位址和請求頭等資訊清除,這個需要跟業務需求保持一緻。不一定是全都清除。

對象池

package com.funtester.funpool

import com.funtester.base.interfaces.IPooled
import org.apache.commons.pool2.impl.GenericKeyedObjectPool
import org.apache.commons.pool2.impl.GenericObjectPoolConfig

class KeyPool {

    KeyPool(KeyPoolFactory factory) {
        this.factory = factory
        this.pool = init()
    }

    private GenericKeyedObjectPool<String, IPooled> pool = init();

    private KeyPoolFactory<String> factory

    private GenericKeyedObjectPool<String, IPooled> init() {
        // 連接配接池的配置
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        // 池中的最大連接配接數
        poolConfig.setMaxTotal(8);
        // 最少的空閑連接配接數
        poolConfig.setMinIdle(0);
        // 最多的空閑連接配接數
        poolConfig.setMaxIdle(8);
        // 當連接配接池資源耗盡時,調用者最大阻塞的時間,逾時時抛出異常 機關:毫秒數
        poolConfig.setMaxWaitMillis(-1);
        // 連接配接池存放池化對象方式,true放在空閑隊列最前面,false放在空閑隊列最後
        poolConfig.setLifo(true);
        // 連接配接空閑的最小時間,達到此值後空閑連接配接可能會被移除,預設即為30分鐘
        poolConfig.setMinEvictableIdleTimeMillis(1000L * 60L * 30L);
        // 連接配接耗盡時是否阻塞,預設為true
        poolConfig.setBlockWhenExhausted(true);
        // 連接配接池建立
        return new GenericKeyedObjectPool<String, IPooled>(factory, poolConfig);
    }

}


           
/**
     * 擷取對象
     */
    IPooled get(String key) {
        try {
            return pool.borrowObject("FunTester");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return factory.create("FunTester");
    }

    /**
     * 歸還對象
     * @param iPooled
     */
    void back(String key, IPooled iPooled) {
        pool.returnObject("FunTester", iPooled)
    }

    /**
     * 執行器
     */
    def execute(String key, Closure closure) {
        IPooled client = get(key);
        try {
            closure(client);
        } finally {
            back(key, client);
        }
    }


           

Have Fun ~ Tester !