天天看點

#Java筆記#利用JedisPool實作對Redis的多線程調用

由于公司業務上的需要,前段時間做了一陣子資料提取與轉運的工作,主要運用了python和java。在開發的過程中,接觸到了一些新的技術,産生了一些新的思路,在此記錄一下。今天,就先來結合一次實際的資料提取經曆,總結一下java如何利用JedisPool實作對Redis的多線程調用。

先大緻說一下這次任務的總體思路:

由于資料量較大,是以在這裡使用了多線程。整個流程主要分為兩塊,生産者從源資料中循環讀取任務,并将任務放到redis緩沖池中;消費者從redis中擷取任務,并執行,最終将資料插入到資料庫中。

#Java筆記#利用JedisPool實作對Redis的多線程調用

Java調用redis,需要在工程中導入驅動包:jedis,點我下載下傳

在實際測試中,生産者的執行速率很快,是以這裡生産者隻開了一個線程去執行。

1、生産者循環讀取任務并放入redis緩沖池中:

public class RedisQueueProducer{
    private int pageNum = 0;
    private Jedis jedis;

    public void startWrite(Jedis jedis){
        this.jedis = jedis;
        MyExecutor executor = new MyExecutor();
        executor.start();
    }


    private class MyExecutor  extends Thread{
        @Override
        public void run() {
            super.run();
            List<String> readData;
            while (true) {
                //分頁從資料庫中讀取資料,直到資料全部取完,跳出循環
                readData = JdbcUtils.queryData(pageNum);
                if(readData.size()>0) {
                    String[] readDataArrays = new String[readData.size()];
                    jedis.lpush("sourceIds",readData.toArray(readDataArrays));
                    pageNum++;
                }else{
                    break;
                }
            }
        }
    }
}
           

這裡存入redis使用的lpush的方式,選擇這種方式的好處是讀取redis資料時,調用lpop可以直接彈出list頂部的第一條資料,該條資料也會從redis中移除,不會占用空間,即取即用。

2、消費者從redis中擷取任務并執行,存入資料庫中:

public class RedisQueueConsumer{
    private Jedis jedis;
    private HashDocumentIdZHO hashDocumentIdZHO;

    public void startRead(Jedis jedis){
        //連接配接本地的 Redis 服務
        this.jedis = jedis;
        hashDocumentIdZHO =  new HashDocumentIdZHO();

        MyExecutor executor = new MyExecutor();
        executor.start();

    }

    private class MyExecutor  extends Thread{
        @Override
        public void run() {
            super.run();
            while (true){
                String sourceId = jedis.lpop("sourceIds");
                if(sourceId!=null&&!sourceId.equals("")) {
                    if(JdbcUtils.getConn()==null){
                        JdbcUtils.connect();
                    }
                    ConvertIdResult result = hashDocumentIdZHO.getHashDocumentIdZHOSoap().convertClassicID(sourceId,1,2,true);
                    HashDoc doc = new HashDoc();
                    doc.setResult(result);
                    doc.setSourceDocId(sourceId);

                    JdbcUtils.insert(doc);
                    System.out.println("線程名稱:" + Thread.currentThread().getName() + ",hash值:" + result.getResult());

                }else{
                    try {
                        //如果沒有資料,等待30s,再執行;後來檢視jedis的api得知,如果清單沒有元素會阻塞清單直到等待逾時或發現可彈出元素為止,是以理論上這裡可以不做設定。不過并沒有實際試驗過。
                        sleep(30000);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
           

3、利用jedisPool實作多線程調用:

可能有小夥伴看到這裡會問,為什麼要用redis作為緩沖池呢?(如果沒有人問,請容我自問自答一下...)因為這裡用到了多線程,而redis本身是線程安全的,是以自然這裡就選擇了redis作為緩沖池。不過,redis本身是單線程的,如果我們需要通過java多線程從redis取資料,則需要借助jedisPool,生成一個redis連接配接池,讓一個線程對應一個redis連接配接。

public class RedisQueuePool {

    public static JedisPool jedisPool = null;

    public static void main(String[] args) {
        if(jedisPool == null){
            JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
            jedisPoolConfig.setMaxTotal(10000);
            //控制一個pool最多有多少個狀态為idle(空閑)的jedis執行個體
            jedisPoolConfig.setMaxIdle(8);
            //擷取連接配接時的最大等待毫秒數(如果設定為阻塞時BlockWhenExhausted),如果逾時就抛異常, 小于零:阻塞不确定的時間,  預設-1
            jedisPoolConfig.setMaxWaitMillis(1);
            //獲得一個jedis執行個體的時候是否檢查連接配接可用性(ping());如果為true,則得到的jedis執行個體均是可用的;
            jedisPoolConfig.setTestOnBorrow(true);
            //return 一個jedis執行個體給pool時,是否檢查連接配接可用性(ping())
            jedisPoolConfig.setTestOnReturn(true);

            jedisPool = new JedisPool(jedisPoolConfig,"localhost");
        }
        //建立20個線程,執行操作
        for (int i = 0; i < 20; i++) {
            RedisQueueConsumer consumer = new RedisQueueConsumer();
            consumer.startRead(jedisPool.getResource());
        }
    }

}
           

jedisPoolConfig是jedisPool的一些屬性配置,詳細的屬性配置檢視可以移步這裡:屬性配置清單。

如果想要使用jedisPool,還需要引入commons-pool2-2.4.2.jar。

以上就是JAVA利用JedisPool實作對Redis的多線程調用的思路與解決方案了,希望能對讀到這裡的你能有所幫助~